Deploy Pulsar Functions

Requirements

To deploy and manage Pulsar Functions, you need a running Pulsar cluster.

If you run a non-standalone cluster, you need to obtain the service URL for the cluster. How you obtain the service URL depends on how you deploy your Pulsar cluster.

If you want to deploy and trigger Python user-defined functions, you need to install the Pulsar Python client on all the machines running functions workers.

Command-line interface

Pulsar Functions are deployed and managed using the pulsar-admin functions interface, which contains commands such as create for deploying functions in cluster mode, trigger for triggering functions, and list for listing deployed functions.

To learn more commands, refer to pulsar-admin functions.

Default arguments

When managing Pulsar Functions, you need to specify a variety of information about functions, including tenant, namespace, input and output topics, and so on. However, some parameters have default values if you do not specify values for them. The following table lists the default values.

Parameter | Default
--- | ---
Function name | Derived from the class name. You can specify any value for the class name (except org, library, or similar class names). For example, when you specify the flag --classname org.example.MyFunction, the function name is MyFunction.
Tenant | Derived from the names of the input topics. If the input topics are under the marketing tenant, which means the topic names have the form persistent://marketing/{namespace}/{topicName}, the tenant is marketing.
Namespace | Derived from the names of the input topics. If the input topics are under the asia namespace under the marketing tenant, which means the topic names have the form persistent://marketing/asia/{topicName}, the namespace is asia.
Output topic | {input topic}-{function name}-output. For example, if the input topic name of a function is incoming and the function name is exclamation, the name of the output topic is incoming-exclamation-output.
Subscription type | For at-least-once and at-most-once processing guarantees, the SHARED mode is applied by default; for effectively-once guarantees, the FAILOVER mode is applied.
Processing guarantees | ATLEAST_ONCE
Pulsar service URL | pulsar://localhost:6650

Example of default arguments

Take the create command as an example.

    $ bin/pulsar-admin functions create \
      --jar my-pulsar-functions.jar \
      --classname org.example.MyFunction \
      --inputs my-function-input-topic1,my-function-input-topic2

The function has default values for the function name (MyFunction), tenant (public), namespace (default), subscription type (SHARED), processing guarantees (ATLEAST_ONCE), and Pulsar service URL (pulsar://localhost:6650).
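The derivation rules in the table can be sketched in a few lines of Python. This is only an illustration of the rules as described above, not Pulsar's internal logic; the helper name derive_defaults and the FunctionDefaults type are hypothetical, and the handling of short topic names (resolving to the public tenant and default namespace) is an assumption based on the example above.

```python
from typing import NamedTuple


class FunctionDefaults(NamedTuple):
    name: str
    tenant: str
    namespace: str
    output_topic: str


def derive_defaults(classname: str, input_topic: str) -> FunctionDefaults:
    """Illustrative only: mirrors the defaults table, not Pulsar's own code."""
    # Function name: the last dot-separated segment of the class name.
    name = classname.rsplit(".", 1)[-1]

    if "://" in input_topic:
        # Fully qualified form: persistent://{tenant}/{namespace}/{topicName}
        _, _, path = input_topic.partition("://")
        tenant, namespace, topic = path.split("/", 2)
    else:
        # Assumption: a short topic name lives in public/default.
        tenant, namespace, topic = "public", "default", input_topic

    # Output topic: {input topic}-{function name}-output
    output_topic = f"{topic}-{name}-output"
    return FunctionDefaults(name, tenant, namespace, output_topic)


defaults = derive_defaults("org.example.MyFunction",
                           "persistent://marketing/asia/incoming")
print(defaults.tenant, defaults.namespace, defaults.name)
```

Explicit flags, where given, take precedence over any of these derived values.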

Local run mode

If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an AWS EC2 instance, and so on). The following is a localrun command example.

    $ bin/pulsar-admin functions localrun \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/input-1 \
      --output persistent://public/default/output-1

By default, the function connects to a Pulsar cluster running on the same machine, via a local broker service URL of pulsar://localhost:6650. If you use local run mode to run a function but want to connect it to a non-local Pulsar cluster, you can specify a different broker URL using the --broker-service-url flag. The following is an example.

    $ bin/pulsar-admin functions localrun \
      --broker-service-url pulsar://my-cluster-host:6650 \
      # Other function parameters

Cluster mode

When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your local environment. You can run a function in cluster mode using the create command.

    $ bin/pulsar-admin functions create \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/input-1 \
      --output persistent://public/default/output-1
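For scripted deployments, the same create command can be assembled as an argument list. The following is a minimal sketch, assuming pulsar-admin is available at bin/pulsar-admin relative to the working directory; the helper name build_create_command is hypothetical, and only flags shown in the command above are used.

```python
import shlex
from typing import List


def build_create_command(py_file: str, classname: str,
                         inputs: List[str], output: str) -> List[str]:
    """Assemble the argument list for `pulsar-admin functions create`."""
    return [
        "bin/pulsar-admin", "functions", "create",
        "--py", py_file,
        "--classname", classname,
        # Multiple input topics are passed as one comma-separated value.
        "--inputs", ",".join(inputs),
        "--output", output,
    ]


cmd = build_create_command(
    "myfunc.py",
    "myfunc.SomeFunction",
    ["persistent://public/default/input-1"],
    "persistent://public/default/output-1",
)
# Print the command for review instead of running it.
print(shlex.join(cmd))
```

Passing the list to subprocess.run(cmd, check=True) would execute it; keeping the arguments as a list avoids shell quoting problems with topic names.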

Update functions in cluster mode

You can use the update command to update a Pulsar Function running in cluster mode. The following command updates the function created in the cluster mode section.

    $ bin/pulsar-admin functions update \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/new-input-topic \
      --output persistent://public/default/new-output-topic

Parallelism

Pulsar Functions run as processes or threads, which are called instances. When you run a Pulsar Function, it runs as a single instance by default. One localrun command runs only a single instance of a function. If you want to run multiple instances, run the localrun command multiple times.

When you create a function, you can specify the parallelism of a function (the number of instances to run). You can set the parallelism factor using the --parallelism flag of the create command.

    $ bin/pulsar-admin functions create \
      --parallelism 3 \
      # Other function info

You can adjust the parallelism of an already created function using the update command.

    $ bin/pulsar-admin functions update \
      --parallelism 5 \
      # Other function info

If you specify a function configuration via YAML, use the parallelism parameter. The following is a config file example.

    # function-config.yaml
    parallelism: 3
    inputs:
      - persistent://public/default/input-1
    output: persistent://public/default/output-1
    # other parameters

The following is the corresponding update command.

    $ bin/pulsar-admin functions update \
      --function-config-file function-config.yaml

Function instance resources

When you run Pulsar Functions in cluster mode, you can specify the resources that are assigned to each function instance.

Resource | Specified as | Runtimes
--- | --- | ---
CPU | The number of cores | Kubernetes
RAM | The number of bytes | Process, Docker
Disk space | The number of bytes | Docker

The following function creation command allocates 8 cores, 8 GB of RAM, and 10 GB of disk space to a function.

    $ bin/pulsar-admin functions create \
      --jar target/my-functions.jar \
      --classname org.example.functions.MyFunction \
      --cpu 8 \
      --ram 8589934592 \
      --disk 10737418240
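The byte values passed to --ram and --disk above correspond to 8 GB and 10 GB counted in binary units (1 GB = 1024^3 bytes). A small sketch of the conversion, where the helper name gib_to_bytes is hypothetical:

```python
GIB = 1024 ** 3  # bytes in one gigabyte, using binary units


def gib_to_bytes(gib: int) -> int:
    """Convert a size in binary gigabytes to bytes."""
    return gib * GIB


ram_bytes = gib_to_bytes(8)    # 8589934592, the --ram value above
disk_bytes = gib_to_bytes(10)  # 10737418240, the --disk value above
print(ram_bytes, disk_bytes)
```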

Resources are per instance

The resources that you apply to a given Pulsar Function are applied to each instance of the function. For example, if you apply 8 GB of RAM to a function with a parallelism of 5, you are applying 40 GB of RAM for the function in total. Make sure that you take the parallelism (the number of instances) factor into your resource calculations.
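As a quick check of the arithmetic, the following sketch multiplies per-instance resources by the parallelism to get the total footprint of a function. The helper name total_resources is hypothetical and exists only for this illustration.

```python
def total_resources(cpu: float, ram_bytes: int, disk_bytes: int,
                    parallelism: int) -> dict:
    """Total resources requested across all instances of one function."""
    return {
        "cpu": cpu * parallelism,
        "ram_bytes": ram_bytes * parallelism,
        "disk_bytes": disk_bytes * parallelism,
    }


# 8 cores, 8 GB of RAM, and 10 GB of disk per instance, with 5 instances.
total = total_resources(cpu=8, ram_bytes=8 * 1024 ** 3,
                        disk_bytes=10 * 1024 ** 3, parallelism=5)
print(total["ram_bytes"] // 1024 ** 3)  # 40 (GB of RAM in total)
```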

Use Package management service

Package management enables version management and simplifies the upgrade and rollback processes for Functions, Sinks, and Sources. When you use the same function, sink and source in different namespaces, you can upload them to a common package management system.

To use Package management service, ensure that the package management service has been enabled in your cluster by setting the following properties in broker.conf.

Note: Package management service is not enabled by default.

    enablePackagesManagement=true
    packagesManagementStorageProvider=org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider
    packagesReplicas=1
    packagesManagementLedgerRootPath=/ledgers

With Package management service enabled, you can upload your function package to the service and get the package URL.

When you have a ready-to-use package URL, you can create the function from it by setting --jar, --py, or --go to the package URL in the pulsar-admin functions create command.

Trigger Pulsar Functions

If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. Triggering a function means that you send a message with a specific value to the function and get the function output (if any) via the command line.

Triggering a function invokes it by producing a message on one of its input topics. With the pulsar-admin functions trigger command, you can send messages to functions without using the pulsar-client tool or a language-specific client library.

To learn how to trigger a function, you can start with a Python function that returns a simple string based on the input.

    # myfunc.py
    def process(input):
        return "This function has been triggered with a value of {0}".format(input)
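Because this function is plain Python with no Pulsar imports, it can be checked locally before deployment. The following sketch is the same function with a main guard added for that purpose; the guard is an addition for illustration and does not run when the file is loaded as a module.

```python
# myfunc.py
def process(input):
    return "This function has been triggered with a value of {0}".format(input)


if __name__ == "__main__":
    # Call the function directly with the value that a trigger would send.
    print(process("hello world"))
    # prints: This function has been triggered with a value of hello world
```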

You can run the function in cluster mode.

    $ bin/pulsar-admin functions create \
      --tenant public \
      --namespace default \
      --name myfunc \
      --py myfunc.py \
      --classname myfunc \
      --inputs persistent://public/default/in \
      --output persistent://public/default/out

Then assign a consumer to listen on the output topic for messages from the myfunc function with the pulsar-client consume command.

    $ bin/pulsar-client consume persistent://public/default/out \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely

And then you can trigger the function.

    $ bin/pulsar-admin functions trigger \
      --tenant public \
      --namespace default \
      --name myfunc \
      --trigger-value "hello world"

The consumer listening on the output topic prints output similar to the following in the log.

    ----- got message -----
    This function has been triggered with a value of hello world

Topic info is not required

In the trigger command, you only need to specify basic information about the function (tenant, namespace, and name). To trigger the function, you do not need to know the function input topics.