Apache Pulsar Connector

Flink provides an Apache Pulsar connector for reading and writing data from and to Pulsar topics with exactly-once guarantees.

Dependency

You can use the connector with the Pulsar 2.10.0 or higher. It is recommended to always use the latest Pulsar version. The details on Pulsar compatibility can be found in PIP-72.

There is no connector (yet) available for Flink version 1.19.

In order to use the in PyFlink jobs, the following dependencies are required:

VersionPyFlink JAR
flink-connector-pulsarThere is no SQL jar (yet) available for Flink version 1.19.

See Python dependency management for more details on how to use JARs in PyFlink.

Flink’s streaming connectors are not part of the binary distribution. See how to link with them for cluster execution here.

Pulsar Source

This part describes the Pulsar source based on the new data source API.

Usage

The Pulsar source provides a builder class for constructing a PulsarSource instance. The code snippet below builds a PulsarSource instance. It consumes messages from the earliest cursor of the topic “persistent://public/default/my-topic” in Exclusive subscription type (my-subscription) and deserializes the raw payload of the messages as strings.

Java

  1. PulsarSource<String> source = PulsarSource.builder()
  2. .setServiceUrl(serviceUrl)
  3. .setAdminUrl(adminUrl)
  4. .setStartCursor(StartCursor.earliest())
  5. .setTopics("my-topic")
  6. .setDeserializationSchema(new SimpleStringSchema())
  7. .setSubscriptionName("my-subscription")
  8. .build();
  9. env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");

Python

  1. pulsar_source = PulsarSource.builder() \
  2. .set_service_url('pulsar://localhost:6650') \
  3. .set_admin_url('http://localhost:8080') \
  4. .set_start_cursor(StartCursor.earliest()) \
  5. .set_topics("my-topic") \
  6. .set_deserialization_schema(SimpleStringSchema()) \
  7. .set_subscription_name('my-subscription') \
  8. .build()
  9. env.from_source(source=pulsar_source,
  10. watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
  11. source_name="pulsar source")

The following properties are required for building a PulsarSource:

  • Pulsar service URL, configured by setServiceUrl(String)
  • Pulsar service HTTP URL (also known as admin URL), configured by setAdminUrl(String)
  • Pulsar subscription name, configured by setSubscriptionName(String)
  • Topics / partitions to subscribe, see the following topic-partition subscription for more details.
  • Deserializer to parse Pulsar messages, see the following deserializer for more details.

It is recommended to set the consumer name in Pulsar Source by setConsumerName(String). This sets a unique name for the Flink connector in the Pulsar statistic dashboard. You can use it to monitor the performance of your Flink connector and applications.

Topic-partition Subscription

Pulsar source provide two ways of topic-partition subscription:

  • Topic list, subscribing messages from all partitions in a list of topics. For example:

    Java

    1. PulsarSource.builder().setTopics("some-topic1", "some-topic2");
    2. // Partition 0 and 2 of topic "topic-a"
    3. PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");

    Python

    1. PulsarSource.builder().set_topics(["some-topic1", "some-topic2"])
    2. # Partition 0 and 2 of topic "topic-a"
    3. PulsarSource.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
  • Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:

    Java

    1. PulsarSource.builder().setTopicPattern("topic-.*");

    Python

    1. PulsarSource.builder().set_topic_pattern("topic-.*")

Flexible Topic Naming

Since Pulsar 2.0, all topic names internally are in a form of {persistent|non-persistent}://tenant/namespace/topic. Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity). The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.

Topic propertyDefault
topic typepersistent
tenantpublic
namespacedefault

This table lists a mapping relationship between your input topic name and the translated topic name:

Input topic nameTranslated topic name
my-topicpersistent://public/default/my-topic
my-tenant/my-namespace/my-topicpersistent://my-tenant/my-namespace/my-topic

For non-persistent topics, you need to specify the entire topic name, as the default-based rules do not apply for non-partitioned topics. Thus, you cannot use a short name like non-persistent://my-topic and need to use non-persistent://public/default/my-topic instead.

Subscribing Pulsar Topic Partition

Internally, Pulsar divides a partitioned topic as a set of non-partitioned topics according to the partition size.

For example, if a simple-string topic with 3 partitions is created under the sample tenant with the flink namespace. The topics on Pulsar would be:

Topic namePartitioned
persistent://sample/flink/simple-stringY
persistent://sample/flink/simple-string-partition-0N
persistent://sample/flink/simple-string-partition-1N
persistent://sample/flink/simple-string-partition-2N

You can directly consume messages from the topic partitions by using the non-partitioned topic names above. For example, use PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2") would consume the partitions 1 and 2 of the sample/flink/simple-string topic.

Setting Topic Patterns

The Pulsar source can subscribe to a set of topics under only one tenant and one namespace by using regular expression. But the topic type (persistent or non-persistent) isn’t determined by the regular expression. Even if you use PulsarSource.builder().setTopicPattern("non-persistent://public/default/my-topic.*"), we will subscribe both persistent and non-persistent topics which its name matches public/default/my-topic.*.

In order to subscribe only non-persistent topics. You need to set the RegexSubscriptionMode to RegexSubscriptionMode.NonPersistentOnly. For example, setTopicPattern("topic-.*", RegexSubscriptionMode.NonPersistentOnly). And use setTopicPattern("topic-.*", RegexSubscriptionMode.PersistentOnly) will only subscribe to the persistent topics.

The regular expression should follow the topic naming pattern. Only the topic name part can be a regular expression. For example, if you provide a simple topic regular expression like some-topic-\d, we will filter all the topics under the public tenant with the default namespace. And if the topic regular expression is flink/sample/topic-.*, we will filter all the topics under the flink tenant with the sample namespace.

Currently, the latest released Pulsar 2.11.0 didn’t return the non-persistent topics correctly. You can’t use regular expression for filtering non-persistent topics in Pulsar 2.11.0.

See this issue: https://github.com/apache/pulsar/issues/19316 for the detailed context of this bug.

Deserializer

A deserializer (PulsarDeserializationSchema) is for decoding Pulsar messages from bytes. You can configure the deserializer using setDeserializationSchema(PulsarDeserializationSchema). The PulsarDeserializationSchema defines how to deserialize a Pulsar Message<byte[]>.

If only the raw payload of a message (message data in bytes) is needed, you can use the predefined PulsarDeserializationSchema. Pulsar connector provides three implementation methods.

  • Decode the message by using Pulsar’s Schema. If using KeyValue type or Struct types, the pulsar Schema does not contain type class info. But it is still needed to construct PulsarSchemaTypeInformation. So we provide two more APIs to pass the type info.

    1. // Primitive types
    2. PulsarSourceBuilder.setDeserializationSchema(Schema);
    3. // Struct types (JSON, Protobuf, Avro, etc.)
    4. PulsarSourceBuilder.setDeserializationSchema(Schema, Class);
    5. // KeyValue type
    6. PulsarSourceBuilder.setDeserializationSchema(Schema, Class, Class);
  • Decode the message by using Flink’s DeserializationSchema

    1. PulsarSourceBuilder.setDeserializationSchema(DeserializationSchema);
  • Decode the message by using Flink’s TypeInformation

    1. PulsarSourceBuilder.setDeserializationSchema(TypeInformation, ExecutionConfig);

Pulsar Message<byte[]> contains some extra properties, such as message key, message publish time, message time, and application-defined key/value pairs etc. These properties could be defined in the Message<byte[]> interface.

