本教程解释了如何开发 Pulsar connector,以在 Pulsar 和其他系统之间移动数据。
Pulsar connector 是特殊的 Pulsar Functions,因此创建 Pulsar connector 类似于创建 Pulsar function。
Pulsar connector 可分为两类:
类型 | Description | 示例 |
---|---|---|
{@inject: github:
Source
:/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java}|Import data from another system to Pulsar.|RabbitMQ source connector imports the messages of a RabbitMQ queue to a Pulsar topic. Sink
| 从 Pulsar 导出数据到另一个系统。|Kinesis sink connector 从 Pulsar topic 导出消息到 Kinesis 流。
Develop
你可以开发 Pulsar source connector 和 sink connector。
Source
Developing a source connector is to implement the Source
interface, which means you need to implement the open
method and the read
method.
Implement the
open
method./**
* Open connector with configuration
*
* @param config initialization config
* @param sourceContext
* @throws Exception IO type exceptions when opening a connector
*/
void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;
Source connector 初始化会调用此方法。
在此方法中,可通过传入的
config
参数检索所有 connector 的特定设置,并初始化所有必须的资源。例如,Kafka connector 可以在
open
方法中创建 Kafka 客户端。Besides, Pulsar runtime also provides a
SourceContext
for the connector to access runtime resources for tasks like collecting metrics. 执行此方法可以保存SourceContext
供后续使用。Implement the
read
method./**
* Reads the next message from source.
* If source does not have any new messages, this call should block.
* @return next message from source. The return result should never be null
* @throws Exception
*/
Record<T> read() throws Exception;
如果没有要返回的内容,则应停止该方法的执行而不是返回
null
。The returned
Record
should encapsulate the following information, which is needed by Pulsar IO runtime.Record
should provide the following variables:变量 Required Description
`TopicName`|No|Pulsar topic name from which the record is originated from. `Key`|No| 随机用密钥标记消息。<br/><br/> 更多详细信息,参阅 [Routing modes](/docs/zh-CN/concepts-messaging#routing-modes).| `Value`|Yes| 实际记录数据。 `EventTime`|No| Source 中记录的事件时间。 `PartitionId`|No| 如果记录来自已分区的 source,则返回 `PartitionId`。 <br/><br/>`PartitionId` 被 Pulsar IO 运行时间用作唯一标识符的一部分,删除重复的消息并实现 exactly-once 处理保证。 `RecordSequence`|No| 如果记录来自于序列 source,则返回 `RecordSequence`。<br/><br/>`RecordSequence` 被 Pulsar IO 运行时间用作唯一标识符的一部分,删除重复的消息并实现 exactly-once 处理保证。 `Properties` |No| 如果记录带有用户自定义的属性,则返回相关属性。 `DestinationTopic`|No| 应该写入消息的 topic。 `Message`|No| 包含用户发送数据的类。<br/><br/> 更多详细信息,参阅 [Message.java](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java)。|
* {@inject: github:`Record`:/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java} should provide the following methods:
Method|Description |---|--- `ack` |Acknowledge that the record is fully processed. `fail`|Indicate that the record fails to be processed.
Tip
For more information about how to create a source connector, see
KafkaSource
.
Sink
Developing a sink connector is similar to developing a source connector, that is, you need to implement the Sink
interface, which means implementing the open
method and the write
method.
Implement the
open
method./**
* Open connector with configuration
*
* @param config initialization config
* @param sinkContext
* @throws Exception IO type exceptions when opening a connector
*/
void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
Implement the
write
method./**
* Write a message to Sink
* @param inputRecordContext Context of input record from the source
* @param record record to write to sink
* @throws Exception
*/
void write(Record<T> record) throws Exception;
During the implementation, you can decide how to write the
Value
and theKey
to the actual source, and leverage all the provided information such asPartitionId
andRecordSequence
to achieve different processing guarantees.还需要 ack 记录(消息发送成功)或发送失败记录(消息发送失败)。
测试
测试 connector 会有一定的难度,因为 Pulsar IO connector 与两个可能很难模拟的系统交互,即 Pulsar 和与 connector 连接的系统。
建议在模拟外部服务时编写特殊测试来测试 connector 功能,如下所示。
单元测试
可以为 connector 创建单元测试。
集成测试
只要有足够的单元测试,就可以添加单独的集成测试来验证端到端的功能。
Pulsar uses testcontainers for all integration tests.
Tip
For more information about how to create integration tests for Pulsar connectors, see
IntegrationTests
.
包
在开发并测试 connector 后,需要先将其打包,然后才能提交到 Pulsar Functions 集群。
有两种方法可以使用 Pulsar Functions 的运行时间,即 NAR 和 uber JAR。
Note
如果打算将 connector 打包并分发给其他人使用,则需要 为自己的代码添加许可,并进行版权保护。 请记得添加许可和版权到 代码用到的所有库和你的发行版中。
如果使用 NAR 方法,则 NAR 插件会在生成的 NAR 包中自动创建一个
DEPENDENCIES
文件,包含 connector 所有库的许可和版权。
NAR
NAR stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide a bit of Java ClassLoader isolation.
Tip
For more information about how NAR works, see here.
Pulsar uses the same mechanism for packaging all built-in connectors.
安装 Pulsar connector 最简单的方法是使用 nifi-nar-maven-plugin 创建一个 NAR 包。
Include this nifi-nar-maven-plugin in your maven project for your connector as below.
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.2.0</version>
</plugin>
</plugins>
You must also create a resources/META-INF/services/pulsar-io.yaml
file with the following contents:
name: connector name
description: connector description
sourceClass: fully qualified class name (only if source connector)
sinkClass: fully qualified class name (only if sink connector)
If you are using the Gradle NiFi plugin you might need to create a directive to ensure your pulsar-io.yaml is copied into the NAR file correctly.
Tip
For more information about an how to use NAR for Pulsar connectors, see
TwitterFirehose
.
Uber JAR
An alternative approach is to create an uber JAR that contains all of the connector’s JAR files and other resource files. No directory internal structure is necessary.
可以使用 maven-shade-plugin 来创建 uber JAR。如下所示:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>