Configure Functions runtime
可以使用下列方式去运行函数。
- Thread: Invoke functions threads in functions worker.
- Process: Invoke functions in processes forked by functions worker.
- Kubernetes: Submit functions as Kubernetes StatefulSets by functions worker.
Note
Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
线程和进程两种模式有如下不同:
- 线程模式:当函数运行在线程模式时,函数和 Puslsar functions 的 work 是运行在同一个Java 虚拟机(JVM) 里面的。
- 进程模式: 当函数运行在进程模式时,它们是运行在同一台机器上的不同 Java 虚拟机里面。
配置线程模式运行
It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
threadGroupName: "Your Function Container Group"
Thread runtime is only supported in Java function.
配置进程模式运行
When you enable Process runtime, you do not need to configure anything.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
# 存储函数日志的目录
logDirectory:
# 如果想把 java 实例的 jar 包放在其他位置执行,可以在这配置 jar 包存放位置
javaInstanceJarLocation:
# 仅将 python 实例 jar 放在不同位置时才更改 python 实例位置
pythonInstanceLocation:
# 更改额外的依赖项位置:
extraFunctionDependenciesDir:
Process runtime is supported in Java, Python, and Go functions.
配置 Kubernetes 运行
当函数 worker 生成 Kubernetes manifests,应用这份 manifests 时,Kubernetes 就会开始工作。 如果你将函数运行在 Kubernetes 里面,你能够使用serviceAccount
去关联正在运行该函数的 Pod。 然后,可以将其配置为与 Kubernetes 集群进行通信。
Mainifests 由函数 worker 生成,包含一个StatefulSet
,一个Service
(用于 pods 之间通信),和一个Secret
(在需要的情况下,用于身份认证)。 默认情况下,StatefulSet
只有一个 Pod,函数的 “并行度” 决定它的数量。 Pode 启动时, Pod 会下载函数的运行内容 (通过函数 worker 的REST API 下载)。 Pod 的容器镜像是可以配置的,但是必须先有函数运行时。
Kubernetes 运行时是支持 secrets 的,所以你能够创建一个 Kubernetes secret,并将其作为环境变量在 Pod 内可见。 Kubernetes 运行时是可扩展的,你可以实现类并定制,比如:如何生成 Kubernetes manifests、如果 传递认证数据给pod,如何整合 secrets等。
提示
For the rules of translating Pulsar object names into Kubernetes resource labels, see here.
基础配置
It is easy to configure Kubernetes runtime. You can just uncomment the settings of kubernetesContainerFactory
in the functions_worker.yaml
file. The following is an example.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
# 访问 kubernetes 集群的 uri,为空的话将使用函数 worker 中的 kubernetes 相关配置。
k8Uri:
# 运行函数实例的 kubernetes 命名空间。 不设置的话,默认值是 `default`
jobNamespace:
# 运行函数实例的 Kubernetes pod 名。 不设置的话,默认值是
# `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>`
jobName:
# 运行函数实例的 docker 镜像。 默认情况下是`apachepulsar/pulsar`
pulsarDockerImageName:
# docker 镜像根据用户提供的不同配置运行函数实例。
# 默认是 `apachepulsar/pulsar`
# 例如:
# functionDockerImages:
# JAVA: JAVA_IMAGE_NAME
# PYTHON: PYTHON_IMAGE_NAME
# GO: GO_IMAGE_NAME
functionDockerImages:
# 运行函数实例的镜像拉取策略。 默认是 `IfNotPresent`
imagePullPolicy: IfNotPresent
# 在 `pulsarDockerImageName` 中的 pulsar 主目录的根目录 by default it is `/pulsar`.
# 如果使用自主构建的镜像,需要定义 `pulsarDockerImageName`
pulsarRootDir:
# config admin CLI 允许用户自定义 admin cli 工具的配置,例如:
# `/bin/pulsar-admin and /bin/pulsarctl`. 默认是 `/bin/pulsar-admin`。 如果想使用 `pulsarctl`
# 需要相应的进行如下配置:
configAdminCLI:
# 如果 `k8Uri` 设置为 null 时,才会生效。 if your function worker is running as a k8 pod,
# setting this to true is let function worker to submit functions to the same k8s cluster as function worker
# is running. setting this to false if your function worker is not running as a k8 pod.
submittingInsidePod: false
# 设置提供 pulsar 服务的 url,让 pulsar 函数能够连接到 pulsar
# 如果没设置,将使用在 worker 里配置的提供 pulsar 服务的 url
pulsarServiceUrl:
# 设置提供 pulsar admin 服务的 url,让 pulsar 函数能够连接到 pulsar
# 如果没设置,将使用在 worker 里配置的提供 pulsar admin 服务的 url
pulsarAdminUrl:
# 用于标记是否安装用户代码依赖。 (应用于 python 包)
installUserCodeDependencies:
# pulsar 函数用来下载 python 依赖包的存储库
pythonDependencyRepository:
# pulsar 函数用来下载 python 额外的依赖包的存储库
pythonExtraDependencyRepository:
# 函数 worker 用来为 pod 选择节点的自定义标签
customLabels:
# 预期的指标收集间隔,以秒为单位
expectedMetricsCollectionInterval: 30
# Kubernetes Runtime 将定期检查此 configMap(如果已定义),并且如果 kubernetes 中特定内容有任何更改,将应用这些更改
changeConfigMap:
# 用于存储改动的 configMap 的命名空间
changeConfigMapNamespace:
# 为 function/source/sink 设置 cpu 使用率和 cpu 限制
# cpu 请求的公式是 cpuRequest = userRequestCpu / cpuOverCommitRatio
cpuOverCommitRatio: 1.0
# 为 function/source/sink 设置的内存请求率和内存大小限制
# 内存请求的公式是 memoryRequest = userRequestMemory / memoryOverCommitRatio
memoryOverCommitRatio: 1.0
# 函数 pod 内部的端口,worker 使用它与 pod 进行通信
grpcPort: 9093
# 函数 pod 内的端口,在其上暴露 prometheus 指标
metricsPort: 9094
# 函数 pod 中的 nar 包将被提取的目录
narExtractionDirectory:
# 存储函数实例文件的 classpath
functionInstanceClassPath:
# 用于删除额外的函数依赖项的目录
# 如果不是绝对路径,那就是相对于 `pulsarRootDir` 的路径
extraFunctionDependenciesDir:
# 在每个实例的函数请求的内存之上,添加了额外的内存补充
percentMemoryPadding: 10
如果函数 worker 运行在 Kubernetes 环境的 broker 中,你可以使用默认配置。
Kubernetes 运行独立的函数 worker
如果你想在 Kubernetes 上运行独立的函数 worker (即不嵌入在broker上运行),你必须在函数 worker 上配置pulsarSerivceUrl
和pulsarAdminUrl
,来指定 broker 的地址。
例如,Pulsar brokers 和 函数 worker 都运行在 K8S的 pulsar
命名空间下。 Broker 有一个叫做 brokers
的服务,函数 worker 有一个叫做 func-worker
的服务。 他们的设置如下:
pulsarServiceUrl: pulsar://broker.pulsar:6650 // 如果使用了 TLS 就用 :pulsar+ssl://broker.pulsar:6651
pulsarAdminUrl: http://func-worker.pulsar:8080 // 如果使用了TLS 就用: https://func-worker:8443
Kubernetes 集群中运行 RBAC
如果要在集群中运行 RBAC,需要确保运行函数worker(如果函数运行在 broker 上,那就是 broker )的 service account 有以下 Kubernetes API 的访问权限。
- services
- configmaps
- pods
- apps.statefulsets
如下是完整的配置:
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
name: functions-worker
rules:
- apiGroups: [""]
resources:
- services
- configmaps
- pods
verbs:
- '*'
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- '*'
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: functions-worker
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: functions-worker
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: functions-worker
subjectsKubernetesSec:
- kind: ServiceAccount
name: functions-worker
如果 service account 的配置不正确,就会有如下的错误信息提示。
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]
集成 Kubernetes secrets
为了确保信息安全 ,Pulsar 函数可以引用Kubernetes secrets。 如果要启用此功能,请将参数secretsProviderConfiguratorClassName
设置为org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator
。
你可以在部署函数的 kubernetes 命名空间中创建一个 secret。 例如,你的函数部署在 Kubernetes 的命名空间 pulsar-func
中,你有一个名为database-creds
的secret,它有一个字段password
,你希望将其作为一个环境变量传递到 pod 中,并且想用名称DATABASE_PASSWORD
获取到它。 那么下面的函数配置允许你将这个 secret 以环境变量的形式传递到 pod 里面。
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"
secrets:
# secret 中的`password`和`database-creds`将被挂载到名为 `DATABASE_PASSWORD `的环境变量中。
DATABASE_PASSWORD:
path: "database-creds"
key: "password"
启用 Token 认证
当你开启 Pulsar 集群的身份认证时,你需要为运行函数的 pod 提供一种机制,以通过 broker的身份认证。
org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider
接口可以为任何认证机制提供支持。 配置文件function-worker.yml
的配置项functionAuthProviderClassName
允许你自定义认证实现机制。
Pulsar 自带了令牌身份认证的实现,并通过相同的方式实现了分发证书授权。 配置如下:
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
使用 Token 身份认证时,函数 worker 会获取到 token,并将其用于部署或者更新函数。 Token 通过 secret 的方式挂载到 pod 里面。
自定义的认证方式或者TLS, 你必须去实现上面的接口或者使用替代机制实现身份认证。 如果你在集群中使用 Token 身份认证和 TLS 加密实现安全通信,Pulsar 会将证书授权(CA)传递给客户端,所以客户端就可以获取到集群认证所需的内容,同时信任集群所签发的证书。
Note
If you use tokens that expire when deploying functions, these tokens will expire.
启用身份认证
当您在启用了身份认证的集群中独立运行函数 worker 时, 你必须配置函数 worker 和 broker 交互时传入身份认证信息。 所以你必须配置 broker 所需的身份认证和鉴权选项。
例如,如果是使用 Token 认证,你必须在function-worker.yml
文件内配置如下属性。
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
configurationStoreServers: zookeeper-cluster:2181 # auth requires a connection to zookeeper
authenticationProviders:
- "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
authorizationEnabled: true
authenticationEnabled: true
superUserRoles:
- superuser
- proxy
properties:
tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token, key file must be DER-encoded
tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens, key file must be DER-encoded
Note
You must configure both the Function Worker authorization or authentication for the server to authenticate requests and configure the client to be authenticated to communicate with the broker.
定制 Kubernetes 运行时
Kubernetes 集成允许你实现类并自定义如何生成manifests. 你可以为配置文件functions-worker.yml
的配置项runtimeCustomizerClassName
指定一个全路径的类名。 这个类必须实现org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer
接口。
函数(数据来源/ 数据去向) API 会提供了一个标记,customRuntimeOptions
,传递到这个接口。
为了初始化 KubernetesManifestCustomizer
,可以将 runtimeCustomizerConfig
在 functions-worker.yml
文件中进行提供。 runtimeCustomizerConfig
作为参数传递给 public void initialize(Map<String, Object> config)
函数。 runtimeCustomizerConfig
和 customRuntimeOptions
的配置值是不同的, runtimeCustomizerConfig
在所有的函数中配置值都一样。 如果同时配置了 runtimeCustomizerConfig
和customRuntimeOptions
,需要决定如何管理这两个配置,在对KubernetesManifestCustomizer
的实现中。
Pulsar 自带了一些实现。 若要使用基本的功能,可以将runtimeCustomizerClassName
设置为org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer
。 使用 runtimeCustomizerConfig
初始化的内置实现使得能够将 JSON 类型文件作为 customRuntimeOptions
传递,其中包含要扩充的某些属性,这也决定了 manifests 的生成方式。 如果提供了 runtimeCustomizerConfig
和 customRuntimeOptions
,在两个配置发生冲突时,BasicKubernetesManifestCustomizer
会使用customRuntimeOptions
的参数来覆盖 runtimeCustomizerConfig 的配置。
如下是 customRuntimeOptions
的示例。
{
"jobName": "jobname", // 运行函数实例的 k8s pod 名字
"jobNamespace": "namespace", // 运行函数实例的 k8s 命名空间
"extractLabels": { // 附加给statefulSet,service 和 pods 的额外的标签
"extraLabel": "value"
},
"extraAnnotations": { // 附加给statefulSet,service 和 pods 的额外的注释
"extraAnnotation": "value"
},
"nodeSelectorLabels": { // 添加在 pod 的配置里的节点选择器标签
"customLabel": "value"
},
"tolerations": [ // 添加到 pod 的特定情况下某些变量能容忍的值
{
"key": "custom-key",
"value": "value",
"effect": "NoSchedule"
}
],
"resourceRequirements": { // cpu 和内存的值参照此处所述定义: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}
}
}
使用跨域复制运行集群
如果你想使用跨域复制运行多个集群,每个集群必须使用不同的函数命名空间。 否则,函数会共享同一个命名空间,可能跨集群调度。
例如,假设你有两个集群:east-1
和west-1
, 你可能对这两个函数做如下的运行配置:
pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1
pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1
这确保了两个不同的函数 Worker 使用两个不同的主题进行内部调度。
配置函数worker 独立运行
当需要配置独立运行函数 worker 时,你必须在 broker 里面配置如下参数,尤其是你需要使用 TLS 时。 然后函数 Worker 才可以和 broker 进行通信。
你必须需配置一下必选参数。
workerPort: 8080
workerPortTls: 8443 # 使用TLS的时候需要配置
tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # 使用TLS的时候需要配置
tlsKeyFilePath: /etc/pulsar/tls/tls.key # 使用TLS的时候需要配置
tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # 使用TLS的时候需要配置
pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ 使用TLS的时候需要配置
pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ 使用TLS的时候需要配置
useTls: true # 使用TLS的时候需要配置