If you want to deserialize the Pulsar message by these properties, you need to implement PulsarDeserializationSchema. Ensure that the TypeInformation from the PulsarDeserializationSchema.getProducedType() is correct. Flink uses this TypeInformation to pass the messages to downstream operators.

Schema Evolution in Source

Schema evolution can be enabled by users using Pulsar’s Schema and PulsarSourceBuilder.enableSchemaEvolution(). This means that any broker schema validation is in place.

  1. Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
  2. PulsarSource<SomePojo> source = PulsarSource.builder()
  3. ...
  4. .setDeserializationSchema(schema, SomePojo.class)
  5. .enableSchemaEvolution()
  6. .build();

If you use Pulsar schema without enabling schema evolution, we will bypass the schema check. This may cause some errors when you use a wrong schema to deserialize the messages.

Use Auto Consume Schema

Pulsar provides Schema.AUTO_CONSUME() for consuming message without a predefined schema. This is always used when the topic has multiple schemas and may not be compatible with each other. Pulsar will auto decode the message into a GenericRecord for the user.

But the PulsarSourceBuilder.setDeserializationSchema(Schema) method doesn’t support the Schema.AUTO_CONSUME(). Instead, we provide the GenericRecordDeserializer for deserializing the GenericRecord. You can implement this interface and set it in the PulsarSourceBuilder.setDeserializationSchema(GenericRecordDeserializer).

  1. GenericRecordDeserializer<SomePojo> deserializer = ...
  2. PulsarSource<SomePojo> source = PulsarSource.builder()
  3. ...
  4. .setDeserializationSchema(deserializer)
  5. .build();

Currently, auto consume schema only supports AVRO, JSON and Protobuf schemas.

Define a RangeGenerator

Ensure that you have provided a RangeGenerator implementation if you want to consume a subset of keys on the Pulsar connector. The RangeGenerator generates a set of key hash ranges so that a respective reader subtask only dispatches messages where the hash of the message key is contained in the specified range.

Since the Pulsar didn’t expose the key hash range method. We have to provide an FixedKeysRangeGenerator for end-user. You can add the keys you want to consume, no need to calculate any hash ranges. The key’s hash isn’t specified to only one key, so the consuming results may contain the messages with different keys comparing the keys you have defined in this range generator. Remember to use flink’s DataStream.filter() method after the Pulsar source.

  1. FixedKeysRangeGenerator.builder()
  2. .supportNullKey()
  3. .key("someKey")
  4. .keys(Arrays.asList("key1", "key2"))
  5. .build()

Starting Position

The Pulsar source is able to consume messages starting from different positions by setting the setStartCursor(StartCursor) option. Built-in start cursors include:

  • Start from the earliest available message in the topic.

    Java

    1. StartCursor.earliest();

    Python

    1. StartCursor.earliest()
  • Start from the latest available message in the topic.

    Java

    1. StartCursor.latest();

    Python

    1. StartCursor.latest()
  • Start from a specified message between the earliest and the latest. The Pulsar connector consumes from the latest available message if the message ID does not exist.

    The start message is included in consuming result.

    Java

    1. StartCursor.fromMessageId(MessageId);

    Python

    1. StartCursor.from_message_id(message_id)
  • Start from a specified message between the earliest and the latest. The Pulsar connector consumes from the latest available message if the message ID doesn’t exist.

    Include or exclude the start message by using the second boolean parameter.

    Java

    1. StartCursor.fromMessageId(MessageId, boolean);

    Python

    1. StartCursor.from_message_id(message_id, boolean)
  • Start from the specified message publish time by Message<byte[]>.getPublishTime(). This method is deprecated because the name is totally wrong which may cause confuse. You can use StartCursor.fromPublishTime(long) instead.

    Java

    1. StartCursor.fromMessageTime(long);

    Python

    1. StartCursor.from_message_time(int)
  • Start from the specified message publish time by Message<byte[]>.getPublishTime().

    Java

    1. StartCursor.fromPublishTime(long);

    Python

    1. StartCursor.from_publish_time(int)

The StartCursor is used when the corresponding subscription is not created in Pulsar by default. The priority of the consumption start position is, checkpoint > existed subscription position > StartCursor. Sometimes, the end user may want to force the start position by using StartCursor. You should enable the pulsar.source.resetSubscriptionCursor option and start the pipeline without the saved checkpoint files. It is important to note that the given consumption position in the checkpoint is always the highest priority.

Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID (MessageId) of the message is ordered in that sequence. The MessageId contains some extra information (the ledger, entry, partition) about how the message is stored, you can create a MessageId by using DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex).

Boundedness

The Pulsar source supports streaming and batch execution mode. By default, the PulsarSource is configured for unbounded data.

For unbounded data the Pulsar source never stops until a Flink job is stopped or failed. You can use the setUnboundedStopCursor(StopCursor) to set the Pulsar source to stop at a specific stop position.

You can use setBoundedStopCursor(StopCursor) to specify a stop position for bounded data.

Built-in stop cursors include:

  • The Pulsar source never stops consuming messages.

    Java

    1. StopCursor.never();

    Python

    1. StopCursor.never()
  • Stop at the latest available message when the Pulsar source starts consuming messages.

    Java

    1. StopCursor.latest();

    Python

    1. StopCursor.latest()
  • Stop when the connector meets a given message, or stop at a message which is produced after this given message.

    Java

    1. StopCursor.atMessageId(MessageId);

    Python

    1. StopCursor.at_message_id(message_id)
  • Stop but include the given message in the consuming result.

    Java

    1. StopCursor.afterMessageId(MessageId);

    Python

    1. StopCursor.after_message_id(message_id)
  • Stop at the specified event time by Message<byte[]>.getEventTime(). The message with the given event time won’t be included in the consuming result.

    Java

    1. StopCursor.atEventTime(long);

    Python

    1. StopCursor.at_event_time(int)
  • Stop after the specified event time by Message<byte[]>.getEventTime(). The message with the given event time will be included in the consuming result.

    Java

    1. StopCursor.afterEventTime(long);

    Python

    1. StopCursor.after_event_time(int)
  • Stop at the specified publish time by Message<byte[]>.getPublishTime(). The message with the given publish time won’t be included in the consuming result.

    Java

    1. StopCursor.atPublishTime(long);

    Python

    1. StopCursor.at_publish_time(int)
  • Stop after the specified publish time by Message<byte[]>.getPublishTime(). The message with the given publish time will be included in the consuming result.

    Java

    1. StopCursor.afterPublishTime(long);

    Python

    1. StopCursor.after_publish_time(int)

Source Configurable Options

In addition to configuration options described above, you can set arbitrary options for PulsarClient, PulsarAdmin, Pulsar Consumer and PulsarSource by using setConfig(ConfigOption<T>, T), setConfig(Configuration) and setConfig(Properties).

PulsarClient Options

The Pulsar connector uses the client API to create the Consumer instance. The Pulsar connector extracts most parts of Pulsar’s ClientConfigurationData, which is required for creating a PulsarClient, as Flink configuration options in PulsarOptions.

KeyDefaultTypeDescription
pulsar.client.authParamMap
(none)MapParameters for the authentication plugin.
pulsar.client.authParams
(none)StringParameters for the authentication plugin.

