Configure Functions runtime

可以使用下列方式去运行函数。

  • Thread: Invoke functions threads in functions worker.
  • Process: Invoke functions in processes forked by functions worker.
  • Kubernetes: Submit functions as Kubernetes StatefulSets by functions worker.

Note
Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.

线程和进程两种模式有如下不同:

  • 线程模式:当函数运行在线程模式时,函数和 Puslsar functions 的 work 是运行在同一个Java 虚拟机(JVM) 里面的。
  • 进程模式: 当函数运行在进程模式时,它们是运行在同一台机器上的不同 Java 虚拟机里面。

配置线程模式运行

It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:

  1. functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
  2. functionRuntimeFactoryConfigs:
  3. threadGroupName: "Your Function Container Group"

Thread runtime is only supported in Java function.

配置进程模式运行

When you enable Process runtime, you do not need to configure anything.

  1. functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
  2. functionRuntimeFactoryConfigs:
  3. # 存储函数日志的目录
  4. logDirectory:
  5. # 如果想把 java 实例的 jar 包放在其他位置执行,可以在这配置 jar 包存放位置
  6. javaInstanceJarLocation:
  7. # 仅将 python 实例 jar 放在不同位置时才更改 python 实例位置
  8. pythonInstanceLocation:
  9. # 更改额外的依赖项位置:
  10. extraFunctionDependenciesDir:

Process runtime is supported in Java, Python, and Go functions.

配置 Kubernetes 运行

当函数 worker 生成 Kubernetes manifests,应用这份 manifests 时,Kubernetes 就会开始工作。 如果你将函数运行在 Kubernetes 里面,你能够使用serviceAccount 去关联正在运行该函数的 Pod。 然后,可以将其配置为与 Kubernetes 集群进行通信。

Mainifests 由函数 worker 生成,包含一个StatefulSet,一个Service(用于 pods 之间通信),和一个Secret(在需要的情况下,用于身份认证)。 默认情况下,StatefulSet只有一个 Pod,函数的 “并行度” 决定它的数量。 Pode 启动时, Pod 会下载函数的运行内容 (通过函数 worker 的REST API 下载)。 Pod 的容器镜像是可以配置的,但是必须先有函数运行时。

Kubernetes 运行时是支持 secrets 的,所以你能够创建一个 Kubernetes secret,并将其作为环境变量在 Pod 内可见。 Kubernetes 运行时是可扩展的,你可以实现类并定制,比如:如何生成 Kubernetes manifests、如果 传递认证数据给pod,如何整合 secrets等。

提示

For the rules of translating Pulsar object names into Kubernetes resource labels, see here.

基础配置

