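This tutorial walks you through running a standalone Pulsar cluster on your machine and then using that cluster to run your first Pulsar Functions. The first function will run in local run mode (outside your Pulsar cluster), while the second will run in cluster mode (inside your cluster).
In local run mode, a Pulsar Function communicates with your Pulsar cluster but runs outside of it.
Prerequisites
To follow this tutorial, you need Maven installed on your machine.
Run a standalone Pulsar cluster
To run Pulsar Functions, we first need a local Pulsar cluster. The simplest way to get one is to run Pulsar in standalone mode. Follow these steps to start a standalone cluster: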
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/apache-pulsar-2.6.1-bin.tar.gz
$ tar xvfz apache-pulsar-2.6.1-bin.tar.gz
$ cd apache-pulsar-2.6.1
$ bin/pulsar standalone \
--advertised-address 127.0.0.1
Running Pulsar in standalone mode automatically creates the `public` tenant and the `default` namespace. That tenant and namespace will be used throughout this tutorial.
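Every topic in this tutorial lives under that tenant and namespace. As a small illustration, a fully qualified topic name of the kind used in the commands that follow can be assembled as shown here. The helper name `topic_name` is hypothetical and not part of any Pulsar library.

```python
def topic_name(topic, tenant="public", namespace="default", domain="persistent"):
    """Build a fully qualified topic name: domain://tenant/namespace/topic."""
    return f"{domain}://{tenant}/{namespace}/{topic}"


# Hypothetical helper, for illustration only.
input_topic = topic_name("exclamation-input")
output_topic = topic_name("exclamation-output")
print(input_topic)
print(output_topic)
```

The defaults mirror what standalone mode creates, so only the short topic name has to be supplied.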
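Run a Pulsar Function in local run mode
Let's start with a simple function that takes a string as input from a Pulsar topic, adds an exclamation point to the end of the string, and then publishes the new string to another Pulsar topic. Here's the code for the function: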
import java.util.function.Function;

public class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // Append an exclamation point to the incoming string.
        return String.format("%s!", input);
    }
}
A JAR file containing this function and several others (written in Java) is included in the binary distribution you downloaded (in the `examples` folder). To run the function in local mode, i.e. on our laptop but outside our Pulsar cluster:
$ bin/pulsar-admin functions localrun \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation
Multiple input topics allowed
In the example above, a single topic was specified using the `--inputs` flag. You can also specify multiple input topics as a comma-separated list using the same flag. Here’s an example:
--inputs topic1,topic2
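If the topic list is produced by a script, the flag value is simply the topic names joined with commas. The sketch below builds the argument list for the `localrun` command shown earlier. It only assembles and prints the command, it does not invoke `pulsar-admin`, and the second topic name is made up for illustration.

```python
import shlex

# Input topics; the second name is hypothetical.
input_topics = [
    "persistent://public/default/exclamation-input",
    "persistent://public/default/exclamation-input-2",
]

# Same flags as the localrun command above; only --inputs changes.
args = [
    "bin/pulsar-admin", "functions", "localrun",
    "--jar", "examples/api-examples.jar",
    "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
    "--inputs", ",".join(input_topics),
    "--output", "persistent://public/default/exclamation-output",
    "--name", "exclamation",
]

# Print a shell-safe version of the command for copy and paste.
print(shlex.join(args))
```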
We can open up another shell and use the [`pulsar-client`](reference-cli-tools.md#pulsar-client) tool to listen for messages on the output topic:
```bash
$ bin/pulsar-client consume persistent://public/default/exclamation-output \
--subscription-name my-subscription \
--num-messages 0
```
Setting the `--num-messages` flag to 0 means that the consumer will listen on the topic indefinitely (rather than only accepting a certain number of messages).
With a listener up and running, we can open up another shell and produce a message on the input topic that we specified:
$ bin/pulsar-client produce persistent://public/default/exclamation-input \
--num-produce 1 \
--messages "Hello world"
In the output, you should see the following:
----- got message -----
Hello world!
Success! As you can see, the message has been successfully processed by the exclamation function. To shut down the function, simply hit Ctrl+C.
Here’s what happened:
- The `Hello world` message that we published to the input topic (`persistent://public/default/exclamation-input`) was passed to the exclamation function that we ran on our machine.
- The exclamation function processed the message (providing a result of `Hello world!`) and published the result to the output topic (`persistent://public/default/exclamation-output`).
- If our exclamation function hadn’t been running, Pulsar would have durably stored the message data published to the input topic in Apache BookKeeper until a consumer consumed and acknowledged the message.
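The same flow can be mimicked without a cluster. The sketch below is a plain-Python stand-in, not Pulsar code: the `topics` dictionary and the `run_function` helper are hypothetical, and they only model messages moving from the input topic, through a function, to the output topic.

```python
from collections import defaultdict, deque

INPUT = "persistent://public/default/exclamation-input"
OUTPUT = "persistent://public/default/exclamation-output"

# In-memory stand-in for topics: each topic is a queue of unconsumed messages.
topics = defaultdict(deque)


def exclamation(message):
    """Same logic as the Java ExclamationFunction: append an exclamation point."""
    return f"{message}!"


def run_function(func, input_topic, output_topic):
    """Drain the input topic, apply func, and publish each result to the output topic."""
    while topics[input_topic]:
        message = topics[input_topic].popleft()
        topics[output_topic].append(func(message))


# A message published while no function is running simply waits on the topic.
topics[INPUT].append("Hello world")

# Once the function runs, it processes the backlog.
run_function(exclamation, INPUT, OUTPUT)
print(list(topics[OUTPUT]))  # ['Hello world!']
```

The model leaves out everything that makes Pulsar durable and distributed (BookKeeper storage, subscriptions, acknowledgements); it is only meant to make the input, function, output sequence concrete.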
Run a Pulsar Function in cluster mode
Local run mode is useful for development and experimentation, but if you want to use Pulsar Functions in a real Pulsar deployment, you’ll want to run them in cluster mode. In this mode, Pulsar Functions run inside your Pulsar cluster and are managed using the same `pulsar-admin functions` interface that we’ve been using thus far.
This command, for example, would deploy the same exclamation function we ran locally above in our Pulsar cluster (rather than outside it):
$ bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation
You should see `Created successfully` in the output. Now, let’s see a list of functions running in our cluster:
$ bin/pulsar-admin functions list \
--tenant public \
--namespace default
We should see just the `exclamation` function listed there. We can also check the status of our deployed function using the `getstatus` command:
$ bin/pulsar-admin functions getstatus \
--tenant public \
--namespace default \
--name exclamation
You should see this JSON output:
{
  "functionStatusList": [
    {
      "running": true,
      "instanceId": "0"
    }
  ]
}
As we can see, (a) the instance is currently running and (b) there is one instance, with an ID of 0, running. We can get other information about the function (topics, tenant, namespace, etc.) using the `get` command instead of `getstatus`:
$ bin/pulsar-admin functions get \
--tenant public \
--namespace default \
--name exclamation
You should see this JSON output:
{
  "tenant": "public",
  "namespace": "default",
  "name": "exclamation",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "output": "persistent://public/default/exclamation-output",
  "autoAck": true,
  "inputs": [
    "persistent://public/default/exclamation-input"
  ],
  "parallelism": 1
}
As we can see, the parallelism of the function is 1, meaning that only one instance of the function is running in our cluster. Let’s update our function to a parallelism of 3 using the `update` command:
$ bin/pulsar-admin functions update \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--tenant public \
--namespace default \
--name exclamation \
--parallelism 3
You should see `Updated successfully` in the output. If you run the `get` command from above for the function, you can see that the parallelism has increased to 3, meaning that there are now three instances of the function running in our cluster:
{
  "tenant": "public",
  "namespace": "default",
  "name": "exclamation",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "output": "persistent://public/default/exclamation-output",
  "autoAck": true,
  "inputs": [
    "persistent://public/default/exclamation-input"
  ],
  "parallelism": 3
}
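When scripting against `pulsar-admin`, the JSON printed by `get` can be checked in code rather than read by eye. A minimal sketch, assuming the output has already been captured as a string (here it is pasted in directly rather than read from the command):

```python
import json

# Output of `pulsar-admin functions get`, pasted from above.
raw = """
{
  "tenant": "public",
  "namespace": "default",
  "name": "exclamation",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "output": "persistent://public/default/exclamation-output",
  "autoAck": true,
  "inputs": [
    "persistent://public/default/exclamation-input"
  ],
  "parallelism": 3
}
"""

config = json.loads(raw)

# Identify the function by tenant, namespace, and name, as the CLI flags do.
function_id = "/".join([config["tenant"], config["namespace"], config["name"]])
print(f"{function_id} runs with parallelism {config['parallelism']}")
```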
Finally, we can shut down our running function using the `delete` command:
$ bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name exclamation
If you see `Deleted successfully` in the output, then you’ve successfully run, updated, and shut down a Pulsar Function running in cluster mode. Congrats! Now, let’s go even further and run a brand new function in the next section.
Writing and running a new function
In order to write and run the Python function below, you’ll need to install a few dependencies:
$ pip install pulsar-client
In the above examples, we ran and managed a pre-written Pulsar Function and saw how it worked. To really get our hands dirty, let's write and run our own function from scratch, using the Python API. This simple function will also take a string as input, but it will reverse the string and publish the reversed string to the specified topic.
First, create a new Python file:
```bash
$ touch reverse.py
```
In that file, add the following:
def process(input):
    return input[::-1]
Here, the `process` function defines the processing logic of the Pulsar Function. It uses a Python slice with a step of -1 to reverse each incoming string. Now, we can deploy the function using `create`:
$ bin/pulsar-admin functions create \
--py reverse.py \
--classname reverse \
--inputs persistent://public/default/backwards \
--output persistent://public/default/forwards \
--tenant public \
--namespace default \
--name reverse
If you see `Created successfully`, the function is ready to accept incoming messages. Because the function is running in cluster mode, we can trigger the function using the `trigger` command. This command will send a message that we specify to the function and also give us the function’s output. Here’s an example:
$ bin/pulsar-admin functions trigger \
--name reverse \
--tenant public \
--namespace default \
--trigger-value "sdrawrof won si tub sdrawkcab saw gnirts sihT"
You should get this output:
This string was backwards but is now forwards
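Because `process` is an ordinary Python function, the same check can be run locally before deploying, without a cluster:

```python
def process(input):
    # Same body as reverse.py: a slice with step -1 reverses the string.
    return input[::-1]


result = process("sdrawrof won si tub sdrawkcab saw gnirts sihT")
print(result)  # This string was backwards but is now forwards
```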
Once again, success! We created a brand new Pulsar Function, deployed it in our Pulsar standalone cluster in cluster mode and successfully triggered the function. If you’re ready for more, check out one of these docs:
Packaging Python dependencies
Python functions that need third-party dependencies must be packaged together with those dependencies so that they can be deployed to Pulsar worker instances that have no internet access. Package the artifacts as described below.
Client Requirements
The following programs must be installed on the client machine:
- `pip`: required for fetching the Python dependencies
- `zip`: required for building zip archives
Python Dependencies
A file named `requirements.txt` listing the dependencies of the Python function is required:
sh==1.12.14
Prepare the Pulsar Function in a folder called `src`.
Run the following command to gather the Python dependencies in a folder called `deps`:
pip download \
--only-binary :all: \
--platform manylinux1_x86_64 \
--python-version 27 \
--implementation cp \
--abi cp27m -r requirements.txt -d deps
Sample output
Collecting sh==1.12.14 (from -r requirements.txt (line 1))
Using cached https://files.pythonhosted.org/packages/4a/22/17b22ef5b049f12080f5815c41bf94de3c229217609e469001a8f80c1b3d/sh-1.12.14-py2.py3-none-any.whl
Saved ./deps/sh-1.12.14-py2.py3-none-any.whl
Successfully downloaded sh
Note: `pulsar-client` does not need to be listed as a dependency because it is already installed on the worker node.
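The file name of a downloaded wheel encodes which interpreters and platforms it supports. As a rough sketch (the `parse_wheel_name` helper is hypothetical and does not handle the optional build tag), splitting the name from the sample output shows that this wheel is pure Python, which is why it satisfies the platform and ABI constraints passed to `pip download`:

```python
def parse_wheel_name(filename):
    """Split a wheel file name into its dash-separated parts (no build tag)."""
    if not filename.endswith(".whl"):
        raise ValueError(f"not a wheel file: {filename}")
    stem = filename[: -len(".whl")]
    name, version, python_tag, abi_tag, platform_tag = stem.split("-")
    return {
        "name": name,
        "version": version,
        "python": python_tag,      # py2.py3: works on Python 2 and 3
        "abi": abi_tag,            # none: no compiled extension ABI
        "platform": platform_tag,  # any: not tied to an OS or CPU
    }


tags = parse_wheel_name("sh-1.12.14-py2.py3-none-any.whl")
print(tags)
```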
Packaging
Create a destination folder with the desired package name (for example, `exclamation`), copy the `src` and `deps` folders into it, and finally compress the folder into a zip archive.
Sample sequence
mkdir -p exclamation
cp -R deps exclamation/
cp -R src exclamation/
ls -la exclamation/
total 7
drwxr-xr-x 5 a.ahmed staff 160 Nov 6 17:51 .
drwxr-xr-x 12 a.ahmed staff 384 Nov 6 17:52 ..
drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 deps
drwxr-xr-x 3 a.ahmed staff 96 Nov 6 17:51 src
zip -r exclamation.zip exclamation
The archive `exclamation.zip` can be deployed as a function to a Pulsar worker. The worker does not need internet connectivity to download packages, because they are all included in the zip file.
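The copy-and-zip sequence can also be scripted with the Python standard library. This is a sketch under assumptions: the `package_function` helper is hypothetical, and the demo runs in a temporary directory with placeholder files so that it does not touch a real project.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path


def package_function(work_dir, name):
    """Copy src/ and deps/ into <name>/ and compress it to <name>.zip."""
    work_dir = Path(work_dir)
    target = work_dir / name
    target.mkdir(exist_ok=True)
    for folder in ("src", "deps"):
        shutil.copytree(work_dir / folder, target / folder)
    # Roughly equivalent to running `zip -r <name>.zip <name>` inside work_dir.
    archive = shutil.make_archive(
        str(work_dir / name), "zip", root_dir=str(work_dir), base_dir=name
    )
    return Path(archive)


# Demo with placeholder files in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    (tmp / "src").mkdir()
    (tmp / "src" / "exclamation.py").write_text(
        "def process(input):\n    return input + '!'\n"
    )
    (tmp / "deps").mkdir()
    (tmp / "deps" / "sh-1.12.14-py2.py3-none-any.whl").write_bytes(b"")

    archive = package_function(tmp, "exclamation")
    with zipfile.ZipFile(archive) as zf:
        # Keep only file entries; directory entries end with "/".
        names = sorted(n for n in zf.namelist() if not n.endswith("/"))
    print(names)
```

Keeping `src` and `deps` under a single top-level folder matches the layout produced by the shell sequence above.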