Example:
key1:val1,key2:val2
pulsar.client.authPluginClassName
(none)StringName of the authentication plugin.
pulsar.client.concurrentLookupRequest
5000IntegerThe number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker. It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created PulsarClient
pulsar.client.connectionMaxIdleSeconds
180IntegerRelease the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections
pulsar.client.connectionTimeoutMs
10000IntegerDuration (in ms) of waiting for a connection to a broker to be established.
If the duration passes without a response from a broker, the connection attempt is dropped.
pulsar.client.connectionsPerBroker
1IntegerThe maximum number of connections that the client library will open to a single broker.
By default, the connection pool will use a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high latency connection.
pulsar.client.dnsLookupBindAddress
(none)StringThe Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0:0 The default port is 0 which means random port. The max allowed port is 65535. The bind address should in host:port format.
pulsar.client.enableBusyWait
falseBooleanOption to enable busy-wait settings.
This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.
pulsar.client.enableTransaction
falseBooleanIf transaction is enabled, start the transactionCoordinatorClient with PulsarClient.
pulsar.client.initialBackoffIntervalNanos
100000000LongDefault duration (in nanoseconds) for a backoff interval.
pulsar.client.keepAliveIntervalSeconds
30IntegerInterval (in seconds) for keeping connection between the Pulsar client and broker alive.
pulsar.client.listenerName
(none)StringConfigure the listenerName that the broker will return the corresponding advertisedListener.
pulsar.client.lookupTimeoutMs
-1IntegerClient lookup timeout (in milliseconds).
Lookup operations have a different load pattern to other operations. They can be handled by any broker, are not proportional to throughput, and are harmless to retry. Given this, it makes sense to allow them to retry longer than normal operation, especially if they experience a timeout.
By default, this is set to match operation timeout. This is to maintain legacy behaviour. However, in practice it should be set to 5-10x the operation timeout.
pulsar.client.maxBackoffIntervalNanos
60000000000LongThe maximum duration (in nanoseconds) for a backoff interval.
pulsar.client.maxLookupRedirects
20IntegerThe maximum number of times a lookup-request redirections to a broker.
pulsar.client.maxLookupRequest
50000IntegerThe maximum number of lookup requests allowed on each broker connection to prevent overload on the broker. It should be greater than pulsar.client.concurrentLookupRequest. Requests that inside pulsar.client.concurrentLookupRequest are already sent to broker, and requests beyond pulsar.client.concurrentLookupRequest and under maxLookupRequests will wait in each client cnx.
pulsar.client.maxNumberOfRejectedRequestPerConnection
50IntegerThe maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed and the client creates a new connection to connect to a different broker.
pulsar.client.memoryLimitBytes
67108864LongThe limit (in bytes) on the amount of direct memory that will be allocated by this client instance.
Setting this to 0 will disable the limit.
pulsar.client.numIoThreads
1IntegerThe number of threads used for handling connections to brokers.
pulsar.client.numListenerThreads
1IntegerThe number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers that are using a listener model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering.
pulsar.client.operationTimeoutMs
30000IntegerOperation timeout (in ms). Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval. If the operation is not completed during this interval, the operation will be marked as failed.
pulsar.client.proxyProtocol
SNI

Enum

Protocol type to determine the type of proxy routing when a client connects to the proxy using pulsar.client.proxyServiceUrl.

Possible values:
  • “SNI”
pulsar.client.proxyServiceUrl
(none)StringProxy-service URL when a client connects to the broker via the proxy. The client can choose the type of proxy-routing.
pulsar.client.requestTimeoutMs
60000IntegerMaximum duration (in ms) for completing a request. This config option is not supported before Pulsar 2.8.1
pulsar.client.serviceUrl
(none)StringService URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.
  • This is an example of localhost: pulsar://localhost:6650.
  • If you have multiple brokers, the URL is as: pulsar://localhost:6550,localhost:6651,localhost:6652
  • A URL for a production Pulsar cluster is as: pulsar://pulsar.us-west.example.com:6650
  • If you use TLS authentication, the URL is as pulsar+ssl://pulsar.us-west.example.com:6651
pulsar.client.socks5ProxyAddress
(none)StringAddress of SOCKS5 proxy. It should in host:port format.
pulsar.client.socks5ProxyPassword
(none)StringPassword of SOCKS5 proxy.
pulsar.client.socks5ProxyUsername
(none)StringUser name of SOCKS5 proxy.
pulsar.client.sslProvider
(none)StringThe name of the security provider used for SSL connections. The default value is the default security provider of the JVM.
pulsar.client.statsIntervalSeconds
60LongInterval between each stats info.
  • Stats is activated with positive statsInterval
  • Set statsIntervalSeconds to 1 second at least.
pulsar.client.tlsAllowInsecureConnection
falseBooleanWhether the Pulsar client accepts untrusted TLS certificate from the broker.
pulsar.client.tlsCertificateFilePath
(none)StringPath to the TLS certificate file.
pulsar.client.tlsCiphers
List<String>A list of cipher suites. This is a named combination of authentication, encryption, MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported.
pulsar.client.tlsHostnameVerificationEnable
falseBooleanWhether to enable TLS hostname verification. It allows to validate hostname verification when a client connects to the broker over TLS. It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker’s host name. It follows RFC 2818, 3.1. Server Identity hostname verification.
pulsar.client.tlsKeyFilePath
(none)StringPath to the TLS key file.
pulsar.client.tlsKeyStorePassword
(none)StringThe store password for the key store file.
pulsar.client.tlsKeyStorePath
(none)StringThe location of the key store file.
pulsar.client.tlsKeyStoreType
“JKS”StringThe file format of the key store file.
pulsar.client.tlsProtocols
List<String>The SSL protocol used to generate the SSLContext. By default, it is set TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.
pulsar.client.tlsTrustCertsFilePath
(none)StringPath to the trusted TLS certificate file.
pulsar.client.tlsTrustStorePassword
(none)StringThe store password for the key store file.
pulsar.client.tlsTrustStorePath
(none)StringThe location of the trust store file.
pulsar.client.tlsTrustStoreType
“JKS”StringThe file format of the trust store file.
pulsar.client.useKeyStoreTls
falseBooleanIf TLS is enabled, whether use the KeyStore type as the TLS configuration parameter. If it is set to false, it means to use the default pem type configuration.
pulsar.client.useTcpNoDelay
trueBooleanWhether to use the TCP no-delay flag on the connection to disable Nagle algorithm.
No-delay features ensures that packets are sent out on the network as soon as possible, and it is critical to achieve low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall throughput. Therefore, if latency is not a concern, it is recommended to set this option to false.
By default, it is set to true.

PulsarAdmin Options

The admin API is used for querying topic metadata and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription. It shares most part of the configuration options with the client API. The configuration options listed here are only used in the admin API. They are also defined in PulsarOptions.

KeyDefaultTypeDescription
pulsar.admin.adminUrl
(none)StringThe Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.
pulsar.admin.autoCertRefreshTime
300000IntegerThe auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.
pulsar.admin.connectTimeout
60000IntegerThe connection time out (in ms) for the PulsarAdmin client.
pulsar.admin.readTimeout
60000IntegerThe server response read timeout (in ms) for the PulsarAdmin client for any request.
pulsar.admin.requestRates
5IntegerIt will add ratelimit for PulsarAdmin metadata requests, stands for requests per second.
pulsar.admin.requestRetries
5IntegerFor PulsarAdmin request, it will retry until we get a success response, fail if we exhausted retry count.
pulsar.admin.requestTimeout
300000IntegerThe server request timeout (in ms) for the PulsarAdmin client for any request.
pulsar.admin.requestWaitMillis
3000LongFor PulsarAdmin request, We will sleep the given time before retrying the failed request.