It is easy to configure Kubernetes runtime. You can just uncomment the settings of kubernetesContainerFactory in the functions_worker.yaml file. The following is an example.

  1. functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
  2. functionRuntimeFactoryConfigs:
  3. # 访问 kubernetes 集群的 uri,为空的话将使用函数 worker 中的 kubernetes 相关配置。
  4. k8Uri:
  5. # 运行函数实例的 kubernetes 命名空间。 不设置的话,默认值是 `default`
  6. jobNamespace:
  7. # 运行函数实例的 Kubernetes pod 名。 不设置的话,默认值是
  8. # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>`
  9. jobName:
  10. # 运行函数实例的 docker 镜像。 默认情况下是`apachepulsar/pulsar`
  11. pulsarDockerImageName:
  12. # docker 镜像根据用户提供的不同配置运行函数实例。
  13. # 默认是 `apachepulsar/pulsar`
  14. # 例如:
  15. # functionDockerImages:
  16. # JAVA: JAVA_IMAGE_NAME
  17. # PYTHON: PYTHON_IMAGE_NAME
  18. # GO: GO_IMAGE_NAME
  19. functionDockerImages:
  20. # 运行函数实例的镜像拉取策略。 默认是 `IfNotPresent`
  21. imagePullPolicy: IfNotPresent
  22. # 在 `pulsarDockerImageName` 中的 pulsar 主目录的根目录 by default it is `/pulsar`.
  23. # 如果使用自主构建的镜像,需要定义 `pulsarDockerImageName`
  24. pulsarRootDir:
  25. # config admin CLI 允许用户自定义 admin cli 工具的配置,例如:
  26. # `/bin/pulsar-admin and /bin/pulsarctl`. 默认是 `/bin/pulsar-admin`。 如果想使用 `pulsarctl`
  27. # 需要相应的进行如下配置:
  28. configAdminCLI:
  29. # 如果 `k8Uri` 设置为 null 时,才会生效。 if your function worker is running as a k8 pod,
  30. # setting this to true is let function worker to submit functions to the same k8s cluster as function worker
  31. # is running. setting this to false if your function worker is not running as a k8 pod.
  32. submittingInsidePod: false
  33. # 设置提供 pulsar 服务的 url,让 pulsar 函数能够连接到 pulsar
  34. # 如果没设置,将使用在 worker 里配置的提供 pulsar 服务的 url
  35. pulsarServiceUrl:
  36. # 设置提供 pulsar admin 服务的 url,让 pulsar 函数能够连接到 pulsar
  37. # 如果没设置,将使用在 worker 里配置的提供 pulsar admin 服务的 url
  38. pulsarAdminUrl:
  39. # 用于标记是否安装用户代码依赖。 (应用于 python 包)
  40. installUserCodeDependencies:
  41. # pulsar 函数用来下载 python 依赖包的存储库
  42. pythonDependencyRepository:
  43. # pulsar 函数用来下载 python 额外的依赖包的存储库
  44. pythonExtraDependencyRepository:
  45. # 函数 worker 用来为 pod 选择节点的自定义标签
  46. customLabels:
  47. # 预期的指标收集间隔,以秒为单位
  48. expectedMetricsCollectionInterval: 30
  49. # Kubernetes Runtime 将定期检查此 configMap(如果已定义),并且如果 kubernetes 中特定内容有任何更改,将应用这些更改
  50. changeConfigMap:
  51. # 用于存储改动的 configMap 的命名空间
  52. changeConfigMapNamespace:
  53. # 为 function/source/sink 设置 cpu 使用率和 cpu 限制
  54. # cpu 请求的公式是 cpuRequest = userRequestCpu / cpuOverCommitRatio
  55. cpuOverCommitRatio: 1.0
  56. # 为 function/source/sink 设置的内存请求率和内存大小限制
  57. # 内存请求的公式是 memoryRequest = userRequestMemory / memoryOverCommitRatio
  58. memoryOverCommitRatio: 1.0
  59. # 函数 pod 内部的端口,worker 使用它与 pod 进行通信
  60. grpcPort: 9093
  61. # 函数 pod 内的端口,在其上暴露 prometheus 指标
  62. metricsPort: 9094
  63. # 函数 pod 中的 nar 包将被提取的目录
  64. narExtractionDirectory:
  65. # 存储函数实例文件的 classpath
  66. functionInstanceClassPath:
  67. # 用于删除额外的函数依赖项的目录
  68. # 如果不是绝对路径,那就是相对于 `pulsarRootDir` 的路径
  69. extraFunctionDependenciesDir:
  70. # 在每个实例的函数请求的内存之上,添加了额外的内存补充
  71. percentMemoryPadding: 10

如果函数 worker 运行在 Kubernetes 环境的 broker 中,你可以使用默认配置。

Kubernetes 运行独立的函数 worker

如果你想在 Kubernetes 上运行独立的函数 worker (即不嵌入在broker上运行),你必须在函数 worker 上配置pulsarSerivceUrlpulsarAdminUrl,来指定 broker 的地址。

例如,Pulsar brokers 和 函数 worker 都运行在 K8S的 pulsar命名空间下。 Broker 有一个叫做 brokers 的服务,函数 worker 有一个叫做 func-worker 的服务。 他们的设置如下:

  1. pulsarServiceUrl: pulsar://broker.pulsar:6650 // 如果使用了 TLS 就用 :pulsar+ssl://broker.pulsar:6651
  2. pulsarAdminUrl: http://func-worker.pulsar:8080 // 如果使用了TLS 就用: https://func-worker:8443

Kubernetes 集群中运行 RBAC

如果要在集群中运行 RBAC,需要确保运行函数worker(如果函数运行在 broker 上,那就是 broker )的 service account 有以下 Kubernetes API 的访问权限。

  • services
  • configmaps
  • pods
  • apps.statefulsets

