Configure Functions runtime
你可以使用下列方式去运行函数。
- Thread: Invoke functions threads in functions worker.
- Process: Invoke functions in processes forked by functions worker.
- Kubernetes: Submit functions as Kubernetes StatefulSets by functions worker.
Note
Pulsar supports adding labels to the Kubernetes StatefulSets and services while launching functions, which facilitates selecting the target Kubernetes objects.
线程和进程两种模式有如下不同:
- 线程模式:当函数运行在线程模式时,函数和 Puslsar functions 的 work 是运行在同一个Java 虚拟机(JVM) 里面的。
- 进程模式: 当函数运行在进程模式时,它们是运行在同一台机器上的不同 Java 虚拟机里面。
配置线程模式运行
It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
threadGroupName: "Your Function Container Group"
Thread runtime is only supported in Java function.
配置进程模式运行
When you enable Process runtime, you do not need to configure anything.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
# the directory for storing the function logs
logDirectory:
# change the jar location only when you put the java instance jar in a different location
javaInstanceJarLocation:
# change the python instance location only when you put the python instance jar in a different location
pythonInstanceLocation:
# change the extra dependencies location:
extraFunctionDependenciesDir:
Process runtime is supported in Java, Python, and Go functions.
配置 Kubernetes 运行
当函数 worker 生成 Kubernetes manifests,应用这份 manifests 时,Kubernetes 就会开始工作。 如果你将函数运行在 Kubernetes 里面,你能够使用serviceAccount
去关联正在运行该函数的 Pod。 然后,可以将其配置为与 Kubernetes 集群进行通信。
Mainifests 由函数 worker 生成,包含一个StatefulSet
,一个Service
(用于 pods 之间通信),和一个Secret
(在需要的情况下,用于身份认证)。 默认情况下,StatefulSet
只有一个 Pod,函数的 “并行度” 决定它的数量。 Pode 启动时, Pod 会下载函数的运行内容 (通过函数 worker 的REST API 下载)。 Pod 的容器镜像是可以配置的,但是必须先有函数运行时。
Kubernetes 运行时是支持 secrets 的,所以你能够创建一个 Kubernetes secret,并将其作为环境变量在 Pod 内可见。 Kubernetes 运行时是可扩展的,你可以实现类并定制,比如:如何生成 Kubernetes manifests、如果 传递认证数据给pod,如何整合 secrets等。
基础配置
It is easy to configure Kubernetes runtime. You can just uncomment the settings of kubernetesContainerFactory
in the functions_worker.yaml
file. The following is an example.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
# uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in function worker
k8Uri:
# the kubernetes namespace to run the function instances. 如果为空,则为默认的命名空间'default'
jobNamespace:
# 运行函数的 docker 镜像 默认情况下是`apachepulsar/pulsar`
pulsarDockerImageName:
# docker 镜像根据用户提供的不同配置运行函数实例。
# 默认是 `apachepulsar/pulsar`
# 例如:
# functionDockerImages:
# JAVA: JAVA_IMAGE_NAME
# PYTHON: PYTHON_IMAGE_NAME
# GO: GO_IMAGE_NAME
functionDockerImages:
# Pulsar 在镜像内的主目录的根目录。 by default it is `/pulsar`.
# if you are using your own built image in `pulsarDockerImageName`, you need to set this setting accordingly
pulsarRootDir:
# this setting only takes effects if `k8Uri` is set to null. if your function worker is running as a k8 pod,
# setting this to true is let function worker to submit functions to the same k8s cluster as function worker
# is running. setting this to false if your function worker is not running as a k8 pod.
submittingInsidePod: false
# setting the pulsar service url that pulsar function should use to connect to pulsar
# if it is not set, it will use the pulsar service url configured in worker service
pulsarServiceUrl:
# setting the pulsar admin url that pulsar function should use to connect to pulsar
# if it is not set, it will use the pulsar admin url configured in worker service
pulsarAdminUrl:
# the custom labels that function worker uses to select the nodes for pods
customLabels:
# the directory for dropping extra function dependencies
# if it is not an absolute path, it is relative to `pulsarRootDir`
extraFunctionDependenciesDir:
# Additional memory padding added on top of the memory requested by the function per on a per instance basis
percentMemoryPadding: 10
如果函数 worker 运行在 Kubernetes 环境的 broker 中,你可以使用默认配置。
Kubernetes 运行独立的函数 worker
如果你想在 Kubernetes 上运行独立的函数 worker (即不嵌入在broker上运行),你必须在函数 worker 上配置pulsarSerivceUrl
和pulsarAdminUrl
,来指定 broker 的地址。
例如,Pulsar brokers 和 函数 worker 都运行在 K8S的 pulsar
命名空间下。 Broker 有一个叫做 brokers
的服务,函数 worker 有一个叫做 func-worker
的服务。 他们的设置如下:
pulsarServiceUrl: pulsar://broker.pulsar:6650 // 如果使用了 TLS 就用 :pulsar+ssl://broker.pulsar:6651
pulsarAdminUrl: http://func-worker.pulsar:8080 // 如果使用了TLS 就用: https://func-worker:8443
Kubernetes 集群中运行 RBAC
如果要在集群中运行 RBAC,需要确保运行函数worker(如果函数运行在 broker 上,那就是 broker )的 service account 有以下 Kubernetes API 的访问权限。
- services
- configmaps
- pods
- apps.statefulsets
如下是完整的配置:
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
name: functions-worker
rules:
- apiGroups: [""]
resources:
- services
- configmaps
- pods
verbs:
- '*'
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- '*'
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: functions-worker
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: functions-worker
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: functions-worker
subjectsKubernetesSec:
- kind: ServiceAccount
name: functions-worker
如果 service account 的配置不正确,就会有如下的错误信息提示。
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]
集成 Kubernetes secrets
为了确保信息安全 ,Pulsar 函数可以引用Kubernetes secrets。 如果要启用此功能,请将参数secretsProviderConfiguratorClassName
设置为org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator
。
你可以在部署函数的 kubernetes 命名空间中创建一个 secret。 例如,你的函数部署在 Kubernetes 的命名空间 pulsar-func
中,你有一个名为database-creds
的secret,它有一个字段password
,你希望将其作为一个环境变量传递到 pod 中,并且想用名称DATABASE_PASSWORD
获取到它。 那么下面的函数配置允许你将这个 secret 以环境变量的形式传递到 pod 里面。
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"
secrets:
# secret 中的`password`和`database-creds`将被挂载到名为 `DATABASE_PASSWORD `的环境变量中。
DATABASE_PASSWORD:
path: "database-creds"
key: "password"
启用 Token 认证
当你开启 Pulsar 集群的身份认证时,你需要为运行函数的 pod 提供一种机制,以通过 broker的身份认证。
org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider
接口可以为任何认证机制提供支持。 配置文件function-worker.yml
的配置项functionAuthProviderClassName
允许你自定义认证实现机制。
Pulsar 自带了令牌身份认证的实现,并通过相同的方式实现了分发证书授权。 配置如下:
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
使用 Token 身份认证时,函数 worker 会获取到 token,并将其用于部署或者更新函数。 Token 通过 secret 的方式挂载到 pod 里面。
自定义的认证方式或者TLS, 你必须去实现上面的接口或者使用替代机制实现身份认证。 如果你在集群中使用 Token 身份认证和 TLS 加密实现安全通信,Pulsar 会将证书授权(CA)传递给客户端,所以客户端就可以获取到集群认证所需的内容,同时信任集群所签发的证书。
Note
If you use tokens that expire when deploying functions, these tokens will expire.
启用身份认证
当您在启用了身份认证的集群中独立运行函数 worker 时, 你必须配置函数 worker 和 broker 交互时传入身份认证信息。 所以你必须配置 broker 所需的身份认证和鉴权选项。
例如,如果是使用 Token 认证,你必须在function-worker.yml
文件内配置如下属性。
clientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken
clientAuthenticationParameters: file:///etc/pulsar/token/admin-token.txt
configurationStoreServers: zookeeper-cluster:2181 # auth requires a connection to zookeeper
authenticationProviders:
- "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"
authorizationEnabled: true
authenticationEnabled: true
superUserRoles:
- superuser
- proxy
properties:
tokenSecretKey: file:///etc/pulsar/jwt/secret # 如果使用 secret 认证
tokenPublicKey: file:///etc/pulsar/jwt/public.key # 如果使用公钥/私钥
Note
You must configure both the Function Worker authorization or authentication for the server to authenticate requests and configure the client to be authenticated to communicate with the broker.
定制 Kubernetes 运行时
Kubernetes 集成允许你实现类并自定义如何生成manifests. 你可以为配置文件functions-worker.yml
的配置项runtimeCustomizerClassName
指定一个全路径的类名。 这个类必须实现org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer
接口。
函数(数据来源/ 数据去向) API 会提供了一个标记,customRuntimeOptions
,传递到这个接口。
To initialize the KubernetesManifestCustomizer
, you can provide runtimeCustomizerConfig
in the functions-worker.yml
file. runtimeCustomizerConfig
is passed to the public void initialize(Map<String, Object> config)
function of the interface. runtimeCustomizerConfig
is different from the customRuntimeOptions
as runtimeCustomizerConfig
is the same across all functions. If you provide both runtimeCustomizerConfig
and customRuntimeOptions
, you need to decide how to manage these two configurations in your implementation of KubernetesManifestCustomizer
.
Pulsar 自带了一些实现。 若要使用基本的功能,可以将runtimeCustomizerClassName
设置为org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer
。 The built-in implementation initialized with runtimeCustomizerConfig
enables you to pass a JSON document as customRuntimeOptions
with certain properties to augment, which decides how the manifests are generated. If both runtimeCustomizerConfig
and customRuntimeOptions
are provided, BasicKubernetesManifestCustomizer
uses customRuntimeOptions
to override the configuration if there are conflicts in these two configurations.
Below is an example of customRuntimeOptions
.
{
"jobName": "jobname", // the k8s pod name to run this function instance
"jobNamespace": "namespace", // the k8s namespace to run this function in
"extractLabels": { // extra labels to attach to the statefulSet, service, and pods
"extraLabel": "value"
},
"extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
"extraAnnotation": "value"
},
"nodeSelectorLabels": { // node selector labels to add on to the pod spec
"customLabel": "value"
},
"tolerations": [ // tolerations to add to the pod spec
{
"key": "custom-key",
"value": "value",
"effect": "NoSchedule"
}
],
"resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}
}
}
使用跨域复制运行集群
如果你想使用跨域复制运行多个集群,每个集群必须使用不同的函数命名空间。 否则,函数会共享同一个命名空间,可能跨集群调度。
例如,假设你有两个集群:east-1
和west-1
, 你可能对这两个函数做如下的运行配置:
pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1
pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1
这确保了两个不同的函数 Worker 使用两个不同的主题进行内部调度。
配置函数worker 独立运行
当需要配置独立运行函数 worker 时,你必须在 broker 里面配置如下参数,尤其是你需要使用 TLS 时。 然后函数 Worker 才可以和 broker 进行通信。
你必须需配置一下必选参数。
workerPort: 8080
workerPortTls: 8443 # 使用TLS的时候需要配置
tlsCertificateFilePath: /etc/pulsar/tls/tls.crt # 使用TLS的时候需要配置
tlsKeyFilePath: /etc/pulsar/tls/tls.key # 使用TLS的时候需要配置
tlsTrustCertsFilePath: /etc/pulsar/tls/ca.crt # 使用TLS的时候需要配置
pulsarServiceUrl: pulsar://broker.pulsar:6650/ # or pulsar+ssl://pulsar-prod-broker.pulsar:6651/ 使用TLS的时候需要配置
pulsarWebServiceUrl: http://broker.pulsar:8080/ # or https://pulsar-prod-broker.pulsar:8443/ 使用TLS的时候需要配置
useTls: true # 使用TLS的时候需要配置