Pulsar Consumer Options

In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios. The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar’s ConsumerConfigurationData as Flink configuration options in PulsarSourceOptions.

KeyDefaultTypeDescription
pulsar.consumer.ackReceiptEnabled
falseBooleanAcknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt.
pulsar.consumer.ackTimeoutMillis
0LongThe timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.
By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.
When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer.
pulsar.consumer.acknowledgementsGroupTimeMicros
100000LongGroup a consumer acknowledgment for a specified time (in μs). By default, a consumer uses 100μs grouping time to send out acknowledgments to a broker. If the group time is set to 0, acknowledgments are sent out immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.
pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull
falseBooleanBuffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this pulsar.consumer.maxPendingChunkedMessage threshold. Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull is true. Otherwise, it marks them for redelivery.
pulsar.consumer.autoScaledReceiverQueueSizeEnabled
trueBooleanThis option is enabled by default. The consumer receiver queue size is initialized with 1, and will double itself until it reaches the value set by pulsar.consumer.receiverQueueSize.
The feature should be able to reduce client memory usage.
pulsar.consumer.consumerName
(none)StringThe consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.
pulsar.consumer.cryptoFailureAction
FAIL

Enum

The consumer should take action when it receives a message that can not be decrypted.
  • FAIL: this is the default option to fail messages until crypto succeeds.
  • DISCARD: silently acknowledge but do not deliver messages to an application.
  • CONSUME: deliver encrypted messages to applications. It is the application’s responsibility to decrypt the message.

Fail to decompress the messages.
If messages contain batch messages, a client is not be able to retrieve individual messages in batch.
The delivered encrypted message contains EncryptionContext which contains encryption and compression information in. You can use an application to decrypt the consumed message payload.

Possible values:
  • “FAIL”
  • “DISCARD”
  • “CONSUME”
pulsar.consumer.deadLetterPolicy.deadLetterTopic
(none)StringName of the dead topic where the failed messages are sent.
pulsar.consumer.deadLetterPolicy.maxRedeliverCount
(none)IntegerThe maximum number of times that a message are redelivered before being sent to the dead letter queue.
pulsar.consumer.deadLetterPolicy.retryLetterTopic
(none)StringName of the retry topic where the failed messages are sent.
pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis
60000LongIf a producer fails to publish all the chunks of a message, the consumer can expire incomplete chunks if the consumer cannot receive all chunks in expire times (default 1 hour, in ms).
pulsar.consumer.maxPendingChunkedMessage
10IntegerThe consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages’ chunks. So, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages.
For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages.Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.
Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this pulsar.consumer.maxPendingChunkedMessage threshold. Once, a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging or asking the broker to redeliver messages later by marking it unacknowledged. This behavior can be controlled by the pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull option.
pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions
50000IntegerThe maximum total receiver queue size across partitions.
This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.
pulsar.consumer.negativeAckRedeliveryDelayMicros
60000000LongDelay (in μs) to wait before redelivering messages that failed to be processed.
When an application uses Consumer.negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
pulsar.consumer.poolMessages
falseBooleanEnable pooling of messages and the underlying data buffers.
pulsar.consumer.priorityLevel
0IntegerPriority level for a consumer to which a broker gives more priorities while dispatching messages in the subscription.
The broker follows descending priorities. For example, 0=max-priority, 1, 2,…

Example 1
If a subscription has consumer A with priorityLevel 0 and consumer B with priorityLevel 1, then the broker only dispatches messages to consumer A until it runs out permits and then starts dispatching messages to consumer B.
Example 2
Consumer Priority, Level, Permits C1, 0, 2 C2, 0, 1 C3, 0, 1 C4, 1, 2 C5, 1, 1
The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.
pulsar.consumer.properties
MapA name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification.
pulsar.consumer.readCompacted
falseBooleanIf enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.
A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.
Only enabling readCompacted on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).
Attempting to enable it on subscriptions to non-persistent topics leads to a subscription call throwing a PulsarClientException.
pulsar.consumer.receiverQueueSize
1000IntegerSize of a consumer’s receiver queue.
For example, the number of messages accumulated by a consumer before an application calls Receive.
A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.
pulsar.consumer.replicateSubscriptionState
falseBooleanIf replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters.
pulsar.consumer.retryEnable
falseBooleanIf enabled, the consumer will automatically retry messages.
pulsar.consumer.subscriptionMode
Durable

Enum

Select the subscription mode to be used when subscribing to the topic.
  • Durable: Make the subscription to be backed by a durable cursor that will retain messages and persist the current position.
  • NonDurable: Lightweight subscription mode that doesn’t have a durable cursor associated


Possible values:
  • “Durable”
  • “NonDurable”
pulsar.consumer.subscriptionName
(none)StringSpecify the subscription name for this consumer. This argument is required when constructing the consumer.
pulsar.consumer.subscriptionProperties
(none)MapSubscription properties is an optional attribute, which can be set when subscribing to topic. These properties cannot be modified. We can only delete the subscription and create it again.
pulsar.consumer.tickDurationMillis
1000LongGranularity (in ms) of the ack-timeout redelivery.
A greater (for example, 1 hour) tickDurationMillis reduces the memory overhead to track messages.

PulsarSource Options

The configuration options below are mainly used for customizing the performance and message acknowledgement behavior. You can ignore them if you do not have any performance issues.

KeyDefaultTypeDescription
pulsar.source.allowKeySharedOutOfOrderDelivery
falseBooleanIf enabled, it will relax the ordering requirement, allowing the broker to send out-of-order messages in case of failures. This will make it faster for new consumers to join without being stalled by an existing slow consumer.
In this case, a single consumer will still receive all the keys, but they may be coming in different orders.
pulsar.source.autoCommitCursorInterval
5000LongThis option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription. We would automatically commit the cursor using the given period (in ms).
pulsar.source.enableAutoAcknowledgeMessage
falseBooleanFlink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to true.
The source would use pulsar client’s internal mechanism and commit cursor in a given interval.
pulsar.source.enableMetrics
trueBooleanThe metrics from Pulsar Consumer are only exposed if you enable this option.You should set the pulsar.client.statsIntervalSeconds to a positive value if you enable this option.
pulsar.source.enableSchemaEvolution
falseBooleanIf you enable this option and use PulsarSourceBuilder.setDeserializationSchema(Schema), we would consume and deserialize the message by using Pulsar’s Schema interface with extra schema evolution check.
pulsar.source.fetchOneMessageTime
(none)IntegerThe time (in ms) for fetching one message from Pulsar. If time exceed and no message returned from Pulsar. We would consider there is no record at the current topic partition and stop fetching until next switch.
It’s not configured by default. We will use the remaining time in pulsar.source.maxFetchTime by default, which may cause a long wait in small message rates. Add this option in source builder avoiding waiting too long.
pulsar.source.maxFetchRecords
100IntegerThe maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchTime.
pulsar.source.maxFetchTime
10000LongThe maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. A fetch batch might be finished earlier because of pulsar.source.maxFetchRecords.
pulsar.source.partitionDiscoveryIntervalMs
300000LongThe interval (in ms) for the Pulsar source to discover the new partitions. A non-positive value disables the partition discovery.
pulsar.source.resetSubscriptionCursor
falseBooleanThe StartCursor in connector is used to create the initial subscription. Enable this option will reset the start cursor in subscription by using StartCursor everytime you start the application without the checkpoint.
pulsar.source.verifyInitialOffsets
WARN_ON_MISMATCH