如下是完整的配置:

  1. apiVersion: rbac.authorization.k8s.io/v1beta1
  2. kind: ClusterRole
  3. metadata:
  4. name: functions-worker
  5. rules:
  6. - apiGroups: [""]
  7. resources:
  8. - services
  9. - configmaps
  10. - pods
  11. verbs:
  12. - '*'
  13. - apiGroups:
  14. - apps
  15. resources:
  16. - statefulsets
  17. verbs:
  18. - '*'
  19. ---
  20. apiVersion: v1
  21. kind: ServiceAccount
  22. metadata:
  23. name: functions-worker
  24. ---
  25. apiVersion: rbac.authorization.k8s.io/v1beta1
  26. kind: ClusterRoleBinding
  27. metadata:
  28. name: functions-worker
  29. roleRef:
  30. apiGroup: rbac.authorization.k8s.io
  31. kind: ClusterRole
  32. name: functions-worker
  33. subjectsKubernetesSec:
  34. - kind: ServiceAccount
  35. name: functions-worker

如果 service account 的配置不正确,就会有如下的错误信息提示。

  1. 22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
  2. io.kubernetes.client.ApiException: Forbidden
  3. at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
  4. at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
  5. at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
  6. at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
  7. at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
  8. at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
  9. at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
  10. at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]

集成 Kubernetes secrets

为了确保信息安全 ,Pulsar 函数可以引用Kubernetes secrets。 如果要启用此功能,请将参数secretsProviderConfiguratorClassName设置为org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator

你可以在部署函数的 kubernetes 命名空间中创建一个 secret。 例如,你的函数部署在 Kubernetes 的命名空间 pulsar-func中,你有一个名为database-creds的secret,它有一个字段password,你希望将其作为一个环境变量传递到 pod 中,并且想用名称DATABASE_PASSWORD获取到它。 那么下面的函数配置允许你将这个 secret 以环境变量的形式传递到 pod 里面。

  1. tenant: "mytenant"
  2. namespace: "mynamespace"
  3. name: "myfunction"
  4. topicName: "persistent://mytenant/mynamespace/myfuncinput"
  5. className: "com.company.pulsar.myfunction"
  6. secrets:
  7. # secret 中的`password`和`database-creds`将被挂载到名为 `DATABASE_PASSWORD `的环境变量中。
  8. DATABASE_PASSWORD:
  9. path: "database-creds"
  10. key: "password"

启用 Token 认证

当你开启 Pulsar 集群的身份认证时,你需要为运行函数的 pod 提供一种机制,以通过 broker的身份认证。

org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider 接口可以为任何认证机制提供支持。 配置文件function-worker.yml的配置项functionAuthProviderClassName 允许你自定义认证实现机制。

Pulsar 自带了令牌身份认证的实现,并通过相同的方式实现了分发证书授权。 配置如下:

  1. functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider

使用 Token 身份认证时,函数 worker 会获取到 token,并将其用于部署或者更新函数。 Token 通过 secret 的方式挂载到 pod 里面。

自定义的认证方式或者TLS, 你必须去实现上面的接口或者使用替代机制实现身份认证。 如果你在集群中使用 Token 身份认证和 TLS 加密实现安全通信,Pulsar 会将证书授权(CA)传递给客户端,所以客户端就可以获取到集群认证所需的内容,同时信任集群所签发的证书。

Note
If you use tokens that expire when deploying functions, these tokens will expire.

启用身份认证

当您在启用了身份认证的集群中独立运行函数 worker 时, 你必须配置函数 worker 和 broker 交互时传入身份认证信息。 所以你必须配置 broker 所需的身份认证和鉴权选项。

例如,如果是使用 Token 认证,你必须在function-worker.yml文件内配置如下属性。

  1. clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
  2. clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
  3. configurationStoreServers: zookeeper-cluster:2181 # auth requires a connection to zookeeper
  4. authenticationProviders:
  5. - "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
  6. authorizationEnabled: true
  7. authenticationEnabled: true
  8. superUserRoles:
  9. - superuser
  10. - proxy
  11. properties:
  12. tokenSecretKey: file:///etc/pulsar/jwt/secret # if using a secret token, key file must be DER-encoded
  13. tokenPublicKey: file:///etc/pulsar/jwt/public.key # if using public/private key tokens, key file must be DER-encoded

