要求

需要通过运行 Pulsar 集群来部署和管理 Pulsar Functions。 There are several options for this:

运行一个非 standalone 集群,需要获取集群服务的 URL。 如何获取集群服务的 URL 取决于如何部署 Pulsar 集群。

想要部署并触发 Python 的用户自定义 function,需要在所有运行 functions workers 的设备上安装 the pulsar python client

Command-line interface

Pulsar Functions 使用 pulsar-admin functions 接口进行部署和管理,通过 createcluster mode 下部署 functions;通过 trigger 使用 triggering functions,通过 list 列出已部署的 functions。

了解更多命令,请参阅 pulsar-admin functions

Default arguments

在管理 Pulsar Functions 时,需要指定关于 functions 的各种信息,包括租户、命名空间、输入主题、输出主题等。 但是,在不输入信息时,有些参数会使用默认值。 如下表所示。

Parameter默认值
Function name可以指定类名称(除了org、library 或相似的名称)。 例如,当指定标志为 —classname org.example.MyFunction 时,function 名称为 MyFunction
Tenant来自输入 topic 的名称。 如果输入 topic 在 marketing 租户下,即 topic 名称形式为 persistent://marketing/{namespace}/{topicName},则租户为 marketing
Namespace来自输入 topic 的名称。 如果输入 topic 在 asia 命名空间中,在 marketing 租户下,即主题名称形式为 persistent://marketing/asia/{topicName},则命名空间为 asia
Output topic{input topic}-{function name}-output. 例如,一个 function 的输入 topic 名称是 incoming,function 名称为 exclamation,则输出 topic 名称为 incoming-exclamation-output
Subscription type对于 at-least-onceat-most-once 处理保证,默认使用 SHARED 模式;对于 effectively-once 保证,默认使用 FAILOVER 模式。
Processing guaranteesATLEAST_ONCE
Pulsar service URLpulsar://localhost:6650

Example of default arguments

create 命令为例。

  1. $ bin/pulsar-admin functions create \
  2. --jar my-pulsar-functions.jar \
  3. --classname org.example.MyFunction \
  4. --inputs my-function-input-topic1,my-function-input-topic2

此 function 具有默认值的参数包括:function 名称(MyFunction)、租户(public)、命名空间(default)、订阅类型(SHARED)、处理保证(ATLEAST_ONCE)、Pulsar 服务 URL (pulsar://localhost:6650)。

Local run mode

If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an AWS EC2 instance, and so on). 本地运行命令示例如下。

  1. $ bin/pulsar-admin functions localrun \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/input-1 \
  5. --output persistent://public/default/output-1

默认情况下,该 function 通过 broker 服务的 URL pulsar://localhost:6650 连接到在同一设备上运行的 Pulsar 集群。 如果想要在本地运行模式下运行一个 function ,并连接到非本地 Pulsar 集群,则可以通过 --brokerServiceUrl 标志来指定不同的 broker URL。 The following is an example.

  1. $ bin/pulsar-admin functions localrun \
  2. --broker-service-url pulsar://my-cluster-host:6650 \
  3. # Other function parameters

Cluster mode

When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your local environment. 您可以使用 create 命令在群集模式下运行函数。

  1. $ bin/pulsar-admin functions create \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/input-1 \
  5. --output persistent://public/default/output-1

在集群模式下更新 function

You can use the update command to update a Pulsar Function running in cluster mode. 以下命令用于更新在集群模式下创建的 function。

  1. $ bin/pulsar-admin functions update \
  2. --py myfunc.py \
  3. --classname myfunc.SomeFunction \
  4. --inputs persistent://public/default/new-input-topic \
  5. --output persistent://public/default/new-output-topic

Parallelism

Pulsar Functions run as processes or threads, which are called instances. 在运行 Pulsar Function 时,默认为单个实例。 使用一个本地运行命令只能运行 function 的单个实例。 想要运行多个实例则需要多次使用本地运行命令。

When you create a function, you can specify the parallelism of a function (the number of instances to run). You can set the parallelism factor using the --parallelism flag of the create command.

  1. $ bin/pulsar-admin functions create \
  2. --parallelism 3 \
  3. # Other function info

You can adjust the parallelism of an already created function using the update interface.

  1. $ bin/pulsar-admin functions update \
  2. --parallelism 5 \
  3. # Other function

通过 YAML ,使用 parallelism function 指定其配置。 配置示例如下。

  1. # function-config.yaml
  2. parallelism: 3
  3. inputs:
  4. - persistent://public/default/input-1
  5. output: persistent://public/default/output-1
  6. # other parameters

相关更新命令如下。

  1. $ bin/pulsar-admin functions update \
  2. --function-config-file function-config.yaml

Function instance resources

集群模式下运行 Pulsar Functions 时,可以指定资源分配给 function 的每个实例

ResourceSpecified asRuntimes
CPUThe number of coresKubernetes
RAMThe number of bytesProcess, Docker
Disk spaceThe number of bytesDocker

为一个 function 分配 8 个内核、8GB 内存、10GB 磁盘空间的 function 创建命令如下。

  1. $ bin/pulsar-admin functions create \
  2. --jar target/my-functions.jar \
  3. --classname org.example.functions.MyFunction \
  4. --cpu 8 \
  5. --ram 8589934592 \
  6. --disk 10737418240

Resources are per instance

应用于给定 Pulsar Function 的资源适用于此 function 的每个实例。 例如,对一个并行度为 5 的 function 使用 8GB 内存,则此 function 在应用的总内存为 40GB。 在资源计算时要考虑并行度(实例数量)。

Trigger Pulsar Functions

If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. 触发 function 意味着向其发送具有特定值的消息,并通过命令行获取其输出(如有输入)。

触发 function,即通过在某个输入主题上生成的消息来调用 function。 通过 pulsar-admin functions trigger 命令,可以在不使用 pulsar-client 工具或特定语言客户端库的条件下向 function 发送消息。

要学习如何触发 function,可以从 Python function 开始,Python function 会返回基于输入的简单字符串。

  1. # myfunc.py
  2. def process(input):
  3. return "This function has been triggered with a value of {0}".format(input)

可以在本地运行模式下运行 function。

  1. $ bin/pulsar-admin functions create \
  2. --tenant public \
  3. --namespace default \
  4. --name myfunc \
  5. --py myfunc.py \
  6. --classname myfunc \
  7. --inputs persistent://public/default/in \
  8. --output persistent://public/default/out

指定 consumer 以 pulsar-client consume 命令在输出 topic 上接收来自 myfunc function 的消息。

  1. $ bin/pulsar-client consume persistent://public/default/out \
  2. --subscription-name my-subscription
  3. --num-messages 0 # Listen indefinitely

然后可以触发 function。

  1. $ bin/pulsar-admin functions trigger \
  2. --tenant public \
  3. --namespace default \
  4. --name myfunc \
  5. --trigger-value "hello world"

监听输出 topic 的 consumer 会在日志中生成如下内容。

  1. ----- got message -----
  2. This function has been triggered with a value of hello world

主题信息非必需

trigger 命令中,只需指定 function 的基本信息(租户、命名空间、名称)。 触发 function 前无需了解其输入 topic。