Enum

Upon (re)starting the source, check whether the expected message can be read. If failure is enabled, the application fails. Otherwise, it logs a warning. A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.

Possible values:
  • “FAIL_ON_MISMATCH”: Fail the consuming from Pulsar when we don’t find the related cursor.
  • “WARN_ON_MISMATCH”: Print a warn message and start consuming from the valid offset.

Dynamic Partition Discovery

To handle scenarios like topic scaling-out or topic creation without restarting the Flink job, the Pulsar source periodically discover new partitions under a provided topic-partition subscription pattern. To enable partition discovery, you can set a non-negative value for the PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS option:

Java

  1. // discover new partitions per 10 seconds
  2. PulsarSource.builder()
  3. .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);

Python

  1. # discover new partitions per 10 seconds
  2. PulsarSource.builder()
  3. .set_config("pulsar.source.partitionDiscoveryIntervalMs", 10000)
  • Partition discovery is enabled by default. The Pulsar connector queries the topic metadata every 5 minutes.
  • To disable partition discovery, you need to set a negative partition discovery interval.
  • Partition discovery is disabled for bounded data even if you set this option with a non-negative value.

Event Time and Watermarks

By default, the message uses the timestamp embedded in Pulsar Message<byte[]> as the event time. You can define your own WatermarkStrategy to extract the event time from the message, and emit the watermark downstream:

Java

  1. env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");

Python

  1. env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")

This documentation describes details about how to define a WatermarkStrategy.

Message Acknowledgement

When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when the connector acknowledges that all these messages are processed successfully.

We use Exclusive subscription as the default subscription type. It supports cumulative acknowledgment. In this subscription type, Flink only needs to acknowledge the latest successfully consumed message. All the message before the given message are marked with a consumed status.

The Pulsar source acknowledges the current consuming message when checkpoints are completed, to ensure the consistency between Flink’s checkpoint state and committed position on the Pulsar brokers.

If checkpointing is disabled, Pulsar source periodically acknowledges messages. You can use the PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL option to set the acknowledgement period.

Pulsar source does NOT rely on committed positions for fault tolerance. Acknowledging messages is only for exposing the progress of consumers and monitoring on these two subscription types.

Pulsar Sink

The Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.

This part describes the Pulsar sink based on the new data sink API.

If you still want to use the legacy SinkFunction or on Flink 1.14 or previous releases, just use the StreamNative’s pulsar-flink.

Usage

The Pulsar Sink uses a builder class to construct the PulsarSink instance. This example writes a String record to a Pulsar topic with at-least-once delivery guarantee.

Java

  1. DataStream<String> stream = ...
  2. PulsarSink<String> sink = PulsarSink.builder()
  3. .setServiceUrl(serviceUrl)
  4. .setAdminUrl(adminUrl)
  5. .setTopics("topic1")
  6. .setSerializationSchema(new SimpleStringSchema())
  7. .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  8. .build();
  9. stream.sinkTo(sink);

Python

  1. stream = ...
  2. pulsar_sink = PulsarSink.builder() \
  3. .set_service_url('pulsar://localhost:6650') \
  4. .set_admin_url('http://localhost:8080') \
  5. .set_topics("topic1") \
  6. .set_serialization_schema(SimpleStringSchema()) \
  7. .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
  8. .build()
  9. stream.sink_to(pulsar_sink)

The following properties are required for building PulsarSink:

  • Pulsar service url, configured by setServiceUrl(String)
  • Pulsar service http url (aka. admin url), configured by setAdminUrl(String)
  • Topics / partitions to write, see Producing to topics for more details.
  • Serializer to generate Pulsar messages, see serializer for more details.

It is recommended to set the producer name in Pulsar Source by setProducerName(String). This sets a unique name for the Flink connector in the Pulsar statistic dashboard. You can use it to monitor the performance of your Flink connector and applications.

Producing to topics

Defining the topics for producing is similar to the topic-partition subscription in the Pulsar source. We support a mix-in style of topic setting. You can provide a list of topics, partitions, or both of them.

Java

  1. // Topic "some-topic1" and "some-topic2"
  2. PulsarSink.builder().setTopics("some-topic1", "some-topic2")
  3. // Partition 0 and 2 of topic "topic-a"
  4. PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
  5. // Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
  6. PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")

Python

  1. # Topic "some-topic1" and "some-topic2"
  2. PulsarSink.builder().set_topics(["some-topic1", "some-topic2"])
  3. # Partition 0 and 2 of topic "topic-a"
  4. PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
  5. # Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
  6. PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "some-topic2"])

The topics you provide support auto partition discovery. We query the topic metadata from the Pulsar in a fixed interval. You can use the PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL option to change the discovery interval option.

Configuring writing targets can be replaced by using a custom [TopicRouter] message routing. Configuring partitions on the Pulsar connector is explained in the flexible topic naming section.

If you build the Pulsar sink based on both the topic and its corresponding partitions, Pulsar sink merges them and only uses the topic.

For example, when using the PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0") option to build the Pulsar sink, this is simplified to PulsarSink.builder().setTopics("some-topic1").

Dynamic Topics by incoming messages

Topics could be defined by the incoming messages instead of providing the fixed topic set in builder. You can dynamically provide the topic by in a custom TopicRouter. The topic metadata can be queried by using PulsarSinkContext.topicMetadata(String) and the query result would be cached and expire in PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL milliseconds.

If you want to write to a non-existed topic, just return it in TopicRouter. Pulsar connector will try to create it.

You need to enable the topic auto creation in Pulsar’s broker.conf when you want to write messages to a non-existed topic. Set the allowAutoTopicCreation=true to enable it.

The allowAutoTopicCreationType option in broker.conf is used to control the type of topic that is allowed to be automatically created.

  • non-partitioned: The default type for the auto-created topic. It doesn’t have any partition and can’t be converted to a partitioned topic.
  • partitioned: The topic will be created as a partitioned topic. Set the defaultNumPartitions option to control the auto created partition size.

Serializer

A serializer (PulsarSerializationSchema) is required for serializing the record instance into bytes. Similar to PulsarSource, Pulsar sink supports both Flink’s SerializationSchema and Pulsar’s Schema. But Pulsar’s Schema.AUTO_PRODUCE_BYTES() is not supported.

If you do not need the message key and other message properties in Pulsar’s Message interface, you can use the predefined PulsarSerializationSchema. The Pulsar sink provides two implementation methods.

  • Encode the message by using Pulsar’s Schema.

    1. // Primitive types
    2. PulsarSinkBuilder.setSerializationSchema(Schema)
    3. // Struct types (JSON, Protobuf, Avro, etc.)
    4. PulsarSinkBuilder.setSerializationSchema(Schema, Class)
    5. // KeyValue type
    6. PulsarSinkBuilder.setSerializationSchema(Schema, Class, Class)
  • Encode the message by using Flink’s SerializationSchema

    1. PulsarSinkBuilder.setSerializationSchema(SerializationSchema)

Schema Evolution in Sink

