Pulsar Functions overview

Pulsar Functions are lightweight compute processes that

  • consume messages from one or more Pulsar topics,
  • apply a user-supplied processing logic to each message,
  • publish the results of the computation to another topic.

The following is an example of a Pulsar Function written in Java (using the native interface).

  import java.util.function.Function;

  public class ExclamationFunction implements Function<String, String> {
      @Override
      public String apply(String input) { return String.format("%s!", input); }
  }

The following is an example of a Pulsar Function written in Python (using the native interface).

  def process(input):
      return "{0}!".format(input)

The following is an example of a Pulsar Function written in Go.

  package main

  import (
      "context"
      "fmt"

      "github.com/apache/pulsar/pulsar-function-go/pf"
  )

  // HandleRequest prints each input with an exclamation mark appended.
  func HandleRequest(ctx context.Context, in []byte) error {
      fmt.Println(string(in) + "!")
      return nil
  }

  func main() {
      pf.Start(HandleRequest)
  }

A Pulsar Function is executed each time a message is published to its input topic. For example, if a function has an input topic called tweet-stream, the function runs each time a message is published to tweet-stream.

Goals

With Pulsar Functions, you can create complex processing logic without deploying a separate neighboring system (such as Apache Storm, Apache Heron, or Apache Flink). Pulsar Functions are the computing infrastructure of the Pulsar messaging system. The core goal is tied to a series of other goals:

Inspirations

Pulsar Functions are inspired by (and take cues from) several systems and paradigms:

Programming model

The core programming model of Pulsar Functions is simple. Functions receive messages from one or more input topics. Each time a message is received, the function will complete the following tasks.

  • Apply some processing logic to the input and write output to an output topic
  • Write logs to a log topic (potentially for debugging purposes)
  • Increment a counter

[Figure: Pulsar Functions core programming model]
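The three tasks above can be imitated locally without a broker. The following is a minimal Python sketch, not the Pulsar runtime: FakeContext, shout, and run_locally are hypothetical names introduced for illustration, and the fake context only mimics the logger and counter calls that an SDK context would provide.

```python
import logging


class FakeContext:
    """Hypothetical in-memory stand-in for the SDK context object.

    This class is an assumption made for the sketch; it is not part of Pulsar.
    """

    def __init__(self):
        self.counters = {}
        self._logger = logging.getLogger("fake-function-log")

    def get_logger(self):
        return self._logger

    def incr_counter(self, key, amount):
        self.counters[key] = self.counters.get(key, 0) + amount


def shout(input, context):
    """Handle one message: write a log line, bump a counter, return the output."""
    context.get_logger().info("received %r", input)
    context.incr_counter("messages-seen", 1)
    return input + "!"


def run_locally(function, messages, context):
    """Call the function once per message and collect the return values,
    which stand in for what would be written to the output topic."""
    return [function(message, context) for message in messages]
```

Calling run_locally(shout, ["hello", "world"], FakeContext()) returns one output per input message, and the counter held by the fake context ends at 2.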

Word count example

If you implement the classic word count example using Pulsar Functions, it looks something like this:

[Figure: Pulsar Functions word count example]

You can write the function in Java with Pulsar Functions SDK for Java as follows.

  package org.example.functions;

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  import java.util.Arrays;

  public class WordCountFunction implements Function<String, Void> {
      // This function is invoked every time a message is published to the input topic
      @Override
      public Void process(String input, Context context) throws Exception {
          Arrays.asList(input.split(" ")).forEach(word -> {
              String counterKey = word.toLowerCase();
              context.incrCounter(counterKey, 1);
          });
          return null;
      }
  }

Bundle and build the JAR file to be deployed. You can find approaches in Creating an Uber JAR and Creating a NAR package.

Then deploy it in your Pulsar cluster using the command line as follows.

  $ bin/pulsar-admin functions create \
    --jar target/my-jar-with-dependencies.jar \
    --classname org.example.functions.WordCountFunction \
    --tenant public \
    --namespace default \
    --name word-count \
    --inputs persistent://public/default/sentences \
    --output persistent://public/default/count
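To see which counters the Java function above would touch for a given sentence, the counting logic can be reproduced outside Pulsar. The following Python sketch is only an illustration: count_words is a hypothetical helper, and it returns a plain dictionary instead of calling the context counter API.

```python
from collections import Counter


def count_words(sentence):
    """Follow the main path of the Java example: split on single spaces,
    lowercase each word, and add 1 to that word's counter."""
    counts = Counter()
    for word in sentence.split(" "):
        counts[word.lower()] += 1
    return dict(counts)
```

Because keys are lowercased, "The" and "the" share one counter, which matches the toLowerCase() call in the Java function.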

Content-based routing example

Pulsar Functions are used in many cases. The following is a sophisticated example that involves content-based routing.

For example, a function takes items (strings) as input and publishes them to either a fruits or vegetables topic, depending on the item. Or, if an item is neither fruit nor vegetable, a warning is logged to a log topic. The following is a visual representation.

[Figure: Pulsar Functions routing example]

If you implement this routing functionality in Python, it looks something like this:

  from pulsar import Function

  class RoutingFunction(Function):
      def __init__(self):
          self.fruits_topic = "persistent://public/default/fruits"
          self.vegetables_topic = "persistent://public/default/vegetables"

      def is_fruit(self, item):
          return item in ["apple", "orange", "pear", "other fruits..."]

      def is_vegetable(self, item):
          return item in ["carrot", "lettuce", "radish", "other vegetables..."]

      def process(self, item, context):
          if self.is_fruit(item):
              context.publish(self.fruits_topic, item)
          elif self.is_vegetable(item):
              context.publish(self.vegetables_topic, item)
          else:
              warning = "The item {0} is neither a fruit nor a vegetable".format(item)
              context.get_logger().warning(warning)
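One way to check the routing rules without a running cluster is to isolate the routing decision from the publish call. The following is a sketch under that assumption: route is a hypothetical helper, and the item sets reuse the placeholder values from the example above.

```python
FRUITS_TOPIC = "persistent://public/default/fruits"
VEGETABLES_TOPIC = "persistent://public/default/vegetables"

# Placeholder item sets, as in the routing example
FRUITS = {"apple", "orange", "pear"}
VEGETABLES = {"carrot", "lettuce", "radish"}


def route(item):
    """Return the destination topic for an item, or None when the item is
    neither a fruit nor a vegetable (the case that is logged as a warning)."""
    if item in FRUITS:
        return FRUITS_TOPIC
    if item in VEGETABLES:
        return VEGETABLES_TOPIC
    return None
```

With the decision in a pure function, the process method only has to publish to the returned topic or log a warning when the result is None.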

Command-line interface

Pulsar Functions are managed using the pulsar-admin CLI tool (in particular the functions command). The following example runs a function in the local run mode.

  $ bin/pulsar-admin functions localrun \
    --inputs persistent://public/default/test_src \
    --output persistent://public/default/test_result \
    --jar examples/api-examples.jar \
    --classname org.apache.pulsar.functions.api.examples.ExclamationFunction

Fully Qualified Function Name (FQFN)

Each Pulsar Function has a Fully Qualified Function Name (FQFN) that consists of three elements: the function's tenant, namespace, and name. An FQFN looks like this:

  tenant/namespace/name

FQFNs enable you to create multiple functions with the same name provided that they are in different namespaces.
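The tenant/namespace/name structure can be made concrete with a small Python sketch. The FQFN class and parse_fqfn helper are hypothetical names used only for illustration; they are not part of any Pulsar client library.

```python
from typing import NamedTuple


class FQFN(NamedTuple):
    """The three elements of a Fully Qualified Function Name."""
    tenant: str
    namespace: str
    name: str

    def __str__(self):
        return f"{self.tenant}/{self.namespace}/{self.name}"


def parse_fqfn(text):
    """Split 'tenant/namespace/name' into its parts, rejecting other shapes."""
    parts = text.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/name, got {text!r}")
    return FQFN(*parts)
```

Two functions that share a name but live in different namespaces, such as public/ns-a/word-count and public/ns-b/word-count, parse to different FQFN values.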

Configuration

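You can configure a Pulsar Function with command-line arguments passed to the pulsar-admin functions command or with a YAML configuration file. The following example passes a configuration file to the create command.

  $ bin/pulsar-admin functions create \
    --function-config-file ./my-function.yaml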

The following is an example of the my-function.yaml file.

  name: my-function
  tenant: public
  namespace: default
  jar: ./target/my-functions.jar
  className: org.example.pulsar.functions.MyFunction
  inputs:
    - persistent://public/default/test_src
  output: persistent://public/default/test_result

You can also mix the two methods, specifying some function attributes via CLI arguments and others in a YAML configuration file.

Supported languages

Currently, you can write Pulsar Functions in Java, Python, and Go. Support for additional languages is coming soon.

Pulsar Functions API

Pulsar Functions API enables you to create processing logic that is:

  • Type safe. Pulsar Functions can process raw bytes or more complex, application-specific types.
  • Based on SerDe (Serialization/Deserialization). A variety of types are supported "out of the box" but you can also create your own custom SerDe logic.
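A custom SerDe is, in essence, a pair of methods that convert between an application type and bytes. The sketch below shows that shape in Python. Tweet and TweetSerDe are hypothetical names, and the class is written standalone so that it runs without the Pulsar client installed; a real function would hook a class like this into the SDK's SerDe mechanism.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Tweet:
    """Hypothetical application-specific type."""
    username: str
    content: str


class TweetSerDe:
    """Hypothetical SerDe that encodes a Tweet as UTF-8 JSON bytes."""

    def serialize(self, tweet):
        return json.dumps(asdict(tweet)).encode("utf-8")

    def deserialize(self, input_bytes):
        return Tweet(**json.loads(input_bytes.decode("utf-8")))
```

A round trip through serialize and deserialize should give back an equal Tweet, which is a convenient property to test before deploying a function that depends on the SerDe.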

Function context

Each Pulsar Function created using Pulsar Functions SDK has access to a context object that provides both:

  • A wide variety of information about the function, including:
      • The name of the function
      • The tenant and namespace of the function
      • User-supplied configuration values
  • Special functionality, including:
      • The ability to produce logs to a specified logging topic
      • The ability to produce metrics

Language-native functions

"Native" functions are supported in Java and Python, which means a Pulsar Function can have no dependencies.

The benefit of native functions is that they do not have any dependencies beyond what's already available in Java/Python "out of the box." The downside is that they do not provide access to the function context, which is necessary for a variety of functionalities, including logging, user configuration, and more.

Pulsar Functions SDK

To enable a Pulsar Function to access a context object, you can use Pulsar Functions SDK, available for Java, Python, and Go.

Java

The following is a Java function example that uses information about its context.

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  public class ContextAwareFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          Logger LOG = context.getLogger();
          String functionTenant = context.getTenant();
          String functionNamespace = context.getNamespace();
          String functionName = context.getFunctionName();
          LOG.info("Function tenant/namespace/name: {}/{}/{}", functionTenant, functionNamespace, functionName);
          return null;
      }
  }

Python

The following is a Python function example that uses information about its context.

  from pulsar import Function

  class ContextAwareFunction(Function):
      def process(self, input, context):
          log = context.get_logger()
          function_tenant = context.get_function_tenant()
          function_namespace = context.get_function_namespace()
          function_name = context.get_function_name()
          log.info("Function tenant/namespace/name: {0}/{1}/{2}".format(function_tenant, function_namespace, function_name))

Go

The following is a Go function example that uses information about its context.

  package main

  import (
      "context"

      "github.com/apache/pulsar/pulsar-function-go/log"
      "github.com/apache/pulsar/pulsar-function-go/pf"
  )

  // contextFunc logs the tenant, namespace, and name taken from the function context.
  func contextFunc(ctx context.Context) {
      if fc, ok := pf.FromContext(ctx); ok {
          tenant := fc.GetFuncTenant()
          namespace := fc.GetFuncNamespace()
          name := fc.GetFuncName()
          log.Infof("Function tenant/namespace/name: %s/%s/%s\n", tenant, namespace, name)
      }
  }

  func main() {
      pf.Start(contextFunc)
  }

Deployment

Pulsar Functions support a variety of deployment options. You can deploy a Pulsar Function in the following ways.

Deployment mode | Description
Local run mode  | The function runs in your local environment, for example, on your laptop.
Cluster mode    | The function runs inside of your Pulsar cluster, on the same machines as your Pulsar brokers.

Local run mode

If you run a Pulsar Function in the local run mode, it runs on the machine where you enter the commands (for example, your laptop or an AWS EC2 instance). The following example uses the localrun command.

  $ bin/pulsar-admin functions localrun \
    --py myfunc.py \
    --classname myfunc.SomeFunction \
    --inputs persistent://public/default/input-1 \
    --output persistent://public/default/output-1

By default, the function connects to a Pulsar cluster running on the same machine, via a local broker service URL of pulsar://localhost:6650. If you run a function in the local run mode and want to connect it to a non-local Pulsar cluster, specify a different broker URL using the --broker-service-url flag. The following is an example.

  $ bin/pulsar-admin functions localrun \
    --broker-service-url pulsar://my-cluster-host:6650 \
    # Other function parameters

Cluster mode

When you run Pulsar Functions in the cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your local environment. You can run a function in the cluster mode using the create command. The following is an example.

  $ bin/pulsar-admin functions create \
    --py myfunc.py \
    --classname myfunc.SomeFunction \
    --inputs persistent://public/default/input-1 \
    --output persistent://public/default/output-1

This command uploads myfunc.py to Pulsar, which uses the code to start one or more instances of the function.

Run instances in parallel

When you create a Pulsar Function and run it in the cluster mode, only one instance of the function runs by default. However, you can run multiple instances in parallel. Specify the number of instances when you create the function, or update an existing single-instance function with a new parallelism factor.

This command, for example, creates and runs a function with 5 instances in parallel.

  $ bin/pulsar-admin functions create \
    --name parallel-fun \
    --tenant public \
    --namespace default \
    --py func.py \
    --classname func.ParallelFunction \
    --parallelism 5

Function instance resources

When you run Pulsar Functions in the cluster mode, you can specify the resources that are assigned to each function instance.

Resource   | Specified as        | Runtimes
CPU        | The number of cores | Docker (coming soon)
RAM        | The number of bytes | Process, Docker
Disk space | The number of bytes | Docker

The following example allocates 8 cores, 8 GB of RAM, and 10 GB of disk space to a function.

  $ bin/pulsar-admin functions create \
    --jar target/my-functions.jar \
    --classname org.example.functions.MyFunction \
    --cpu 8 \
    --ram 8589934592 \
    --disk 10737418240

For more information on resources, see the Deploying and Managing Pulsar Functions documentation.
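The --ram and --disk values in the example are byte counts in which 1 GB is taken as 1024^3 bytes. A small Python helper reproduces the arithmetic; the name gib_to_bytes is hypothetical and is not part of any Pulsar tool.

```python
GIB = 1024 ** 3  # number of bytes in one binary gigabyte


def gib_to_bytes(gib):
    """Convert a size in binary gigabytes to the byte count the CLI expects."""
    return gib * GIB


ram_bytes = gib_to_bytes(8)    # value passed to --ram in the example
disk_bytes = gib_to_bytes(10)  # value passed to --disk in the example
```

Here ram_bytes equals 8589934592 and disk_bytes equals 10737418240, matching the command above.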

Logging

Pulsar Functions created using Pulsar Functions SDK can send logs to a log topic that you specify as part of the function configuration. The function created using the following command produces all logs on the persistent://public/default/my-func-1-log topic.

  $ bin/pulsar-admin functions create \
    --name my-func-1 \
    --log-topic persistent://public/default/my-func-1-log \
    # Other configs

The following is an example of a Java function that logs at different log levels based on the function input.

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;
  import org.slf4j.Logger;

  public class LoggerFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          Logger LOG = context.getLogger();
          if (input.length() <= 100) {
              LOG.info("This string has a length of {}", input.length());
          } else {
              LOG.warn("This string is getting too long! It has {} characters", input.length());
          }
          return null;
      }
  }

The following is an example of a Go function that logs at different log levels based on the function input.

  package main

  import (
      "context"

      "github.com/apache/pulsar/pulsar-function-go/log"
      "github.com/apache/pulsar/pulsar-function-go/pf"
  )

  // loggerFunc logs at info or warn level depending on the input length.
  func loggerFunc(ctx context.Context, input []byte) {
      if len(input) <= 100 {
          log.Infof("This input has a length of: %d", len(input))
      } else {
          log.Warnf("This input is getting too long! It has %d characters", len(input))
      }
  }

  func main() {
      pf.Start(loggerFunc)
  }

To use log topic functionality in a Go function, import github.com/apache/pulsar/pulsar-function-go/log and call its logging functions directly; you do not need to obtain a logger from the context object. This approach differs from Java and Python functions, which get their logger from the context.

User configuration

You can pass arbitrary key-value pairs to Pulsar Functions via the command line (both keys and values must be strings). This set of key-value pairs is called the function's user configuration. The user configuration must be passed as a JSON string.

The following example passes user configuration to a function.

  $ bin/pulsar-admin functions create \
    --user-config '{"key-1":"value-1","key-2":"value-2"}' \
    # Other configs

The following example accesses that configuration map.

  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public class ConfigMapFunction implements Function<String, Void> {
      @Override
      public Void process(String input, Context context) {
          // getUserConfigValue returns an Optional<Object>, so cast the value to String
          String val1 = (String) context.getUserConfigValue("key-1").get();
          String val2 = (String) context.getUserConfigValue("key-2").get();
          context.getLogger().info("The user-supplied values are {} and {}", val1, val2);
          return null;
      }
  }
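Because the value of --user-config has to be valid JSON with string keys and string values, it can help to generate it rather than type it by hand. The following Python sketch does that; build_user_config is a hypothetical helper, not part of the pulsar-admin tooling.

```python
import json


def build_user_config(config):
    """Return a compact JSON string suitable for --user-config.

    Raises TypeError if any key or value is not a string.
    """
    for key, value in config.items():
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError(
                f"user config keys and values must be strings: {key!r}={value!r}"
            )
    return json.dumps(config, separators=(",", ":"))
```

For the dictionary {"key-1": "value-1", "key-2": "value-2"}, the helper returns {"key-1":"value-1","key-2":"value-2"}, which can be wrapped in single quotes on the command line.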

Trigger Pulsar Functions

You can trigger a Pulsar Function running in the cluster mode with the command line. When triggering a Pulsar Function, you can pass a specific value to the Function and get the return value without creating a client. Triggering is useful for, but not limited to, testing and debugging purposes.

Note: Triggering a function is no different from invoking a function by producing a message on one of the function input topics. The pulsar-admin functions trigger command is a convenient mechanism for sending messages to functions without using the pulsar-client tool or a language-specific client library.

The following is an example of a Pulsar Function written in Python (using the native interface) that simply reverses string inputs.

  def process(input):
      return input[::-1]

If the function is running in a Pulsar cluster, you can trigger it with the following command.

  $ bin/pulsar-admin functions trigger \
    --tenant public \
    --namespace default \
    --name reverse-func \
    --trigger-value "snoitcnuf raslup ot emoclew"

The string "welcome to pulsar functions" is then displayed in the console output.

Note: Instead of passing a string via the CLI, you can trigger Pulsar Functions with the contents of a file using the --trigger-file flag.
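Since the reverse function is a plain Python callable, the value that the trigger command should return can be checked locally first. This is only a local call with the same trigger value; it does not replace triggering the deployed function.

```python
def process(input):
    """Native-interface function that reverses its string input."""
    return input[::-1]


# Same value as passed to --trigger-value in the command above
triggered = process("snoitcnuf raslup ot emoclew")
```

The variable triggered holds the string "welcome to pulsar functions".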

Processing guarantees

Pulsar Functions provide three different messaging semantics that you can apply to any function.

Delivery semantics        | Description
At-most-once delivery     | Each message sent to the function is processed at most once; it may not be processed at all (hence "at most").
At-least-once delivery    | Each message sent to the function is processed at least once; it can be processed more than once (hence "at least").
Effectively-once delivery | Each message sent to the function has exactly one output associated with it.

This command, for example, runs a function in the cluster mode with effectively-once guarantees applied.

  $ bin/pulsar-admin functions create \
    --name my-effectively-once-function \
    --processing-guarantees EFFECTIVELY_ONCE \
    # Other function configs
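The difference between the three guarantees can be illustrated with a toy delivery loop. The sketch below is a simplified model, not how Pulsar implements these semantics: deliver and its guarantee strings are hypothetical, crashes are scripted rather than real, and effectively-once is modeled as redelivery plus deduplication of outputs by message ID.

```python
GUARANTEES = ("at-most-once", "at-least-once", "effectively-once")


def deliver(messages, guarantee, crash_before_output=(), crash_before_ack=()):
    """Simulate processing (msg_id, payload) pairs under one guarantee.

    crash_before_output: IDs whose first attempt dies before writing output.
    crash_before_ack: IDs whose first attempt dies after writing output but
    before acknowledging the message.
    Returns the outputs that reached the (simulated) output topic.
    """
    if guarantee not in GUARANTEES:
        raise ValueError(f"unknown guarantee: {guarantee!r}")
    outputs = []
    written_ids = set()
    for msg_id, payload in messages:
        first_attempt = True
        while True:
            if first_attempt and msg_id in crash_before_output:
                crashed = True  # nothing was written on this attempt
            else:
                # Only effectively-once suppresses a duplicate output
                duplicate = msg_id in written_ids
                if guarantee != "effectively-once" or not duplicate:
                    outputs.append(payload + "!")
                    written_ids.add(msg_id)
                crashed = first_attempt and msg_id in crash_before_ack
            # In this model at-most-once acknowledges on receipt, so a crash
            # never leads to redelivery; the other modes retry after a crash.
            if not crashed or guarantee == "at-most-once":
                break
            first_attempt = False
    return outputs
```

With three messages where the second crashes before output and the third crashes before the acknowledgment, the model loses the second output under at-most-once, duplicates the third under at-least-once, and yields one output per message under effectively-once.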

Metrics

Pulsar Functions that use Pulsar Functions SDK can publish metrics to Pulsar. For more information, see Metrics for Pulsar Functions.

State storage

Pulsar Functions use Apache BookKeeper as a state storage interface. Every Pulsar installation, including the local standalone installation, includes a deployment of BookKeeper bookies.