Deploy and manage functions worker
在使用 Pulsar Functions 前,需要学习如何设置 Pulsar Functions worker,以及如何配置 Functions 运行时。
Pulsar functions-worker
is a logic component to run Pulsar Functions in cluster mode. 有两种不同的选择,你可以选择一种你需要的方式:
- 与 broker 一起运行
- 在不同的 broker 中单独运行
Note
The--- Service Urls---
lines in the following diagrams represent Pulsar service URLs that Pulsar client and admin use to connect to a Pulsar cluster.
与 brokers 一起运行 Functions-worker
The following diagram illustrates the deployment of functions-workers running along with brokers.
To enable functions-worker running as part of a broker, you need to set functionsWorkerEnabled
to true
in the broker.conf
file.
functionsWorkerEnabled=true
如果functionsWorkerEnabled
设置为 true
,Functions-worker 会作为 broker 的一部分运行。 You need to configure the conf/functions_worker.yml
file to customize your functions_worker.
在与 broker 一起运行 Functions-worker 时,需要先配置 Functions-worker,再与 broker 一起启动。
配置 Functions-Worker 以与 brokers 一起运行
在这个模式下,让 functions-worker
在 broker 上运行,其大多数配置已经继承了 broker 的配置(如配置存储设置,权限配置等等)。
Pay attention to the following required settings when configuring functions-worker in this mode.
numFunctionPackageReplicas
:存储 function 包的副本数。 默认值是1
,对独立部署很有用。 对于生产环境部署,为确保其高可用性,需设置为大于2
。initializedDlogMetadata
: Whether to initialize distributed log metadata in runtime. 如果设置为true
,需要确保通过bin/pulsar initialize-cluster-metadata
命令对其进行了初始化。
If authentication is enabled on the BookKeeper cluster, configure the following BookKeeper authentication settings.
bookkeeperClientAuthenticationPlugin
:BookKeeper 客户端身份验证插件的名称。bookkeeperClientAuthenticationParametersName
:BookKeeper 客户端身份验证插件的参数名称。bookkeeperClientAuthenticationParameters
:BookKeeper 客户端身份验证插件的参数。
配置和 broker 共同运行的 Stateful-Functions
如果想使用 Stateful-Functions 相关的函数(例如,putState()
和 queryState()
相关的接口),参考以下步骤。
Enable the streamStorage service in the BookKeeper.
现在服务使用的是 NAR 包,所以需要在
bookkeeper.conf
中进行配置。extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
在启动 bookie 后,使用以下方法检查 streamStorage 服务是否正常启动。
输入:
telnet localhost 4181
输出:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
在
functions_worker.yml
中打开此功能。
```text
stateStorageServiceUrl: bk://<bk-service-url>:4181
```
`bk-service-url` is the service URL pointing to the BookKeeper table service.
同时运行 Functions-worker 和 broker
Once you have configured the functions_worker.yml
file, you can start or restart your broker.
And then you can use the following command to verify if functions-worker
is running well.
curl <broker-ip>:8080/admin/v2/worker/cluster
After entering the command above, a list of active function workers in the cluster is returned. 会输出类似以下的内容:
[{"workerId":"<worker-id>","workerHostname":"<worker-hostname>","port":8080}]
单独运行 Functions-worker
This section illustrates how to run functions-worker
as a separate process in separate machines.
Note
In this mode, make surefunctionsWorkerEnabled
is set tofalse
, so you won’t startfunctions-worker
with brokers by mistake. 当想通过访问functions-worker
来管理 functions 时,pulsar-admin
客户端工具或者其他客户端应该用workerHostname
和workerPort
这些配置在 Worker parameters 中的参数来生成一个--admin-url
。
配置 Functions-Worker 以单独运行
To run function-worker separately, you have to configure the following parameters.
Worker 参数
workerId
:类型为字符串。 它是整个集群是唯一的,用于标识每台 worker 机器workerHostname
:worker 计算机的主机名。workerPort
:worker 服务器的监听端口。 在未进行自定义时,请使用其默认值。workerPortTls
:worker 服务器监听的 TLS 端口。 在未进行自定义时,请使用其默认值。
Function 包参数
numFunctionPackageReplicas
:存储 function 包的副本数。 默认值为1
。
Function 元数据参数
pulsarServiceUrl
:broker 集群的 Pulsar 服务 URL。pulsarWebServiceUrl
: The Pulsar web service URL for your broker cluster.pulsarFunctionsCluster
:设置 Pulsar 集群名称 (与clusterName
在 broker 配置中的设置相同)。
If authentication is enabled for your broker cluster, you should configure the authentication plugin and parameters for the functions worker to communicate with the brokers.
clientAuthenticationPlugin
clientAuthenticationParameters
安全设置
If you want to enable security on functions workers, you should:
Enable TLS transport encryption
To enable TLS transport encryption, configure the following settings.
useTLS: true
pulsarServiceUrl: pulsar+ssl://localhost:6651/
pulsarWebServiceUrl: https://localhost:8443
tlsEnabled: true
tlsCertificateFilePath: /path/to/functions-worker.cert.pem
tlsKeyFilePath: /path/to/functions-worker.key-pk8.pem
tlsTrustCertsFilePath: /path/to/ca.cert.pem
// Pulsar 客户端用于和 Pulsar broker 可靠通信所需的证书的存放路径。
brokerClientTrustCertsFilePath: /path/to/ca.cert.pem
For details on TLS encryption, refer to Transport Encryption using TLS.
启用身份验证提供程序
要在函数 worker 启用身份验证,你需要配置以下信息。
Note
Substitute the providers list with the providers you want to enable.
authenticationEnabled: true
authenticationProviders: [ provider1, provider2 ]
For TLS Authentication provider, follow the example below to add the necessary settings. 查看 TLS 认证 可以了解到详细的信息。
brokerClientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationTls
brokerClientAuthenticationParameters: tlsCertFile:/path/to/admin.cert.pem,tlsKeyFile:/path/to/admin.key-pk8.pem
authenticationEnabled: true
authenticationProviders: ['org.apache.pulsar.broker.authentication.AuthenticationProviderTls']
For SASL Authentication provider, add saslJaasClientAllowedIds
and saslJaasBrokerSectionName
under properties
if needed.
properties:
saslJaasClientAllowedIds: .*pulsar.*
saslJaasBrokerSectionName: Broker
For Token Authentication provider, add necessary settings for properties
if needed. 更多详细信息,请参阅 Token Authentication。 注意:秘钥文件必须是 DER 编码
properties:
tokenSecretKey: file://my/secret.key
# If using public/private
# tokenPublicKey: file:///path/to/public.key
启用授权提供程序
如果需要启用函数 Worker 授权机制,你必须配置authorizationEnabled
,authorizationProvider
和configurationStoreServers
。 The authentication provider connects to configurationStoreServers
to receive namespace policies.
authorizationEnabled: true
authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
configurationStoreServers: <configuration-store-servers>
You should also configure a list of superuser roles. The superuser roles are able to access any admin API. The following is a configuration example.
superUserRoles:
- role1
- role2
- role3
启用端到端加密
你可以使用应用程序配置的公钥私钥对进行加密。 只有拥有有效密钥的消费者可以解密加密过的消息。
要启用 Functions Worker 端到端加密,可以在命令行使用 --producer-config
进行配置,更多信息可以参考 here。
将 CryptoConfig
的相关配置信息包含到 ProducerConfig
中。 关于 CryptoConfig
的具体可配置字段信息如下:
public class CryptoConfig {
private String cryptoKeyReaderClassName;
private Map cryptoKeyReaderConfig;
private String[] encryptionKeys;
private ProducerCryptoFailureAction producerCryptoFailureAction;
private ConsumerCryptoFailureAction consumerCryptoFailureAction;
}
producerCryptoFailureAction
: 在生产者加密数据失败时,执行FAIL
,SEND
其中之一的操作。consumerCryptoFailureAction
: 在消费者解密数据失败时,执行FAIL
,DISCARD
,CONSME
其中之一的操作。
BookKeeper 身份验证
如果要开启 BooKeeper 集群的身份认证,你必须配置以下 Bookeeper 认证选项:
bookkeeperClientAuthenticationPlugin
:BookKeeper 客户端身份验证插件的名称。bookkeeperClientAuthenticationParametersName
:BookKeeper 客户端身份验证插件的参数名称。bookkeeperClientAuthenticationParameters
:BookKeeper 客户端身份验证插件的参数。
启动 Functions-worker
一旦配置完 functions_worker.yml
文件,可以后台启动 functions-worker
,使用 nohup 命令,结合 pulsar-daemon 客户端工具进行:
bin/pulsar-daemon start functions-worker
也可以前台启动 functions-worker
,结合 pulsar
客户端工具进行:
bin/pulsar functions-worker
为 Functions-workers 配置 Proxies
When you are running functions-worker
in a separate cluster, the admin rest endpoints are split into two clusters. functions
, function-worker
, source
and sink
endpoints are now served by the functions-worker
cluster, while all the other remaining endpoints are served by the broker cluster. Hence you need to configure your pulsar-admin
to use the right service URL accordingly.
In order to address this inconvenience, you can start a proxy cluster for routing the admin rest requests accordingly. Hence you will have one central entry point for your admin service.
If you already have a proxy cluster, continue reading. If you haven’t setup a proxy cluster before, you can follow the instructions to start proxies.
To enable routing functions related admin requests to functions-worker
in a proxy, you can edit the proxy.conf
file to modify the following settings:
functionWorkerWebServiceURL=<pulsar-functions-worker-web-service-url>
functionWorkerWebServiceURLTLS=<pulsar-functions-worker-web-service-url>
对比与 Broker 一起运行和单独运行
As described above, you can run Function-worker with brokers, or run it separately. And it is more convenient to run functions-workers along with brokers. However, running functions-workers in a separate cluster provides better resource isolation for running functions in Process
or Thread
mode.
Use which mode for your cases, refer to the following guidelines to determine.
Use the Run-with-Broker
mode in the following cases:
- a)在
Process
或Thread
模式下运行 functions,则不需要进行资源隔离; - b)在 Kubernetes 上配置 functions-worker 以运行 fucntions(Kubernetes 解决了资源隔离问题)。
Use the Run-separately
mode in the following cases:
- a) 没有 Kubernetes 集群;
- b) 不想单独运行 functions 或 brokers。
故障排除
Error message: Namespace missing local cluster name in clusters list
Failed to get partitioned topic metadata: org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: Namespace missing local cluster name in clusters list: local_cluster=xyz ns=public/functions clusters=[standalone]
The error message prompts when either of the cases occurs:
- a) broker 是以
functionsWorkerEnabled=true
开始的,但是未在conf/functions_worker.yaml
文件中将pulsarFunctionsCluster
设置为正确的集群; - b) 当一个集群中的 brokers 运行良好,而另一个集群中的 brokers 运行有问题时,用
functionsWorkerEnabled=true
建立一个 Pulsar 集群的跨机房副本。
Workaround
If any of these cases happens, follow the instructions below to fix the problem:
Disable Functions Worker by setting
functionsWorkerEnabled=false
, and restart brokers.获取
public/functions
命名空间的当前集群列表。
bin/pulsar-admin namespaces get-clusters public/functions
- 检查集群是否在集群列表中。 如果集群不在列表中,则将其添加到列表中,并更新列表。
bin/pulsar-admin namespaces set-clusters --clusters <existing-clusters>,<new-cluster> public/functions
After setting the cluster successfully, enable functions worker by setting
functionsWorkerEnabled=true
.Set the correct cluster name in
pulsarFunctionsCluster
in theconf/functions_worker.yml
file, and restart brokers.