Schema evolution can be enabled by users using Pulsar’s Schema and PulsarSinkBuilder.enableSchemaEvolution(). This means that any broker schema validation is in place.

  1. Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
  2. PulsarSink<SomePojo> sink = PulsarSink.builder()
  3. ...
  4. .setSerializationSchema(schema, SomePojo.class)
  5. .enableSchemaEvolution()
  6. .build();

If you use Pulsar schema without enabling schema evolution, the target topic will have a Schema.BYTES schema. But Schema.BYTES isn’t stored in any Pulsar’s topic. An auto-created topic in this way will present no schema. Consumers will need to handle deserialization without Pulsar’s Schema (if needed) themselves.

For example, if you set PulsarSinkBuilder.setSerializationSchema(Schema.STRING) without enabling schema evolution, the schema stored in Pulsar topics is Schema.BYTES.

PulsarMessage<byte[]> validation

Pulsar topic always has at least one schema. The Schema.BYTES is the default one for any topic without schema being set. But sending messages bytes with Schema.BYTES bypass the schema validate. So the message sent with SerializationSchema and Schema which doesn’t enable the schema evolution may save the invalid messages in the topic.

You can enable the pulsar.sink.validateSinkMessageBytes option to let the connector use the Pulsar’s Schema.AUTO_PRODUCE_BYTES() which supports extra check for the message bytes before sending. It will query the latest schema in topic and use it to validate the message bytes.

But some schemas in Pulsar don’t support validation, so we disable this option by default. you should use it at your own risk.

Custom serializer

You can have your own serialization logic by implementing the PulsarSerializationSchema interface. The return type for this interface is PulsarMessage which you can’t create it directly. Instead, we use builder method for creating three types of the Pulsar messages.

  • Create a message with a Pulsar Scheme. This is always when you know the schema in topic. We will check if the given schema is compatible in the correspond topic.

    1. PulsarMessage.builder(Schema<M> schema, M message)
    2. ...
    3. .build();
  • Create the message without any Pulsar Scheme. The message type can only be the byte array. It won’t validate the message bytes by default.

    1. PulsarMessage.builder(byte[] bytes)
    2. ...
    3. .build();
  • Create a tombstone message with empty payloads. Tombstone is a special message which is supported in Pulsar.

    1. PulsarMessage.builder()
    2. ...
    3. .build();

Message Routing

Routing in Pulsar Sink is operated on the partition level. For a list of partitioned topics, the routing algorithm first collects all partitions from different topics, and then calculates routing within all the partitions. By default, Pulsar Sink supports two router implementations.

  • KeyHashTopicRouter: use the hashcode of the message’s key to decide the topic partition that messages are sent to.

    The message key is provided by PulsarSerializationSchema.key(IN, PulsarSinkContext) You need to implement this interface and extract the message key when you want to send the message with the same key to the same topic partition.

    If you do not provide the message key. A topic partition is randomly chosen from the topic list.

    The message key can be hashed in two ways: MessageKeyHash.JAVA_HASH and MessageKeyHash.MURMUR3_32_HASH. You can use the PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH option to choose the hash method.

  • RoundRobinRouter: Round-robin among all the partitions.

    All messages are sent to the first partition, and switch to the next partition after sending a fixed number of messages. The batch size can be customized by the PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES option.

Let’s assume there are ten messages and two topics. Topic A has two partitions while topic B has three partitions. The batch size is set to five messages. In this case, topic A has 5 messages per partition which topic B does not receive any messages.

You can configure custom routers by using the TopicRouter interface. If you implement a TopicRouter, ensure that it is serializable. And you can return partitions which are not available in the pre-discovered partition list.

Thus, you do not need to specify topics using the PulsarSinkBuilder.setTopics option when you implement the custom topic router.

  1. @PublicEvolving
  2. public interface TopicRouter<IN> extends Serializable {
  3. TopicPartition route(IN in, List<TopicPartition> partitions, PulsarSinkContext context);
  4. default void open(SinkConfiguration sinkConfiguration) {
  5. // Nothing to do by default.
  6. }
  7. }

Internally, a Pulsar partition is implemented as a topic. The Pulsar client provides APIs to hide this implementation detail and handles routing under the hood automatically. Pulsar Sink uses a lower client API to implement its own routing layer to support multiple topics routing.

For details, see partitioned topics.

Delivery Guarantee

PulsarSink supports three delivery guarantee semantics.

  • NONE: Data loss can happen even when the pipeline is running. Basically, we use a fire-and-forget strategy to send records to Pulsar topics in this mode. It means that this mode has the highest throughput.
  • AT_LEAST_ONCE: No data loss happens, but data duplication can happen after a restart from checkpoint.
  • EXACTLY_ONCE: No data loss happens. Each record is sent to the Pulsar broker only once. Pulsar Sink uses Pulsar transaction and two-phase commit (2PC) to ensure records are sent only once even after the pipeline restarts.

If you want to use EXACTLY_ONCE, make sure you have enabled the checkpoint on Flink and enabled the transaction on Pulsar. The Pulsar sink will write all the messages in a pending transaction and commit it after the successfully checkpointing.

The messages written to Pulsar after a pending transaction won’t be obtained based on the design of the Pulsar. You can acquire these messages only when the corresponding transaction is committed.

Delayed message delivery

Delayed message delivery enables you to delay the possibility to consume a message. With delayed message enabled, the Pulsar sink sends a message to the Pulsar topic immediately, but the message is delivered to a consumer once the specified delay is over.

Delayed message delivery only works in the Shared subscription type. In Exclusive and Failover subscription types, the delayed message is dispatched immediately.

You can configure the MessageDelayer to define when to send the message to the consumer. The default option is to never delay the message dispatching. You can use the MessageDelayer.fixed(Duration) option to Configure delaying all messages in a fixed duration. You can also implement the MessageDelayer interface to dispatch messages at different time.

The dispatch time should be calculated by the PulsarSinkContext.processTime().

Sink Configurable Options

You can set options for PulsarClient, PulsarAdmin, Pulsar Producer and PulsarSink by using setConfig(ConfigOption<T>, T), setConfig(Configuration) and setConfig(Properties).

PulsarClient and PulsarAdmin Options

For details, refer to PulsarAdmin options.

Pulsar Producer Options

The Pulsar connector uses the Producer API to send messages. It extracts most parts of Pulsar’s ProducerConfigurationData as Flink configuration options in PulsarSinkOptions.

KeyDefaultTypeDescription
pulsar.producer.batchingEnabled
trueBooleanEnable batch send ability, it was enabled by default.
pulsar.producer.batchingMaxBytes
131072IntegerThe maximum size of messages permitted in a batch. Keep the maximum consistent as previous versions.
pulsar.producer.batchingMaxMessages
1000IntegerThe maximum number of messages permitted in a batch.
pulsar.producer.batchingMaxPublishDelayMicros
1000LongBatching time period of sending messages.
pulsar.producer.batchingPartitionSwitchFrequencyByPublishDelay
10IntegerThe maximum wait time for switching topic partitions.
pulsar.producer.chunkMaxMessageSize
-1IntegerMax chunk message size in bytes. Producer chunks the message if chunking is enabled and message size is larger than max chunk-message size. By default, chunkMaxMessageSize value is -1 and producer chunks based on max-message size configured at the broker.
pulsar.producer.chunkingEnabled
falseBooleanIf message size is higher than allowed max publish-payload size by broker, then enableChunking helps producer to split message into multiple chunks and publish them to broker separately and in order. So, it allows client to successfully publish large size of messages in pulsar.
pulsar.producer.compressionType
NONE

