Apache Pulsar SQL Connector
Scan Source: Unbounded · Scan Source: Bounded · Sink: Streaming Append Mode
The Pulsar connector allows for reading data from and writing data into Pulsar topics.
Dependencies
The Pulsar connector is not part of the binary distribution, so it must be linked separately for cluster execution.
How to create a Pulsar table
The example below shows how to create a Pulsar table:
CREATE TABLE PulsarTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior',
  'service-url' = 'pulsar://my-broker.com:6650'
)
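Once the table is declared, it can be queried like any other Flink table. The statement below is a minimal sketch that assumes the PulsarTable definition above has been registered; the filter value 'buy' is a hypothetical example and does not come from the connector documentation.

```sql
-- Continuously read rows from the Pulsar-backed table.
-- 'buy' is an illustrative value for the behavior column (assumption).
SELECT `user_id`, `item_id`, `ts`
FROM PulsarTable
WHERE `behavior` = 'buy';
```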
Connector Options
Key | Default | Type | Description |
---|---|---|---|
explicit | true | Boolean | Indicates whether the table is an explicit Flink table. |
key.fields | (none) | List<String> | An explicit list of physical columns from the table schema that are decoded/encoded from the key bytes of a Pulsar message. By default, this list is empty and thus a key is undefined. |
key.format | (none) | String | Defines the format identifier for decoding/encoding the key bytes of a Pulsar message. The identifier is used to discover a suitable format factory. |
service-url | (none) | String | Service URL provider for the Pulsar service. To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL. You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme. |
sink.custom-topic-router | (none) | String | (Optional) the custom topic router class URL that is used in the Pulsar DataStream sink connector. If this option is provided, the sink.topic-routing-mode option will be ignored. |
sink.message-delay-interval | 0 ms | Duration | (Optional) the message delay delivery interval that is used in the Pulsar DataStream sink connector. |
sink.topic-routing-mode | round-robin | Enum | (Optional) the topic routing mode. Available options are round-robin and message-key-hash. By default, it is set to round-robin. If you want to use a custom topic router, use the sink.custom-topic-router option to determine the partition for a particular message. |
source.start.message-id | (none) | String | (Optional) Message id that is used to specify a consuming starting point for the source. Use earliest, latest, or pass in a message id representation in ledgerId:entryId:partitionId, such as 12:2:-1. This option takes precedence over source.start.publish-time. |
source.start.publish-time | (none) | Long | (Optional) Publish timestamp that is used to specify a starting point for the Pulsar DataStream source connector to consume data. Option source.start.message-id takes precedence over this one. |
source.stop.after-message-id | (none) | String | (Optional) Message id used to specify a stop position for the unbounded SQL source; the given message is included in the consuming result. Pass in a message id representation in ledgerId:entryId:partitionId, such as 12:2:-1. |
source.stop.at-message-id | (none) | String | (Optional) Message id used to specify a stop cursor for the unbounded SQL source. Use never, latest, or pass in a message id representation in ledgerId:entryId:partitionId, such as 12:2:-1. |
source.stop.at-publish-time | (none) | Long | (Optional) Publish timestamp used to specify a stop cursor for the unbounded SQL source. |
source.subscription-name | (none) | String | The subscription name of the consumer that is used by the runtime Pulsar DataStream source connector. This argument is required for constructing the consumer. |
source.subscription-type | Exclusive | Enum | The subscription type that is supported by the Pulsar DataStream source connector. Currently, only Exclusive and Shared subscription types are supported. |
topics | (none) | List<String> | Topic name(s) the table reads data from. It can be a single topic name or a list of topic names separated by semicolons (;), like topic-1;topic-2. When a list of topics is configured, ensure that all the topics share the same schema, because a Flink table needs a fixed schema. |
value.format | (none) | String | Defines the format identifier for decoding/encoding value data. The identifier is used to discover a suitable format factory. |
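To show how several of the options in the table combine, here is a hedged sketch of a source table and a sink table. It uses only option keys listed above. The table names, topic names, subscription name, and the 5 s delay are hypothetical, and 'json' is assumed to be an available format on the classpath. The exact accepted spelling of enum and duration values may differ between connector versions.

```sql
-- Source: reads two topics that share one schema, starting from the earliest
-- message and intended to stop at the latest message (stop cursor 'latest').
CREATE TABLE PulsarSource (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'topic-1;topic-2',                           -- semicolon-separated list
  'service-url' = 'pulsar://my-broker.com:6650',
  'value.format' = 'json',                                -- assumed format
  'source.subscription-name' = 'flink-sql-subscription',  -- hypothetical name
  'source.subscription-type' = 'Shared',
  'source.start.message-id' = 'earliest',
  'source.stop.at-message-id' = 'latest'
);

-- Sink: appends rows to a single topic with delayed delivery.
CREATE TABLE PulsarSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'user_behavior_copy',                        -- hypothetical topic
  'service-url' = 'pulsar://my-broker.com:6650',
  'value.format' = 'json',                                -- assumed format
  'sink.topic-routing-mode' = 'round-robin',              -- the documented default
  'sink.message-delay-interval' = '5 s'                   -- hypothetical delay
);

-- Copy rows from the source topics into the sink topic.
INSERT INTO PulsarSink
SELECT `user_id`, `item_id`, `behavior`
FROM PulsarSource;
```

Because source.start.message-id takes precedence over source.start.publish-time, the sketch sets only one of the two start options so that the starting position is unambiguous.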