Note
You must configure both the Function Worker authorization or authentication for the server to authenticate requests and configure the client to be authenticated to communicate with the broker.

定制 Kubernetes 运行时

Kubernetes 集成允许你实现类并自定义如何生成manifests. 你可以为配置文件functions-worker.yml的配置项runtimeCustomizerClassName指定一个全路径的类名。 这个类必须实现org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer接口。

函数(数据来源/ 数据去向) API 会提供了一个标记,customRuntimeOptions,传递到这个接口。

为了初始化 KubernetesManifestCustomizer,可以将 runtimeCustomizerConfigfunctions-worker.yml 文件中进行提供。 runtimeCustomizerConfig 作为参数传递给 public void initialize(Map<String, Object> config) 函数。 runtimeCustomizerConfigcustomRuntimeOptions 的配置值是不同的, runtimeCustomizerConfig 在所有的函数中配置值都一样。 如果同时配置了 runtimeCustomizerConfigcustomRuntimeOptions,需要决定如何管理这两个配置,在对KubernetesManifestCustomizer 的实现中。

Pulsar 自带了一些实现。 若要使用基本的功能,可以将runtimeCustomizerClassName设置为org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer。 使用 runtimeCustomizerConfig 初始化的内置实现使得能够将 JSON 类型文件作为 customRuntimeOptions 传递,其中包含要扩充的某些属性,这也决定了 manifests 的生成方式。 如果提供了 runtimeCustomizerConfigcustomRuntimeOptions ,在两个配置发生冲突时,BasicKubernetesManifestCustomizer 会使用customRuntimeOptions 的参数来覆盖 runtimeCustomizerConfig 的配置。

如下是 customRuntimeOptions 的示例。

  1. {
  2. "jobName": "jobname", // 运行函数实例的 k8s pod 名字
  3. "jobNamespace": "namespace", // 运行函数实例的 k8s 命名空间
  4. "extractLabels": { // 附加给statefulSet,service 和 pods 的额外的标签
  5. "extraLabel": "value"
  6. },
  7. "extraAnnotations": { // 附加给statefulSet,service 和 pods 的额外的注释
  8. "extraAnnotation": "value"
  9. },
  10. "nodeSelectorLabels": { // 添加在 pod 的配置里的节点选择器标签
  11. "customLabel": "value"
  12. },
  13. "tolerations": [ // 添加到 pod 的特定情况下某些变量能容忍的值
  14. {
  15. "key": "custom-key",
  16. "value": "value",
  17. "effect": "NoSchedule"
  18. }
  19. ],
  20. "resourceRequirements": { // cpu 和内存的值参照此处所述定义: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
  21. "requests": {
  22. "cpu": 1,
  23. "memory": "4G"
  24. },
  25. "limits": {
  26. "cpu": 2,
  27. "memory": "8G"
  28. }
  29. }
  30. }

使用跨域复制运行集群

如果你想使用跨域复制运行多个集群,每个集群必须使用不同的函数命名空间。 否则,函数会共享同一个命名空间,可能跨集群调度。

例如,假设你有两个集群:east-1west-1, 你可能对这两个函数做如下的运行配置:

  1. pulsarFunctionsCluster: east-1
  2. pulsarFunctionsNamespace: public/functions-east-1
  1. pulsarFunctionsCluster: west-1
  2. pulsarFunctionsNamespace: public/functions-west-1

这确保了两个不同的函数 Worker 使用两个不同的主题进行内部调度。

配置函数worker 独立运行

当需要配置独立运行函数 worker 时,你必须在 broker 里面配置如下参数,尤其是你需要使用 TLS 时。 然后函数 Worker 才可以和 broker 进行通信。

你必须需配置一下必选参数。

  1. workerPort: 8080
  2. workerPortTls: 8443 # 使用TLS的时候需要配置
  3. tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # 使用TLS的时候需要配置
  4. tlsKeyFilePath: /etc/pulsar/tls/tls.key # 使用TLS的时候需要配置
  5. tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # 使用TLS的时候需要配置
  6. pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ 使用TLS的时候需要配置
  7. pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ 使用TLS的时候需要配置
  8. useTls: true # 使用TLS的时候需要配置