Package Python Functions
Python functions support the following three packaging formats:
One Python file
To package a Python function into one Python file, complete the following steps.
Write a Python function.
from pulsar import Function # import the Function module from Pulsar
# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
return input + '!'
In this example, when you write a Python function, you need to inherit the Function class and implement the
process()
method.process()
mainly has two parameters:input
represents your input.context
represents an interface exposed by the Pulsar Function. You can get the attributes in the Python function based on the provided context object.
Install a Python client. The implementation of a Python function depends on the Python client.
pip install pulsar-client==2.10.0
And install protobuf tools to generate the proto files:
pip install 'protobuf==3.20.*'
Copy the Python function file to the Pulsar image.
docker exec -it [CONTAINER ID] /bin/bash
docker cp <path of Python function file> CONTAINER ID:/pulsar
Run the Python function using the following command.
./bin/pulsar-admin functions localrun \
--classname <Python Function file name>.<Python Function class name> \
--py <absolute path of Python Function file> \
--inputs persistent://public/default/my-topic-1 \
--output persistent://public/default/test-1 \
--tenant public \
--namespace default \
--name PythonFunction
The following log indicates that the Python function starts successfully.
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
ZIP file
To package a Python function into a ZIP file, complete the following steps.
Prepare the ZIP file.
Assuming the zip file is named as `func.zip`, unzip the `func.zip` folder:
"func/src"
"func/requirements.txt"
"func/deps"
Take the exclamation.zip file as an example. The internal structure of the example is as follows.
.
├── deps
│ └── sh-1.12.14-py2.py3-none-any.whl
└── src
└── exclamation.py
Copy the ZIP file to the Pulsar image.
docker exec -it [CONTAINER ID] /bin/bash
docker cp <path of ZIP file> CONTAINER ID:/pulsar
Run the Python function using the following command.
./bin/pulsar-admin functions localrun \
--classname exclamation \
--py <absolute path of ZIP file> \
--inputs persistent://public/default/in-topic \
--output persistent://public/default/out-topic \
--tenant public \
--namespace default \
--name PythonFunction
The following log indicates that the Python function starts successfully.
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
PIP
note
The PIP method is only supported in Kubernetes runtime.
To package a Python function with PIP, complete the following steps.
Configure the
functions_worker.yml
file.#### Kubernetes Runtime ####
installUserCodeDependencies: true
Write your Python Function.
from pulsar import Function
import js2xml
# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
# add your logic
return input + '!'
You can introduce additional dependencies. When Python functions detect that the file currently used is
whl
and theinstallUserCodeDependencies
parameter is specified, the system uses thepip install
command to install the dependencies required in Python functions.Generate the
whl
file.cd $PULSAR_HOME/pulsar-functions/scripts/python
chmod +x generate.sh
./generate.sh <path of your Python Function> <path of the whl output dir> <the version of whl>
# e.g: ./generate.sh /path/to/python /path/to/python/output 1.0.0
The output is written in
/path/to/python/output
:-rw-r--r-- 1 root staff 1.8K 8 27 14:29 pulsarfunction-1.0.0-py2-none-any.whl
-rw-r--r-- 1 root staff 1.4K 8 27 14:29 pulsarfunction-1.0.0.tar.gz
-rw-r--r-- 1 root staff 0B 8 27 14:29 pulsarfunction.whl