Apache Pulsar SQL Connector

Scan Source: Unbounded | Scan Source: Bounded | Sink: Streaming Append Mode

The Pulsar connector allows for reading data from and writing data into Pulsar topics.

Dependencies

The Pulsar connector is not part of the binary distribution, so it must be linked with your job for cluster execution.

How to create a Pulsar table

The example below shows how to create a Pulsar table:

  CREATE TABLE PulsarTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    -- Populated from the 'timestamp' metadata key, not from the message payload
    `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
  ) WITH (
    'connector' = 'pulsar',
    'topics' = 'user_behavior',
    -- No trailing comma after the last option
    'service-url' = 'pulsar://my-broker.com:6650'
  );
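
Once the table exists, it can be queried like any other Flink table. The following is a minimal sketch, assuming the PulsarTable definition above has been registered in the current catalog; the aggregation and the event_count alias are only illustrative.

```sql
-- Continuously count events per behavior type read from the Pulsar topic.
SELECT
  behavior,
  COUNT(*) AS event_count
FROM PulsarTable
GROUP BY behavior;
```

Because the source may be unbounded, a query like this keeps running and updates its result as new messages arrive.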

Connector Options

Each option below is listed by key, followed by its default value, type, and description.
explicit
Default: true. Type: Boolean.
Indicates whether the table is an explicit Flink table.
key.fields
Default: (empty). Type: List<String>.
An explicit list of physical columns from the table schema that are decoded/encoded from the key bytes of a Pulsar message. By default, this list is empty and thus a key is undefined.
key.format
Default: (none). Type: String.
Defines the format identifier for decoding/encoding the key bytes of a Pulsar message. The identifier is used to discover a suitable format factory.
service-url
Default: (none). Type: String.
Service URL provider for the Pulsar service. To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL. You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme.
  • For localhost: pulsar://localhost:6650
  • For multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
  • For a production Pulsar cluster: pulsar://pulsar.us-west.example.com:6650
  • If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
sink.custom-topic-router
Default: (none). Type: String.
(Optional) The custom topic router class URL that is used in the Pulsar DataStream sink connector. If this option is provided, the sink.topic-routing-mode option is ignored.
sink.message-delay-interval
Default: 0 ms. Type: Duration.
(Optional) The message delay delivery interval that is used in the Pulsar DataStream sink connector.
sink.topic-routing-mode
Default: round-robin. Type: Enum.
(Optional) The topic routing mode. The built-in modes are round-robin and message-key-hash; the default is round-robin. To use a custom topic router, set the sink.custom-topic-router option, which then determines the partition for each message.

Possible values:
  • “round-robin”: The producer publishes messages across all partitions in a round-robin fashion to achieve maximum throughput. Round-robin switching does not happen per individual message; it follows the same boundary as pulsar.producer.batchingMaxMessages so that batching stays effective.
  • “message-key-hash”: If no key is provided, the partitioned producer randomly picks a single topic partition and publishes all messages to that partition. If a key is provided on the message, the partitioned producer hashes the key and assigns the message to a particular partition.
  • “custom”: Use a custom TopicRouter implementation that is called to determine the partition for a particular message.
source.start.message-id
Default: (none). Type: String.
(Optional) Message ID that specifies the starting point from which the source consumes. Use earliest, latest, or a message ID in the form ledgerId:entryId:partitionId, such as 12:2:-1. This option takes precedence over source.start.publish-time.
source.start.publish-time
Default: (none). Type: Long.
(Optional) Publish timestamp that specifies the starting point from which the Pulsar DataStream source connector consumes data. The source.start.message-id option takes precedence over this one.
source.stop.after-message-id
Default: (none). Type: String.
(Optional) Message ID that specifies a stop position for the unbounded SQL source; the given message itself is included in the consumed result. Pass a message ID in the form “ledgerId:entryId:partitionId”, such as “12:2:-1”.
source.stop.at-message-id
Default: (none). Type: String.
(Optional) Message ID that specifies a stop cursor for the unbounded SQL source. Use “never”, “latest”, or a message ID in the form “ledgerId:entryId:partitionId”, such as “12:2:-1”.
source.stop.at-publish-time
Default: (none). Type: Long.
(Optional) Publish timestamp that specifies a stop cursor for the unbounded SQL source.
source.subscription-name
Default: (none). Type: String.
The subscription name of the consumer that is used by the runtime Pulsar DataStream source connector. This argument is required for constructing the consumer.
source.subscription-type
Default: Exclusive. Type: Enum.
The subscription type used by the Pulsar DataStream source connector. Currently, only the Exclusive and Shared subscription types are supported.

Possible values:
  • “Exclusive”
  • “Shared”
  • “Failover”
  • “Key_Shared”
topics
Default: (none). Type: List<String>.
Topic name(s) the table reads data from. It can be a single topic name or a list of topic names separated by a semicolon (;), such as topic-1;topic-2. When a list of topics is configured, make sure that all topics share the same schema, because a Flink table needs a fixed schema.
value.format
Default: (none). Type: String.
Defines the format identifier for decoding/encoding value data. The identifier is used to discover a suitable format factory.
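
To illustrate how several of these options combine, here is a minimal sketch of a source table and a sink table. The table names, topic names, subscription name, and the choice of the json format are assumptions made for illustration, not values prescribed by the connector. How key columns interact with the value payload is not covered by the option descriptions above, so treat the key settings on the sink as a starting point to verify against your connector version.

```sql
-- Hypothetical source: reads two topics that share the same schema,
-- starting from the earliest available message.
CREATE TABLE UserBehaviorSource (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://localhost:6650',
  'topics' = 'topic-1;topic-2',
  'value.format' = 'json',                        -- assumed format
  'source.subscription-name' = 'flink-sql-demo',  -- hypothetical name
  'source.subscription-type' = 'Exclusive',
  'source.start.message-id' = 'earliest'
);

-- Hypothetical sink: routes each message by the hash of its key,
-- which is encoded from the user_id column.
CREATE TABLE UserBehaviorSink (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://localhost:6650',
  'topics' = 'user_behavior_out',
  'key.format' = 'json',                          -- assumed format
  'key.fields' = 'user_id',
  'value.format' = 'json',
  'sink.topic-routing-mode' = 'message-key-hash'
);

-- Copy a projection of the source into the sink.
INSERT INTO UserBehaviorSink
SELECT user_id, behavior
FROM UserBehaviorSource;
```

Setting key.fields on the sink gives message-key-hash routing a key to hash; without a key, that mode publishes all messages to a single randomly chosen partition, as described under sink.topic-routing-mode.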