Enum

Message data compression type used by a producer.Available options:

Possible values:
  • “NONE”
  • “LZ4”
  • “ZLIB”
  • “ZSTD”
  • “SNAPPY”
pulsar.producer.initialSequenceId
(none)LongThe sequence id for avoiding the duplication, it’s used when Pulsar doesn’t have transaction.
pulsar.producer.producerCryptoFailureAction
FAIL

Enum

The action the producer will take in case of encryption failures.

Possible values:
  • “FAIL”
  • “SEND”
pulsar.producer.producerName
(none)StringA producer name which would be displayed in the Pulsar’s dashboard. If no producer name was provided, we would use a Pulsar generated name instead.
pulsar.producer.properties
MapA name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification.
pulsar.producer.sendTimeoutMs
30000LongMessage send timeout in ms.If a message is not acknowledged by a server before the sendTimeout expires, an error occurs.

PulsarSink Options

The configuration options below are mainly used for customizing the performance and message sending behavior. You can just leave them alone if you do not have any performance issues.

KeyDefaultTypeDescription
pulsar.sink.deliveryGuarantee
none

Enum

Optional delivery guarantee when committing.

Possible values:
  • “exactly-once”: Records are only delivered exactly-once also under failover scenarios. To build a complete exactly-once pipeline is required that the source and sink support exactly-once and are properly configured.
  • “at-least-once”: Records are ensured to be delivered but it may happen that the same record is delivered multiple times. Usually, this guarantee is faster than the exactly-once delivery.
  • “none”: Records are delivered on a best effort basis. It is often the fastest way to process records but it may happen that records are lost or duplicated.
pulsar.sink.enableMetrics
trueBooleanThe metrics from Pulsar Producer are only exposed if you enable this option. You should set the pulsar.client.statsIntervalSeconds to a positive value if you enable this option.
pulsar.sink.enableSchemaEvolution
falseBooleanIf you enable this option and use PulsarSinkBuilder.setSerializationSchema(Schema), we would produce and serialize the message by using Pulsar’s Schema.
pulsar.sink.maxRecommitTimes
5IntegerThe allowed transaction recommit times if we meet some retryable exception. This is used in Pulsar Transaction.
pulsar.sink.messageKeyHash
murmur-3-32-hash

Enum

The hash policy for routing message by calculating the hash code of message key.

Possible values:
  • “java-hash”: This hash would use String.hashCode() to calculate the message key string’s hash code.
  • “murmur-3-32-hash”: This hash would calculate message key’s hash code by using Murmur3 algorithm.
pulsar.sink.topicMetadataRefreshInterval
1800000LongAuto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.
pulsar.sink.transactionTimeoutMillis
10800000LongThis option is used when the user require the DeliveryGuarantee.EXACTLY_ONCE semantic. We would use transaction for making sure the message could be write only once.
pulsar.sink.validateSinkMessageBytes
falseBooleanPulsar client can validate the raw message bytes with the latest topic schema. This can make sure your serialized messages bytes is valid for consumer.

Brief Design Rationale

Pulsar sink follow the Sink API defined in FLIP-191.

Stateless SinkWriter

In EXACTLY_ONCE mode, the Pulsar sink does not store transaction information in a checkpoint. That means that new transactions will be created after a restart. Therefore, any message in previous pending transactions is either aborted or timed out (They are never visible to the downstream Pulsar consumer). The Pulsar team is working to optimize the needed resources by unfinished pending transactions.

Pulsar Schema Evolution

Pulsar Schema Evolution allows you to reuse the same Flink job after certain “allowed” data model changes, like adding or deleting a field in a AVRO-based Pojo class. Please note that you can specify Pulsar schema validation rules and define an auto schema update. For details, refer to Pulsar Schema Evolution.

Monitor the Metrics

The Pulsar client refreshes its stats every 60 seconds by default. To increase the metrics refresh frequency, you can change the Pulsar client stats refresh interval to a smaller value (minimum 1 second), as shown below.

Java

  1. builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);

Python

  1. builder.set_config("pulsar.client.statsIntervalSeconds", "1")

Source Metrics

Flink defines common source metrics in FLIP-33: Standardize Connector Metrics. Pulsar connector will expose some client metrics if you enable the pulsar.source.enableMetrics option. All the custom source metrics are listed in below table.

Java

  1. builder.setConfig(PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS, true);

Python

  1. builder.set_config("pulsar.source.enableMetrics", "true")
MetricsUser VariablesDescriptionType
PulsarConsumer.“Topic”.“ConsumerName”.numMsgsReceivedTopic, ConsumerNameNumber of messages received in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numBytesReceivedTopic, ConsumerNameNumber of bytes received in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.rateMsgsReceivedTopic, ConsumerNameRate of bytes per second received in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.rateBytesReceivedTopic, ConsumerNameRate of bytes per second received in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numAcksSentTopic, ConsumerNameNumber of message acknowledgments sent in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numAcksFailedTopic, ConsumerNameNumber of message acknowledgments failed in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numReceiveFailedTopic, ConsumerNameNumber of message receive failed in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.numBatchReceiveFailedTopic, ConsumerNameNumber of message batch receive failed in the last interval.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalMsgsReceivedTopic, ConsumerNameTotal number of messages received by this consumer.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalBytesReceivedTopic, ConsumerNameTotal number of bytes received by this consumer.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalReceivedFailedTopic, ConsumerNameTotal number of messages receive failures by this consumer.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalBatchReceivedFailedTopic, ConsumerNameTotal number of messages batch receive failures by this consumer.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalAcksSentTopic, ConsumerNameTotal number of message acknowledgments sent by this consumer.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.totalAcksFailedTopic, ConsumerNameTotal number of message acknowledgments failures on this consumer.Gauge
PulsarConsumer.“Topic”.“ConsumerName”.msgNumInReceiverQueueTopic, ConsumerNameThe size of receiver queue on this consumer.Gauge

Sink Metrics

The below table lists supported sink metrics. The first 6 metrics are standard Pulsar Sink metrics as described in FLIP-33: Standardize Connector Metrics.

The first 5 metrics are exposed to the flink metric system by default. You should enable the pulsar.sink.enableMetrics option to get the remaining metrics exposed.

Java

  1. builder.setConfig(PulsarSinkOptions.PULSAR_ENABLE_SINK_METRICS, true);

Python

  1. builder.set_config("pulsar.sink.enableMetrics", "true")
