Pulsar Functions 是轻量级计算机处理程序,具有如下特点:
- 从一个或多个 Pulsar topic 中消费消息;
- 将用户提供的处理逻辑应用于每条消息;
- 将计算结果发布到另一个主题
这是一个基于java的例子(使用原生接口)
import java.util.Function;
public class ExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) { return String.format("%s!", input); }
}
这是一个基于Python实现的类似的函数(也使用了原生接口)
def process(input):
return "{0}!".format(input)
一条消息每次被发布到输入的主题时,函数都会被执行。 例如,如果一个函数监听在主题 tweet-stream
上,在每次有消息发布到这个主题的时候函数都会运行。
Goals
Pulsar Functions背后的核心目标是使您能够轻松创建各种级别的复杂的的处理逻辑,而无需部署单独的类似系统(例如 Apache Storm, Apache Heron, Apache Flink, 等等) Pulsar Functions本质上是现成的计算基础设施,作为Pulsar消息系统的一部分,供你使用。 这个核心目标与一系列其他目标相关联:
- Developer productivity (language-native vs. Pulsar Functions SDK functions)
- 简单的故障排查
- 操作简单(不需要外部处理系统)
Inspirations
- Pulsar Functions功能受到若干系统和模式的启发(并从中汲取线索):
- 流处理引擎,例如 Apache Storm、Apache Heron、Apache Flink
- “无服务器(Serverless)”和“Function as a Service”(FaaS)云平台,如:Amazon Web Services Lambda、Google Cloud Functions、Azure Cloud Functions 等
Pulsar Functions能被这样描述
- Lambda 样式的 functions
- 专门设计的使用Pulsar来作为消息总线
Programming model
Pulsar Functions背后的核心编程模型非常简单:
- Functions receive messages from one or more input topics. Every time a message is received, the function can do a variety of things:
- 将某些处理逻辑应用到输入并写入到输出:
- 在Pulsar中的一个输出主题
- Apache BookKeeper
- 写入日志到 日志主题 (可能用于调试目的)
- 增量 计数器
- 将某些处理逻辑应用到输入并写入到输出:
词数统计示例
如果你使用Pulsar Functions 执行经典的字词计数示例,那么它可能看起来像这样:
If you were writing the function in Java using the Pulsar Functions SDK for Java, you could write the function like below:
package org.example.functions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountFunction implements Function<String, Void> {
// This function is invoked every time a message is published to the input topic
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split(" ")).forEach(word -> {
String counterKey = word.toLowerCase();
context.incrCounter(counterKey, 1);
});
return null;
}
}
Next, you need to bundle and build the jar file to be deployed, the approaches can be found in “Creating an Uber JAR” and “Creating a NAR package”. Then deploy it in your Pulsar cluster using the command line like below:
$ bin/pulsar-admin functions create \
--jar target/my-jar-with-dependencies.jar \
--classname org.example.functions.WordCountFunction \
--tenant public \
--namespace default \
--name word-count \
--inputs persistent://public/default/sentences \
--output persistent://public/default/count
Content-based routing example
Pulsar Functions的使用案例有很多,下面展示一个更复杂的例子,该示例涉及基于内容的路由。
想象一个函数, 它将内容 (字符串) 作为输入, 并根据内容将它们发布到不同的主题中(例如fruits或者vegetables主题) 如果这个内容既不属于fruit也不属于vegetable主题,就会有一个警告被记录到日志主题中. 下面是一个可视化表示形式:
如果使用Python语言实现此路由功能, 它可能如下所示:
from pulsar import Function
class RoutingFunction(Function):
def __init__(self):
self.fruits_topic = "persistent://public/default/fruits"
self.vegetables_topic = "persistent://public/default/vegetables"
def is_fruit(item):
return item in ["apple", "orange", "pear", "other fruits..."]
def is_vegetable(item):
return item in ["carrot", "lettuce", "radish", "other vegetables..."]
def process(self, item, context):
if self.is_fruit(item):
context.publish(self.fruits_topic, item)
elif self.is_vegetable(item):
context.publish(self.vegetables_topic, item)
else:
warning = "The item {0} is neither a fruit nor a vegetable".format(item)
context.get_logger().warn(warning)
Command-line interface
Pulsar Functions使用 pulsar-admin
工具进行管理 (主要是functions
命令). 这里是一个示例命令,它将运行一个函数在localrun模式下:
$ bin/pulsar-admin functions localrun \
--inputs persistent://public/default/test_src \
--output persistent://public/default/test_result \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction
Fully Qualified Function Name (FQFN)
Each Pulsar Function has a Fully Qualified Function Name (FQFN) that consists of three elements: the function’s tenant, namespace, and function name. FQFN’s look like this:
tenant/namespace/name
例如, FQFN使您能够创建具有相同名称的多个函数, 前提是它们位于不同的命名空间中。
Configuration
可以通过两种方式配置Pulsar Functions:
- 基于 命令行参数通过 pulsar-admin functions 接口
- 基于yaml 配置文件
If you’re supplying a YAML configuration, you must specify a path to the file on the command line. Here’s an example:
$ bin/pulsar-admin functions create \
--function-config-file ./my-function.yaml
下面是一个my-function.yaml
文件的示例:
name: my-function
tenant: public
namespace: default
jar: ./target/my-functions.jar
className: org.example.pulsar.functions.MyFunction
inputs:
- persistent://public/default/test_src
output: persistent://public/default/test_result
您还可以通过命令行工具指定一些函数属性,通过 yaml 配置一些其他的函数属性, 从而同时使用这两种配置方法。
Supported languages
Pulsar Functions当前支持Java and Python两种语言. 对其他语言的支持即将推出。
The Pulsar Functions API
使用Pulsar Functions api 您可以创建以下处理逻辑:
- Type safe. Pulsar Functions can process raw bytes or more complex, application-specific types.
- Based on SerDe (Serialization/Deserialization). A variety of types are supported “out of the box” but you can also create your own custom SerDe logic.
Function context
使用 Pulsar Functions SDK创建的每个 Pulsar 函数 都支持获取一个上下文对象:
- 有关该函数的各种信息,包括:
- 函数的名称
- 函数的租户和命名空间
- 用户的配置
- 特殊功能,包括:
Language-native functions
支持Java和Python的原生函数,即没有依赖的Pulsar Functions
原生函数的好处是除了Java/Python中现有的“开箱即用”的变量之外,它们无需其他依赖。缺点是它们不提供对函数 上下文的访问,这对于各种功能都是必要的,包括 logging user configuration等等。
The Pulsar Functions SDK
如果你想通过Pulsar Function来获取 上下文对象, 你可以使用 Pulsar Functions SDK, 支持 Java 和 Python两种语言。
Java
下面是一个使用有关其上下文的信息的 java 函数示例:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public class ContextAwareFunction implements Function<String, Void> {
@Override
public Void process(String input, Context, context) {
Logger LOG = context.getLogger();
String functionTenant = context.getTenant();
String functionNamespace = context.getNamespace();
String functionName = context.getName();
LOG.info("Function tenant/namespace/name: {}/{}/{}", functionTenant, functionNamespace, functionName);
return null;
}
}
Python
下面是一个使用有关其上下文的信息的 java 函数示例:
from pulsar import Function
class ContextAwareFunction(Function):
def process(self, input, context):
log = context.get_logger()
function_tenant = context.get_function_tenant()
function_namespace = context.get_function_namespace()
function_name = context.get_function_name()
log.info("Function tenant/namespace/name: {0}/{1}/{2}".format(function_tenant, function_namespace, function_name))
部署
The Pulsar Functions feature was built to support a variety of deployment options. At the moment, there are two ways to run Pulsar Functions:
部署模式 | Description |
---|---|
Local run mode | The function runs in your local environment, for example on your laptop |
Cluster mode | 函数在运行在你的Pulsar 集群上,与你的Pulsar brokers在相同的机器上 |
Local run mode
如果你使用 localrun 模式运行一Pulsar函数,它将在运行命令的机器上运行(这可能是你的笔记本电脑,AWS EC2 实例等)。 这里是一个localrun
模式下运行的命令:
$ bin/pulsar-admin functions localrun \
--py myfunc.py \
--classname myfunc.SomeFunction \
--inputs persistent://public/default/input-1 \
--output persistent://public/default/output-1
默认情况下,函数连接到同一机器上运行的 Pulsar 集群通过本地的broker服务,URL是 pulsar://localhost:6650
。 如果您想使用localrun模式来运行一个函数,但连接到非本地Pulsar 集群,你可以通过 --brokerServiceUrl
标志来指定一个不同的broker URL。 Here’s an example:
$ bin/pulsar-admin functions localrun \
--broker-service-url pulsar://my-cluster-host:6650 \
# Other function parameters
Cluster run mode
当你运行 Pulsar Function在集群模式下时, 函数代码将被上传到Pulsar broker上,并与代理broker一起运行,而不是在您的本地环境中运行。 您可以使用 create
命令在群集模式下运行函数。 Here’s an example:
$ bin/pulsar-admin functions create \
--py myfunc.py \
--classname myfunc.SomeFunction \
--inputs persistent://public/default/input-1 \
--output persistent://public/default/output-1
此命令将上传 myfunc.py
到 Pulsar,它将使用代码启动一个 或更多 函数实例。
Parallelism
默认情况下,当您在集群模式创建并运行Pulsar函数时,只有一个实例运行。 但是, 您也可以并行运行多个实例。 您可以在创建函数时指定实例数, 也可以使用新的并行性因子更新现有的单实例函数。
例如, 此命令将创建并运行并行性为 5 (即5个实例) 的函数:
$ bin/pulsar-admin functions create \
--name parallel-fun \
--tenant public \
--namespace default \
--py func.py \
--classname func.ParallelFunction \
--parallelism 5
Function instance resources
在 群集模式运行 pusar Functions 时, 可以指定资源分配给每个函数 实例:
Resource | Specified as… | Runtimes |
---|---|---|
CPU | The number of cores | Docker (coming soon) |
RAM | The number of bytes | Process, Docker |
Disk space | The number of bytes | Docker |
下面是一个函数创建命令的示例, 它将分配8核、8GB内存和10GB的磁盘空间给一个函数:
$ bin/pulsar-admin functions create \
--jar target/my-functions.jar \
--classname org.example.functions.MyFunction \
--cpu 8 \
--ram 8589934592 \
--disk 10737418240
有关资源的详细信息, 请参阅 Deploying and Managing Pulsar Functions 文档。
Logging
使用 Pulsar Functions SDK创建的Pulsar Functions可以将日志发送到一个日志的主题,这个可以在你的函数配置里指定。 例如,使用下面的命令创建的函数将在 persistent://public/default/my-func-1-log
主题上生成所有日志:
$ bin/pulsar-admin functions create \
--name my-func-1 \
--log-topic persistent://public/default/my-func-1-log \
# Other configs
这个Java function的例子记录日志使用了不同的日志级别基于函数的输入:
public class LoggerFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
Logger LOG = context.getLogger();
if (input.length() <= 100) {
LOG.info("This string has a length of {}", input);
} else {
LOG.warn("This string is getting too long! It has {} characters", input);
}
}
}
User configuration
Pulsar Functions可以通过命令行传递任意的key-values(key和values必须是字符串)。 这些key-values的设置通过调用函数的用户配置. 用户配置必须包括 JSON 字符串.
这是一个函数使用用户配置的额示例
$ bin/pulsar-admin functions create \
--user-config '{"key-1":"value-1","key-2","value-2"}' \
# Other configs
这是一个在函数中获取配置信息的示例
public class ConfigMapFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
String val1 = context.getUserConfigValue("key1").get();
String val2 = context.getUserConfigValue("key2").get();
context.getLogger().info("The user-supplied values are {} and {}", val1, val2);
return null;
}
}
Triggering Pulsar Functions
Pulsar Functions 运行在集群模式下可以使用触发器参考command line 通过触发器,您可以轻松地将特定值传递给函数,并获得函数的返回值,而无需担心创建客户端、向正确的输入主题发送消息等。 触发器非常有用绝不限于测试和调试目的。
Triggering a function is ultimately no different from invoking a function by producing a message on one of the function’s input topics.
pulsar-admin functions trigger
本质上是一种非常方便的机制用于向函数发送消息,而不需要使用pulsar-client
工具或指定语言的客户端库。
让我我们看一个基于Python的函数示例(原生接口),它简单地反转字符串输入:
def process(input):
return input[::-1]
如果该函数在Pulsar集群中运行, 则可以像这样被触发:
$ bin/pulsar-admin functions trigger \
--tenant public \
--namespace default \
--name reverse-func \
--trigger-value "snoitcnuf raslup ot emoclew"
这应该在控制台返回 welcome to pulsar functions
。
您也可以使用
--triggerFile
标志,用文件的内容触发Pulsar函数,而不是通过命令行工具传递字符串。
Processing guarantees
Pulsar 函数功能提供三种不同的信息语义,你可以应用于任何函数中:
Delivery semantics | Description |
---|---|
At-most-once delivery | 发送给函数的每个消息最多会被处理一次 |
At-least-once delivery | 发送给函数的每条消息至少被处理一次 |
Effectively-once delivery | 发送函数的每条消息没精确的处理一次 |
例如, 此命令将在 群集模式 中运行一个函数, 并effectively-once保证有效性:
$ bin/pulsar-admin functions create \
--name my-effectively-once-function \
--processing-guarantees EFFECTIVELY_ONCE \
# Other function configs
Metrics
Pulsar Functions使用Pulsar Functions SDK可以发布meitrics到Pulsar 关于更多信息,请参阅 Metrics for Pulsar Functions.
State storage
Pulsar Functions使用 Apache Bookerper 存储状态。 所有Pulsar安装,包括local standalone安装,BookKeeper bookies的部署等都将状态存储到[Apache BookKeeper](https://bookkeeper. apache. org)。