Develop Pulsar Functions
You learn how to develop Pulsar Functions with different APIs for Java, Python and Go.
Available APIs
In Java and Python, you have two options to write Pulsar Functions. In Go, you can use Pulsar Functions SDK for Go.
Interface | Description | Use cases |
---|---|---|
Language-native interface | No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python). | Functions that do not require access to the function context. |
Pulsar Function SDK for Java/Python/Go | Pulsar-specific libraries that provide a range of functionality not provided by “native” interfaces. | Functions that require access to the function context. |
Extended Pulsar Function SDK for Java | An extension to Pulsar-specific libraries, providing the initialization and close interfaces in Java. | Functions that require initializing and releasing external resources. |
Language-native interface
The language-native function, which adds an exclamation point to all incoming strings and publishes the resulting string to a topic, has no external dependencies. The following example is language-native function.
- Java
- Python
import java.util.function.Function;
public class JavaNativeExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}
For complete code, see here.
def process(input):
return "{}!".format(input)
For complete code, see here.
note
You can write Pulsar Functions in python2 or python3. However, Pulsar only looks for python
as the interpreter. If you’re running Pulsar Functions on an Ubuntu system that only supports python3, you might fail to start the functions. In this case, you can create a symlink. Your system will fail if you subsequently install any other package that depends on Python 2.x. A solution is under development in Issue 5518.
sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10
Pulsar Function SDK for Java/Python/Go
The following example uses Pulsar Functions SDK.
- Java
- Python
- Go
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return String.format("%s!", input);
}
}
For complete code, see here.
from pulsar import Function
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
return input + '!'
For complete code, see here.
package main
import (
"context"
"fmt"
"github.com/apache/pulsar/pulsar-function-go/pf"
)
func HandleRequest(ctx context.Context, in []byte) error{
fmt.Println(string(in) + "!")
return nil
}
func main() {
pf.Start(HandleRequest)
}
For complete code, see here.
Extended Pulsar Function SDK for Java
This extended Pulsar Function SDK provides two additional interfaces to initialize and release external resources.
- By using the
initialize
interface, you can initialize external resources which only need one-time initialization when the function instance starts. - By using the
close
interface, you can close the referenced external resources when the function instance closes.
note
The extended Pulsar Function SDK for Java is available in Pulsar 2.10.0 and later versions. Before using it, you need to set up Pulsar Function worker 2.10.0 or later versions.
The following example uses the extended interface of Pulsar Function SDK for Java to initialize RedisClient when the function instance starts and release it when the function instance closes.
- Java
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import io.lettuce.core.RedisClient;
public class InitializableFunction implements Function<String, String> {
private RedisClient redisClient;
private void initRedisClient(Map<String, Object> connectInfo) {
redisClient = RedisClient.create(connectInfo.get("redisURI"));
}
@Override
public void initialize(Context context) {
Map<String, Object> connectInfo = context.getUserConfigMap();
redisClient = initRedisClient(connectInfo);
}
@Override
public String process(String input, Context context) {
String value = client.get(key);
return String.format("%s-%s", input, value);
}
@Override
public void close() {
redisClient.close();
}
}
Schema registry
Pulsar has a built-in schema registry and is bundled with popular schema types, such as Avro, JSON and Protobuf. Pulsar Functions can leverage the existing schema information from input topics and derive the input type. The schema registry applies for output topic as well.
SerDe
SerDe stands for Serialization and Deserialization. Pulsar Functions uses SerDe when publishing data to and consuming data from Pulsar topics. How SerDe works by default depends on the language you use for a particular function.
- Java
- Python
- Go
When you write Pulsar Functions in Java, the following basic Java types are built in and supported by default: String
, Double
, Integer
, Float
, Long
, Short
, and Byte
.
To customize Java types, you need to implement the following interface.
public interface SerDe<T> {
T deserialize(byte[] input);
byte[] serialize(T input);
}
SerDe works in the following ways in Java Functions.
- If the input and output topics have schema, Pulsar Functions use schema for SerDe.
- If the input or output topics do not exist, Pulsar Functions adopt the following rules to determine SerDe:
- If the schema type is specified, Pulsar Functions use the specified schema type.
- If SerDe is specified, Pulsar Functions use the specified SerDe, and the schema type for input and output topics is
Byte
. - If neither the schema type nor SerDe is specified, Pulsar Functions use the built-in SerDe. For non-primitive schema type, the built-in SerDe serializes and deserializes objects in the
JSON
format.
In Python, the default SerDe is identity, meaning that the type is serialized as whatever type the producer function returns.
You can specify the SerDe when creating or running functions.
$ bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name my_function \
--py my_function.py \
--classname my_function.MyFunction \
--custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}' \
--output-serde-classname Serde3 \
--output output-topic-1
This case contains two input topics: input-topic-1
and input-topic-2
, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, output-topic-1
, uses the Serde3
class for SerDe. At the moment, all Pulsar Functions logic, include processing function and SerDe classes, must be contained within a single Python file.
When using Pulsar Functions for Python, you have three SerDe options:
- You can use the IdentitySerde, which leaves the data unchanged. The
IdentitySerDe
is the default. Creating or running a function without explicitly specifying SerDe means that this option is used. - You can use the PickleSerDe, which uses Python pickle for SerDe.
- You can create a custom SerDe class by implementing the baseline SerDe class, which has just two methods: serialize for converting the object into bytes, and deserialize for converting bytes into an object of the required application-specific type.
The table below shows when you should use each SerDe.
SerDe option | When to use |
---|---|
IdentitySerde | When you work with simple types like strings, Booleans, integers. |
PickleSerDe | When you work with complex, application-specific types and are comfortable with the “best effort” approach of pickle . |
Custom SerDe | When you require explicit control over SerDe, potentially for performance or data compatibility purposes. |
Currently, the feature is not available in Go.
Example
Imagine that you’re writing Pulsar Functions that are processing tweet objects, you can refer to the following example of Tweet
class.
- Java
- Python
public class Tweet {
private String username;
private String tweetContent;
public Tweet(String username, String tweetContent) {
this.username = username;
this.tweetContent = tweetContent;
}
// Standard setters and getters
}
To pass Tweet
objects directly between Pulsar Functions, you need to provide a custom SerDe class. In the example below, Tweet
objects are basically strings in which the username and tweet content are separated by a |
.
package com.example.serde;
import org.apache.pulsar.functions.api.SerDe;
import java.util.regex.Pattern;
public class TweetSerde implements SerDe<Tweet> {
public Tweet deserialize(byte[] input) {
String s = new String(input);
String[] fields = s.split(Pattern.quote("|"));
return new Tweet(fields[0], fields[1]);
}
public byte[] serialize(Tweet input) {
return "%s|%s".format(input.getUsername(), input.getTweetContent()).getBytes();
}
}
To apply this customized SerDe to a particular Pulsar Function, you need to:
- Package the
Tweet
andTweetSerde
classes into a JAR. - Specify a path to the JAR and SerDe class name when deploying the function.
The following is an example of create operation.
$ bin/pulsar-admin functions create \
--jar /path/to/your.jar \
--output-serde-classname com.example.serde.TweetSerde \
# Other function attributes
Custom SerDe classes must be packaged with your function JARs
Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. So you need to include your SerDe classes in your function JARs. If not, Pulsar returns an error.
class Tweet(object):
def __init__(self, username, tweet_content):
self.username = username
self.tweet_content = tweet_content
In order to use this class in Pulsar Functions, you have two options:
You can specify
PickleSerDe
, which applies the pickle library SerDe.You can create your own SerDe class. The following is an example.
from pulsar import SerDe
class TweetSerDe(SerDe):
def serialize(self, input):
return bytes("{0}|{1}".format(input.username, input.tweet_content))
def deserialize(self, input_bytes):
tweet_components = str(input_bytes).split('|')
return Tweet(tweet_components[0], tweet_componentsp[1])
For complete code, see here.
In both languages, however, you can write custom SerDe logic for more complex, application-specific types.
Context
Java, Python and Go SDKs provide access to a context object that can be used by a function. This context object provides a wide variety of information and functionality to the function.
- The name and ID of a Pulsar Function.
- The message ID of each message. Each Pulsar message is automatically assigned with an ID.
- The key, event time, properties and partition key of each message.
- The name of the topic to which the message is sent.
- The names of all input topics as well as the output topic associated with the function.
- The name of the class used for SerDe.
- The tenant and namespace associated with the function.
- The ID of the Pulsar Functions instance running the function.
- The version of the function.
- The logger object used by the function, which can be used to create function log messages.
- Access to arbitrary user configuration values supplied via the CLI.
- An interface for recording metrics.
- An interface for storing and retrieving state in state storage.
- A function to publish new messages onto arbitrary topics.
- A function to ack the message being processed (if auto-ack is disabled).
(Java) get Pulsar admin client.
Java
- Python
- Go
The Context interface provides a number of methods that you can use to access the function context. The various method signatures for the Context
interface are listed as follows.
public interface Context {
Record<?> getCurrentRecord();
Collection<String> getInputTopics();
String getOutputTopic();
String getOutputSchemaType();
String getTenant();
String getNamespace();
String getFunctionName();
String getFunctionId();
String getInstanceId();
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
void incrCounterAsync(String key, long amount);
long getCounter(String key);
long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
void recordMetric(String metricName, double value);
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
<O> CompletableFuture<Void> publish(String topicName, O object);
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
<O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
PulsarAdmin getPulsarAdmin();
PulsarAdmin getPulsarAdmin(String clusterName);
}
The following example uses several methods available via the Context
object.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
import java.util.stream.Collectors;
public class ContextFunction implements Function<String, Void> {
public Void process(String input, Context context) {
Logger LOG = context.getLogger();
String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
String functionName = context.getFunctionName();
String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n",
input,
inputTopics);
LOG.info(logMessage);
String metricName = String.format("function-%s-messages-received", functionName);
context.recordMetric(metricName, 1);
return null;
}
}
class ContextImpl(pulsar.Context):
def get_message_id(self):
...
def get_message_key(self):
...
def get_message_eventtime(self):
...
def get_message_properties(self):
...
def get_current_message_topic_name(self):
...
def get_partition_key(self):
...
def get_function_name(self):
...
def get_function_tenant(self):
...
def get_function_namespace(self):
...
def get_function_id(self):
...
def get_instance_id(self):
...
def get_function_version(self):
...
def get_logger(self):
...
def get_user_config_value(self, key):
...
def get_user_config_map(self):
...
def record_metric(self, metric_name, metric_value):
...
def get_input_topics(self):
...
def get_output_topic(self):
...
def get_output_serde_class_name(self):
...
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe",
properties=None, compression_type=None, callback=None, message_conf=None):
...
def ack(self, msgid, topic):
...
def get_and_reset_metrics(self):
...
def reset_metrics(self):
...
def get_metrics(self):
...
def incr_counter(self, key, amount):
...
def get_counter(self, key):
...
def del_counter(self, key):
...
def put_state(self, key, value):
...
def get_state(self, key):
...
func (c *FunctionContext) GetInstanceID() int {
return c.instanceConf.instanceID
}
func (c *FunctionContext) GetInputTopics() []string {
return c.inputTopics
}
func (c *FunctionContext) GetOutputTopic() string {
return c.instanceConf.funcDetails.GetSink().Topic
}
func (c *FunctionContext) GetFuncTenant() string {
return c.instanceConf.funcDetails.Tenant
}
func (c *FunctionContext) GetFuncName() string {
return c.instanceConf.funcDetails.Name
}
func (c *FunctionContext) GetFuncNamespace() string {
return c.instanceConf.funcDetails.Namespace
}
func (c *FunctionContext) GetFuncID() string {
return c.instanceConf.funcID
}
func (c *FunctionContext) GetFuncVersion() string {
return c.instanceConf.funcVersion
}
func (c *FunctionContext) GetUserConfValue(key string) interface{} {
return c.userConfigs[key]
}
func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
return c.userConfigs
}
func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
c.record = record
}
func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
return c.record
}
func (c *FunctionContext) NewOutputMessage(topic string) pulsar.Producer {
return c.outputMessage(topic)
}
The following example uses several methods available via the Context
object.
import (
"context"
"fmt"
"github.com/apache/pulsar/pulsar-function-go/pf"
)
func contextFunc(ctx context.Context) {
if fc, ok := pf.FromContext(ctx); ok {
fmt.Printf("function ID is:%s, ", fc.GetFuncID())
fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
}
}
For complete code, see here.
User config
When you run or update Pulsar Functions created using SDK, you can pass arbitrary key/values to them with the command line with the --user-config
flag. Key/values must be specified as JSON. The following function creation command passes a user configured key/value to a function.
$ bin/pulsar-admin functions create \
--name word-filter \
# Other function configs
--user-config '{"forbidden-word":"rosebud"}'
- Java
- Python
- Go
The Java SDK Context object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.
$ bin/pulsar-admin functions create \
# Other function configs
--user-config '{"word-of-the-day":"verdure"}'
To access that value in a Java function:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
import java.util.Optional;
public class UserConfigFunction implements Function<String, Void> {
@Override
public void apply(String input, Context context) {
Logger LOG = context.getLogger();
Optional<String> wotd = context.getUserConfigValue("word-of-the-day");
if (wotd.isPresent()) {
LOG.info("The word of the day is {}", wotd);
} else {
LOG.warn("No word of the day provided");
}
return null;
}
}
The UserConfigFunction
function will log the string "The word of the day is verdure"
every time the function is invoked (which means every time a message arrives). The word-of-the-day
user config will be changed only when the function is updated with a new config value via the command line.
You can also access the entire user config map or set a default value in case no value is present:
// Get the whole config map
Map<String, String> allConfigs = context.getUserConfigMap();
// Get value or resort to default
String wotd = context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");
For all key/value pairs passed to Java functions, both the key and the value are
String
. To set the value to be a different type, you need to deserialize from theString
type.
In Python function, you can access the configuration value like this.
from pulsar import Function
class WordFilter(Function):
def process(self, context, input):
forbidden_word = context.user_config()["forbidden-word"]
# Don't publish the message if it contains the user-supplied
# forbidden word
if forbidden_word in input:
pass
# Otherwise publish the message
else:
return input
The Python SDK Context object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.
$ bin/pulsar-admin functions create \
# Other function configs \
--user-config '{"word-of-the-day":"verdure"}'
To access that value in a Python function:
from pulsar import Function
class UserConfigFunction(Function):
def process(self, input, context):
logger = context.get_logger()
wotd = context.get_user_config_value('word-of-the-day')
if wotd is None:
logger.warn('No word of the day provided')
else:
logger.info("The word of the day is {0}".format(wotd))
The Go SDK Context object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.
$ bin/pulsar-admin functions create \
--go path/to/go/binary
--user-config '{"word-of-the-day":"lackadaisical"}'
To access that value in a Go function:
func contextFunc(ctx context.Context) {
fc, ok := pf.FromContext(ctx)
if !ok {
logutil.Fatal("Function context is not defined")
}
wotd := fc.GetUserConfValue("word-of-the-day")
if wotd == nil {
logutil.Warn("The word of the day is empty")
} else {
logutil.Infof("The word of the day is %s", wotd.(string))
}
}
Logger
- Java
- Python
- Go
Pulsar Functions that use the Java SDK have access to an SLF4j Logger object that can be used to produce logs at the chosen log level. The following example logs either a WARNING
- or INFO
-level log based on whether the incoming string contains the word danger
.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public class LoggingFunction implements Function<String, Void> {
@Override
public void apply(String input, Context context) {
Logger LOG = context.getLogger();
String messageId = new String(context.getMessageId());
if (input.contains("danger")) {
LOG.warn("A warning was received in message {}", messageId);
} else {
LOG.info("Message {} received\nContent: {}", messageId, input);
}
return null;
}
}
If you want your function to produce logs, you need to specify a log topic when creating or running the function. The following is an example.
$ bin/pulsar-admin functions create \
--jar my-functions.jar \
--classname my.package.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs \
# Other function configs
All logs produced by LoggingFunction
above can be accessed via the persistent://public/default/logging-function-logs
topic.
Customize Function log level
Additionally, you can use the XML file, functions_log4j2.xml
, to customize the function log level. To customize the function log level, create or update functions_log4j2.xml
in your Pulsar conf directory (for example, /etc/pulsar/
on bare-metal, or /pulsar/conf
on Kubernetes) to contain contents such as:
<Configuration>
<name>pulsar-functions-instance</name>
<monitorInterval>30</monitorInterval>
<Properties>
<Property>
<name>pulsar.log.appender</name>
<value>RollingFile</value>
</Property>
<Property>
<name>pulsar.log.level</name>
<value>debug</value>
</Property>
<Property>
<name>bk.log.level</name>
<value>debug</value>
</Property>
</Properties>
<Appenders>
<Console>
<name>Console</name>
<target>SYSTEM_OUT</target>
<PatternLayout>
<Pattern>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
</Console>
<RollingFile>
<name>RollingFile</name>
<fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log</fileName>
<filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
<immediateFlush>true</immediateFlush>
<PatternLayout>
<Pattern>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy>
<interval>1</interval>
<modulate>true</modulate>
</TimeBasedTriggeringPolicy>
<SizeBasedTriggeringPolicy>
<size>1 GB</size>
</SizeBasedTriggeringPolicy>
<CronTriggeringPolicy>
<schedule>0 0 0 * * ?</schedule>
</CronTriggeringPolicy>
</Policies>
<DefaultRolloverStrategy>
<Delete>
<basePath>${sys:pulsar.function.log.dir}</basePath>
<maxDepth>2</maxDepth>
<IfFileName>
<glob>*/${sys:pulsar.function.log.file}*log.gz</glob>
</IfFileName>
<IfLastModified>
<age>30d</age>
</IfLastModified>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
<RollingRandomAccessFile>
<name>BkRollingFile</name>
<fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk</fileName>
<filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
<immediateFlush>true</immediateFlush>
<PatternLayout>
<Pattern>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy>
<interval>1</interval>
<modulate>true</modulate>
</TimeBasedTriggeringPolicy>
<SizeBasedTriggeringPolicy>
<size>1 GB</size>
</SizeBasedTriggeringPolicy>
<CronTriggeringPolicy>
<schedule>0 0 0 * * ?</schedule>
</CronTriggeringPolicy>
</Policies>
<DefaultRolloverStrategy>
<Delete>
<basePath>${sys:pulsar.function.log.dir}</basePath>
<maxDepth>2</maxDepth>
<IfFileName>
<glob>*/${sys:pulsar.function.log.file}.bk*log.gz</glob>
</IfFileName>
<IfLastModified>
<age>30d</age>
</IfLastModified>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<Logger>
<name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
<level>${sys:bk.log.level}</level>
<additivity>false</additivity>
<AppenderRef>
<ref>BkRollingFile</ref>
</AppenderRef>
</Logger>
<Root>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
</AppenderRef>
</Root>
</Loggers>
</Configuration>
The properties set like:
<Property>
<name>pulsar.log.level</name>
<value>debug</value>
</Property>
propagate to places where they are referenced, such as:
<Root>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
</AppenderRef>
</Root>
In the above example, debug level logging would be applied to ALL function logs. This may be more verbose than you desire. To be more selective, you can apply different log levels to different classes or modules. For example:
<Logger>
<name>com.example.module</name>
<level>info</level>
<additivity>false</additivity>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
</AppenderRef>
</Logger>
You can be more specific as well, such as applying a more verbose log level to a class in the module, such as:
<Logger>
<name>com.example.module.className</name>
<level>debug</level>
<additivity>false</additivity>
<AppenderRef>
<ref>Console</ref>
</AppenderRef>
</Logger>
Each <AppenderRef>
entry allows you to output the log to a target specified in the definition of the Appender.
Additivity pertains to whether log messages will be duplicated if multiple Logger entries overlap. To disable additivity, specify
<additivity>false</additivity>
as shown in examples above. Disabling additivity prevents duplication of log messages when one or more <Logger>
entries contain classes or modules that overlap.
The <AppenderRef>
is defined in the <Appenders>
section, such as:
<Console>
<name>Console</name>
<target>SYSTEM_OUT</target>
<PatternLayout>
<Pattern>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
</Console>
Pulsar Functions that use the Python SDK have access to a logging object that can be used to produce logs at the chosen log level. The following example function that logs either a WARNING
- or INFO
-level log based on whether the incoming string contains the word danger
.
from pulsar import Function
class LoggingFunction(Function):
def process(self, input, context):
logger = context.get_logger()
msg_id = context.get_message_id()
if 'danger' in input:
logger.warn("A warning was received in message {0}".format(context.get_message_id()))
else:
logger.info("Message {0} received\nContent: {1}".format(msg_id, input))
If you want your function to produce logs on a Pulsar topic, you need to specify a log topic when creating or running the function. The following is an example.
$ bin/pulsar-admin functions create \
--py logging_function.py \
--classname logging_function.LoggingFunction \
--log-topic logging-function-logs \
# Other function configs
All logs produced by LoggingFunction
above can be accessed via the logging-function-logs
topic. Additionally, you can specify the function log level through the broker XML file as described in Customize Function log level.
The following Go Function example shows different log levels based on the function input.
import (
"context"
"github.com/apache/pulsar/pulsar-function-go/pf"
log "github.com/apache/pulsar/pulsar-function-go/logutil"
)
func loggerFunc(ctx context.Context, input []byte) {
if len(input) <= 100 {
log.Infof("This input has a length of: %d", len(input))
} else {
log.Warnf("This input is getting too long! It has {%d} characters", len(input))
}
}
func main() {
pf.Start(loggerFunc)
}
When you use logTopic
related functionalities in Go Function, import github.com/apache/pulsar/pulsar-function-go/logutil
, and you do not have to use the getLogger()
context object.
Additionally, you can specify the function log level through the broker XML file, as described here: Customize Function log level
Pulsar admin
Pulsar Functions using the Java SDK has access to the Pulsar admin client, which allows the Pulsar admin client to manage API calls to current Pulsar clusters or external clusters (if external-pulsars
is provided).
- Java
Below is an example of how to use the Pulsar admin client exposed from the Function context
.
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
* In this particular example, for every input message,
* the function resets the cursor of the current function's subscription to a
* specified timestamp.
*/
public class CursorManagementFunction implements Function<String, String> {
@Override
public String process(String input, Context context) throws Exception {
PulsarAdmin adminClient = context.getPulsarAdmin();
if (adminClient != null) {
String topic = context.getCurrentRecord().getTopicName().isPresent() ?
context.getCurrentRecord().getTopicName().get() : null;
String subName = context.getTenant() + "/" + context.getNamespace() + "/" + context.getFunctionName();
if (topic != null) {
// 1578188166 below is a random-pick timestamp
adminClient.topics().resetCursor(topic, subName, 1578188166);
return "reset cursor successfully";
}
}
return null;
}
}
If you want your function to get access to the Pulsar admin client, you need to enable this feature by setting exposeAdminClientEnabled=true
in the functions_worker.yml
file. You can test whether this feature is enabled or not using the command pulsar-admin functions localrun
with the flag --web-service-url
.
$ bin/pulsar-admin functions localrun \
--jar my-functions.jar \
--classname my.package.CursorManagementFunction \
--web-service-url http://pulsar-web-service:8080 \
# Other function configs
Metrics
Pulsar Functions allows you to deploy and manage processing functions that consume messages from and publish messages to Pulsar topics easily. It is important to ensure that the running functions are healthy at any time. Pulsar Functions can publish arbitrary metrics to the metrics interface which can be queried.
note
If a Pulsar Function uses the language-native interface for Java or Python, that function is not able to publish metrics and stats to Pulsar.
You can monitor Pulsar Functions that have been deployed with the following methods:
Check the metrics provided by Pulsar.
Pulsar Functions expose the metrics that can be collected and used for monitoring the health of Java, Python, and Go functions. You can check the metrics by following the monitoring guide.
For the complete list of the function metrics, see here.
Set and check your customized metrics.
In addition to the metrics provided by Pulsar, Pulsar allows you to customize metrics for Java and Python functions. Function workers collect user-defined metrics to Prometheus automatically and you can check them in Grafana.
Here are examples of how to customize metrics for Java and Python functions.
- Java
- Python
- Go
You can record metrics using the Context object on a per-key basis. For example, you can set a metric for the process-count
key and a different metric for the elevens-count
key every time the function processes a message.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class MetricRecorderFunction implements Function<Integer, Void> {
@Override
public void apply(Integer input, Context context) {
// Records the metric 1 every time a message arrives
context.recordMetric("hit-count", 1);
// Records the metric only if the arriving number equals 11
if (input == 11) {
context.recordMetric("elevens-count", 1);
}
return null;
}
}
You can record metrics using the Context object on a per-key basis. For example, you can set a metric for the process-count
key and a different metric for the elevens-count
key every time the function processes a message. The following is an example.
from pulsar import Function
class MetricRecorderFunction(Function):
def process(self, input, context):
context.record_metric('hit-count', 1)
if input == 11:
context.record_metric('elevens-count', 1)
The Go SDK Context object enables you to record metrics on a per-key basis. For example, you can set a metric for the process-count
key and a different metric for the elevens-count
key every time the function processes a message:
func metricRecorderFunction(ctx context.Context, in []byte) error {
inputstr := string(in)
fctx, ok := pf.FromContext(ctx)
if !ok {
return errors.New("get Go Functions Context error")
}
fctx.RecordMetric("hit-count", 1)
if inputstr == "eleven" {
fctx.RecordMetric("elevens-count", 1)
}
return nil
}
Security
If you want to enable security on Pulsar Functions, first you should enable security on Functions Workers. For more details, refer to Security settings.
Pulsar Functions can support the following providers:
- ClearTextSecretsProvider
- EnvironmentBasedSecretsProvider
Pulsar Function supports ClearTextSecretsProvider by default.
At the same time, Pulsar Functions provides two interfaces, SecretsProvider and SecretsProviderConfigurator, allowing users to customize secret provider.
- Java
- Python
- Go
You can get secret provider using the Context object. The following is an example:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public class GetSecretProviderFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
Logger LOG = context.getLogger();
String secretProvider = context.getSecret(input);
if (!secretProvider.isEmpty()) {
LOG.info("The secret provider is {}", secretProvider);
} else {
LOG.warn("No secret provider");
}
return null;
}
}
You can get secret provider using the Context object. The following is an example:
from pulsar import Function
class GetSecretProviderFunction(Function):
def process(self, input, context):
logger = context.get_logger()
secret_provider = context.get_secret(input)
if secret_provider is None:
logger.warn('No secret provider')
else:
logger.info("The secret provider is {0}".format(secret_provider))
Currently, the feature is not available in Go.
State storage
Pulsar Functions use Apache BookKeeper as a state storage interface. Pulsar installation, including the local standalone installation, includes deployment of BookKeeper bookies.
Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper table service to store the State
for functions. For example, a WordCount
function can store its counters
state into BookKeeper table service via Pulsar Functions State API.
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
You can access states within Pulsar Java Functions using the putState
, putStateAsync
, getState
, getStateAsync
, incrCounter
, incrCounterAsync
, getCounter
, getCounterAsync
and deleteState
calls on the context object. You can access states within Pulsar Python Functions using the putState
, getState
, incrCounter
, getCounter
and deleteState
calls on the context object. You can also manage states using the querystate and putstate options to pulsar-admin functions
.
note
State storage is not available in Go.
API
- Java
- Python
Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the Context object when you are using Java SDK functions.
incrCounter
/**
* Increment the builtin distributed counter referred by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
The application can use incrCounter
to change the counter of a given key
by the given amount
.
incrCounterAsync
/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);
The application can use incrCounterAsync
to asynchronously change the counter of a given key
by the given amount
.
getCounter
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
The application can use getCounter
to retrieve the counter of a given key
mutated by incrCounter
.
Except the counter
API, Pulsar also exposes a general key/value API for functions to store general key/value state.
getCounterAsync
/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);
The application can use getCounterAsync
to asynchronously retrieve the counter of a given key
mutated by incrCounterAsync
.
putState
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
putStateAsync
/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
The application can use putStateAsync
to asynchronously update the state of a given key
.
getState
/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);
getStateAsync
/**
* Retrieve the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
The application can use getStateAsync
to asynchronously retrieve the state of a given key
.
deleteState
/**
* Delete the state value for the key.
*
* @param key name of the key
*/
Counters and binary values share the same keyspace, so this deletes either type.
Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the Context object when you are using Python SDK functions.
incr_counter
def incr_counter(self, key, amount):
""incr the counter of a given key in the managed state""
Application can use incr_counter
to change the counter of a given key
by the given amount
. If the key
does not exist, a new key is created.
get_counter
def get_counter(self, key):
"""get the counter of a given key in the managed state"""
Application can use get_counter
to retrieve the counter of a given key
mutated by incrCounter
.
Except the counter
API, Pulsar also exposes a general key/value API for functions to store general key/value state.
put_state
def put_state(self, key, value):
"""update the value of a given key in the managed state"""
The key is a string, and the value is arbitrary binary data.
get_state
def get_state(self, key):
"""get the value of a given key in the managed state"""
del_counter
def del_counter(self, key):
"""delete the counter of a given key in the managed state"""
Counters and binary values share the same keyspace, so this deletes either type.
Query State
A Pulsar Function can use the State API for storing state into Pulsar’s state storage and retrieving state back from Pulsar’s state storage. Additionally Pulsar also provides CLI commands for querying its state.
$ bin/pulsar-admin functions querystate \
--tenant <tenant> \
--namespace <namespace> \
--name <function-name> \
--state-storage-url <bookkeeper-service-url> \
--key <state-key> \
[---watch]
If --watch
is specified, the CLI will watch the value of the provided state-key
.
Example
- Java
- Python
WordCountFunction is a very good example demonstrating on how Application can easily store state
in Pulsar Functions.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
return null;
}
}
The logic of this WordCount
function is pretty simple and straightforward:
- The function first splits the received
String
into multiple words using regex\\.
. - For each
word
, the function increments the correspondingcounter
by 1 (viaincrCounter(key, amount)
).
from pulsar import Function
class WordCount(Function):
def process(self, item, context):
for word in item.split():
context.incr_counter(word, 1)
The logic of this WordCount
function is pretty simple and straightforward:
- The function first splits the received string into multiple words on space.
- For each
word
, the function increments the correspondingcounter
by 1 (viaincr_counter(key, amount)
).