MetricsUser VariablesDescriptionType
numBytesOutn/aThe total number of output bytes since the sink starts. Count towards the numBytesOut in TaskIOMetricsGroup.Counter
numBytesOutPerSecondn/aThe output bytes per second.Meter
numRecordsOutn/aThe total number of output records since the sink starts.Counter
numRecordsOutPerSecondn/aThe output records per second.Meter
numRecordsOutErrorsn/aThe total number of records failed to send.Counter
currentSendTimen/aThe time it takes to send the last record, from enqueue the message in client buffer to its ack.Gauge
PulsarProducer.“Topic”.“ProducerName”.numMsgsSentTopic, ProducerNameThe number of messages published in the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.numBytesSentTopic, ProducerNameThe number of bytes sent in the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.numSendFailedTopic, ProducerNameThe number of failed send operations in the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.numAcksReceivedTopic, ProducerNameThe number of send acknowledges received by broker in the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendMsgsRateTopic, ProducerNameThe messages send rate in the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendBytesRateTopic, ProducerNameThe bytes send rate in the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis50pctTopic, ProducerNameThe 50% of send latency in milliseconds for the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis75pctTopic, ProducerNameThe 75% of send latency in milliseconds for the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis95pctTopic, ProducerNameThe 95% of send latency in milliseconds for the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis99pctTopic, ProducerNameThe 99% of send latency in milliseconds for the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillis999pctTopic, ProducerNameThe 99.9% of send latency in milliseconds for the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.sendLatencyMillisMaxTopic, ProducerNameThe maximum send latency in milliseconds for the last interval.Gauge
PulsarProducer.“Topic”.“ProducerName”.totalMsgsSentTopic, ProducerNameThe total number of messages published by this producer.Gauge
PulsarProducer.“Topic”.“ProducerName”.totalBytesSentTopic, ProducerNameThe total number of bytes sent by this producer.Gauge
PulsarProducer.“Topic”.“ProducerName”.totalSendFailedTopic, ProducerNameThe total number of failed send operations.Gauge
PulsarProducer.“Topic”.“ProducerName”.totalAcksReceivedTopic, ProducerNameThe total number of send acknowledges received by broker.Gauge
PulsarProducer.“Topic”.“ProducerName”.pendingQueueSizeTopic, ProducerNameThe current pending send-message queue size of the producer.Gauge
  • numBytesOut, numRecordsOut and numRecordsOutErrors are retrieved from Pulsar client metrics.

  • numBytesOutPerSecond and numRecordsOutPerSecond are calculated based on the numBytesOut and numRecordsOUt counter respectively. Flink internally uses a fixed 60-seconds window to calculate the rates.

  • currentSendTime tracks the time from when the producer calls sendAync() to the time when the broker acknowledges the message. This metric is not available in NONE delivery guarantee.

End-to-end encryption

Flink can use Pulsar’s encryption to encrypt messages on the sink side and decrypt messages on the source side. Users should provide the public and private key pair to perform the encryption. Only with a valid key pair can decrypt the encrypted messages.

How to enable end-to-end encryption

  1. Generate a set of key pairs.

    Pulsar supports multiple ECDSA or RSA key pairs in the meantime, you can provide multiple key pairs. We will randomly choose a key pair to encrypt the message which makes the encryption more secure.

    1. # ECDSA (for Java clients only)
    2. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
    3. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
    4. # RSA
    5. openssl genrsa -out test_rsa_privkey.pem 2048
    6. openssl rsa -in test_rsa_privkey.pem -pubout -outform pkcs8 -out test_rsa_pubkey.pem
  2. Implement the CryptoKeyReader interface.

    Each key pair should have a unique key name. Implement the CryptoKeyReader interface and make sure CryptoKeyReader.getPublicKey() and CryptoKeyReader.getPrivateKey() can return the corresponding key by the key name.

    Pulsar provided a default CryptoKeyReader implementation named DefaultCryptoKeyReader. You can create it by using the DefaultCryptoKeyReader.builder(). And make sure the key pair files should be placed on the Flink running environment.

    1. // defaultPublicKey and defaultPrivateKey should be provided in this implementation.
    2. // The file:///path/to/default-public.key should be a valid path on Flink's running environment.
    3. CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
    4. .defaultPublicKey("file:///path/to/default-public.key")
    5. .defaultPrivateKey("file:///path/to/default-private.key")
    6. .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
    7. .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
    8. .build();
  3. (Optional) Implement the MessageCrypto<MessageMetadata, MessageMetadata> interface.

    Pulsar supports the ECDSA, RSA out of box. You don’t need to implement this interface if you use the common existing encryption methods. If you want to define a custom key pair based crypto method, just implement the MessageCrypto<MessageMetadata, MessageMetadata> interface. You can read the Pulsar’s default implementation, the MessageCryptoBc, for how to implement this crypto interface.

  4. Create PulsarCrypto instance.

    PulsarCrypto is used for providing all the required information for encryption and decryption. You can use the builder method to create the instance.

    1. CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
    2. .defaultPublicKey("file:///path/to/public1.key")
    3. .defaultPrivateKey("file:///path/to/private2.key")
    4. .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
    5. .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
    6. .build();
    7. // This line is only used as an example. It returns the default implementation of the MessageCrypto.
    8. SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>> cryptoSupplier = () -> new MessageCryptoBc();
    9. PulsarCrypto pulsarCrypto = PulsarCrypto.builder()
    10. .cryptoKeyReader(keyReader)
    11. // All the key name should be provided here, you can't encrypt the message with any non-existed key names.
    12. .addEncryptKeys("key1", "key2")
    13. // You don't have to provide the MessageCrypto.
    14. .messageCrypto(cryptoSupplier)
    15. .build()

Decrypt the message on the Pulsar source

Follow the previous instruction to create a PulsarCrypto instance and pass it to the PulsarSource.builder(). You need to choose the decrypt failure action in the meantime. Pulsar has three types of failure action which defines in ConsumerCryptoFailureAction.

  • ConsumerCryptoFailureAction.FAIL: The Flink pipeline will crash and turn into a failed state.

  • ConsumerCryptoFailureAction.DISCARD: Message is silently drop and not delivered to the downstream.

  • ConsumerCryptoFailureAction.CONSUME

    The message will not be decrypted and directly passed to downstream. You can decrypt the message in PulsarDeserializationSchema, the encryption information can be retrieved from Message.getEncryptionCtx().

  1. PulsarCrypto pulsarCrypto = ...
  2. PulsarSource<String> sink = PulsarSource.builder()
  3. ...
  4. .setPulsarCrypto(pulsarCrypto, ConsumerCryptoFailureAction.FAIL)
  5. .build();

Encrypt the message on the Pulsar sink

Follow the previous instruction to create a PulsarCrypto instance and pass it to the PulsarSink.builder(). You need to choose the encrypt failure action in the meantime. Pulsar has two types of failure action which defines in ProducerCryptoFailureAction.

  • ProducerCryptoFailureAction.FAIL: The Flink pipeline will crash and turn into a failed state.
  • ProducerCryptoFailureAction.SEND: Send the unencrypted messages.
  1. PulsarCrypto pulsarCrypto = ...
  2. PulsarSink<String> sink = PulsarSink.builder()
  3. ...
  4. .setPulsarCrypto(pulsarCrypto, ProducerCryptoFailureAction.FAIL)
  5. .build();

Upgrading to the Latest Connector Version

The generic upgrade steps are outlined in upgrading jobs and Flink versions guide. The Pulsar connector does not store any state on the Flink side. The Pulsar connector pushes and stores all the states on the Pulsar side. For Pulsar, you additionally need to know these limitations:

  • Do not upgrade the Pulsar connector and Pulsar broker version at the same time.
  • Always use a newer Pulsar client with Pulsar connector to consume messages from Pulsar.

Troubleshooting

If you have a problem with Pulsar when using Flink, keep in mind that Flink only wraps PulsarClient or PulsarAdmin and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers, reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.

Known Issues

This section describes some known issues about the Pulsar connectors.

Unstable on Java 11

Pulsar connector has some known issues on Java 11. It is recommended to run Pulsar connector on Java 8.

No TransactionCoordinatorNotFound, but automatic reconnect

Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2 introduces a break change in transactions. If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a TransactionCoordinatorNotFound exception.

You can use the latest pulsar-client-all release to resolve this issue.