Develop Connectors
This guide describes how developers can write new connectors for Pulsar IO to move data between Pulsar and other systems.
Pulsar IO connectors are specialized Pulsar Functions, so writing a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come in two flavors: Source, which imports data from another system, and Sink, which exports data to another system. For example, KinesisSink would export the messages of a Pulsar topic to a Kinesis stream, and RabbitmqSource would import the messages of a RabbitMQ queue into a Pulsar topic.
Developing
Develop a source connector
To develop a source connector, you need to implement the Source interface.
First, you need to implement the open method. This method is called once when the source connector is initialized. In this method, you can retrieve all the connector-specific settings through the passed config parameter and initialize all the necessary resources. For example, a Kafka connector can create the Kafka client in this open method.
Besides the passed-in config object, the Pulsar runtime also provides a SourceContext for the connector to access runtime resources for tasks like collecting metrics. The implementation can save the SourceContext for further use.
```java
/**
 * Open connector with configuration
 *
 * @param config initialization config
 * @param sourceContext environment where the source connector is running
 * @throws Exception IO type exceptions when opening a connector
 */
void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;
```
The main task for a Source implementor is to implement the read method.
```java
/**
 * Reads the next message from source.
 * If source does not have any new messages, this call should block.
 * @return next message from source. The return result should never be null
 * @throws Exception
 */
Record<T> read() throws Exception;
```
The implementation should block on this method if there is nothing to return; it should never return null. The returned Record should encapsulate the information needed by the Pulsar IO runtime. This information includes:
- Topic Name: Optional. If the record originates from a Pulsar topic, it should be the Pulsar topic name.
- Key: Optional. If the record has a key associated with it.
- Value: Required. The actual data of this record.
- Partition Id: Optional. If the record originates from a partitioned source, return its partition id. The partition id is used as part of the unique identifier by the Pulsar IO runtime to deduplicate messages and achieve the exactly-once processing guarantee.
- Record Sequence: Optional. If the record originates from a sequential source, return its record sequence. The record sequence is used as part of the unique identifier by the Pulsar IO runtime to deduplicate messages and achieve the exactly-once processing guarantee.
- Properties: Optional. If the record carries user-defined properties, return those properties.

Additionally, the implementation of the record should provide two methods: ack and fail. These two methods are used by the Pulsar IO connector to acknowledge the records it has finished processing and to fail the records it could not process.
KafkaSource is a good example to follow.
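To make the open/read contract concrete, here is a minimal, self-contained sketch of a source that emits a fixed number of integer records from an in-memory queue. The Record interface below is a simplified stand-in for Pulsar's org.apache.pulsar.io.core.Record, and SourceContext is omitted for brevity; a real connector would implement the actual Pulsar IO interfaces.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CounterSource {
    // Simplified stand-in for org.apache.pulsar.io.core.Record
    public interface Record<T> {
        Optional<String> getKey();
        T getValue();
        default void ack() {}
        default void fail() {}
    }

    private BlockingQueue<Record<Integer>> queue;

    // open(): called once; read connector-specific settings from config
    // and set up resources (here, pre-fill an in-memory queue)
    public void open(Map<String, Object> config) throws Exception {
        int count = (int) config.getOrDefault("count", 3);
        queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            final int value = i;
            queue.put(new Record<Integer>() {
                public Optional<String> getKey() { return Optional.empty(); }
                public Integer getValue() { return value; }
            });
        }
    }

    // read(): block until the next record is available; never return null
    public Record<Integer> read() throws Exception {
        return queue.take();
    }
}
```

A real source would typically have a background thread feed the queue from the external system, while read() simply takes from it, which keeps the blocking semantics correct.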
Develop a sink connector
Developing a sink connector is as easy as developing a source connector. You just need to implement the Sink interface.
Similarly, you first need to implement the open method to initialize all the necessary resources before implementing the write method.
```java
/**
 * Open connector with configuration
 *
 * @param config initialization config
 * @param sinkContext environment where the sink connector is running
 * @throws Exception IO type exceptions when opening a connector
 */
void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
```
The main task for a Sink implementor is to implement the write method.
```java
/**
 * Write a message to the sink
 *
 * @param record record to write to sink
 * @throws Exception
 */
void write(Record<T> record) throws Exception;
```
In the implementation of the write method, the implementor can decide how to write the value and the optional key to the actual sink, and leverage all the provided information, such as Partition Id and Record Sequence, to achieve different processing guarantees. The implementor is also responsible for acknowledging records that have been written successfully and failing records that could not be written.
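As a sketch of this acknowledgement pattern, the toy sink below "writes" each record to an in-memory list, calling ack on success and fail on error. The nested Record interface is a simplified stand-in for Pulsar's org.apache.pulsar.io.core.Record; a real connector would write to the external system instead of a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ListSink {
    // Simplified stand-in for org.apache.pulsar.io.core.Record
    public interface Record<T> {
        Optional<String> getKey();
        T getValue();
        default void ack() {}
        default void fail() {}
    }

    private List<String> store; // stands in for the external system

    // open(): initialize resources before any writes happen
    public void open(Map<String, Object> config) throws Exception {
        store = new ArrayList<>();
    }

    // write(): persist the record, then ack on success or fail on error
    public void write(Record<String> record) throws Exception {
        try {
            store.add(record.getValue());
            record.ack();  // acknowledge once the write has succeeded
        } catch (Exception e) {
            record.fail(); // let the runtime handle the failed record
            throw e;
        }
    }

    public List<String> contents() { return store; }
}
```

Calling ack only after the external write has succeeded is what ties the sink into the runtime's delivery guarantees.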
Testing
Testing connectors can be challenging because Pulsar IO connectors interact with two systems that may be difficult to mock: Pulsar and the system the connector is connecting to. It is recommended to write unit tests that specifically exercise the functionality of the connector classes while mocking the external services.
Once you have written sufficient unit tests for your connector, we also recommend adding separate integration tests to verify end-to-end functionality. In Pulsar, we use testcontainers for all Pulsar integration tests. The Pulsar IO IntegrationTests are good examples to follow for integration testing your connectors.
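To sketch the unit-testing approach, the example below tests hypothetical sink logic against an in-memory fake of the external client instead of the real service. All class and method names here are illustrative, not part of the Pulsar IO API; the point is that the connector depends on a small interface that a test can substitute.

```java
import java.util.ArrayList;
import java.util.List;

public class ConnectorTest {
    // Hypothetical interface for the external service the connector talks to
    interface ExternalClient {
        void send(String payload);
    }

    // Hypothetical sink logic under test: forwards each value to the client
    static class SimpleSink {
        private final ExternalClient client;
        SimpleSink(ExternalClient client) { this.client = client; }
        void write(String value) { client.send(value); }
    }

    // Unit test: substitute an in-memory fake for the real service,
    // then assert on what the connector actually sent
    public static List<String> runTest() {
        List<String> sent = new ArrayList<>();
        ExternalClient fake = sent::add; // records every payload
        SimpleSink sink = new SimpleSink(fake);
        sink.write("a");
        sink.write("b");
        return sent;
    }
}
```

Frameworks like Mockito can generate such fakes automatically, but hand-written fakes keep the test self-contained and easy to follow.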
Packaging
Once you've developed and tested your connector, you must package it so that it can be submitted to a Pulsar Functions cluster. The two approaches described here both work with the Pulsar Functions runtime.
If you plan to package and distribute your connector for others to use, you are obligated to properly license and copyright your own code and to adhere to the licensing and copyrights of all libraries your code uses and that you include in your distribution. If you are using the approach described in "Creating a NAR package", the NAR plugin automatically creates a DEPENDENCIES file in the generated NAR package, including the proper licensing and copyrights of all libraries used by your connector.
Creating a NAR package
The easiest approach to packaging a Pulsar IO connector is to create a NAR package using the nifi-nar-maven-plugin.
NAR stands for NiFi Archive. It is a custom packaging mechanism used by Apache NiFi to provide a bit of Java ClassLoader isolation. For more details, you can read this blog post to understand how NAR works. Pulsar uses the same mechanism for packaging all of its builtin connectors.
All you need to do is include the nifi-nar-maven-plugin in the maven project for your connector. For example:
```xml
<plugins>
  <plugin>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-maven-plugin</artifactId>
    <version>1.2.0</version>
  </plugin>
</plugins>
```
The TwitterFirehose
connector is a good example to follow.
Creating an Uber JAR
An alternative approach is to create an uber JAR that contains all of the connector's JAR files and other resource files. No internal directory structure is necessary.
You can use the maven-shade-plugin to create an uber JAR. For example:
```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <!-- Exclude signature files from signed dependencies;
                 otherwise the shaded JAR can fail signature verification -->
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>
```