Pulsar Java client

You can use a Pulsar Java client to create the Java producer, consumer, reader and TableView of messages and to perform administrative tasks. The current Java client version is 2.11.2.

All the methods in producer, consumer, reader and TableView of a Java client are thread-safe.

Javadoc for the Pulsar client is divided by package as follows.

Package | Description | Maven Artifact
org.apache.pulsar.client.api | The producer and consumer API | org.apache.pulsar:pulsar-client:2.11.2
org.apache.pulsar.client.admin | The Java admin API | org.apache.pulsar:pulsar-client-admin:2.11.2
org.apache.pulsar.client.all | Includes both pulsar-client and pulsar-client-admin. Both pulsar-client and pulsar-client-admin are shaded packages and they shade dependencies independently. Consequently, applications using both have redundant shaded classes, and it is troublesome if you introduce new dependencies but forget to update the shading rules. In this case, you can use pulsar-client-all, which shades dependencies only once and reduces the size of dependencies. | org.apache.pulsar:pulsar-client-all:2.11.2

This document focuses only on the client API for producing and consuming messages on Pulsar topics. For how to use the Java admin client, see Pulsar admin interface.

Installation

The latest version of the Pulsar Java client library is available via Maven Central. To use the latest version, add the pulsar-client library to your build configuration.


Maven

If you use Maven, add the following information to the pom.xml file.

    <!-- in your <properties> block -->
    <pulsar.version>2.11.2</pulsar.version>

    <!-- in your <dependencies> block -->
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

Gradle

If you use Gradle, add the following information to the build.gradle file.

    def pulsarVersion = '2.11.2'

    dependencies {
        // The `compile` configuration was removed in Gradle 7; use `implementation`.
        implementation group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
    }

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. The following is an example of localhost with the default port 6650:

    pulsar://localhost:6650

If you have multiple brokers, separate the host:port pairs with commas:

    pulsar://localhost:6650,localhost:6651,localhost:6652

If you use mTLS authentication, add +ssl to the scheme:

    pulsar+ssl://pulsar.us-west.example.com:6651
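
The URL forms above can be checked with a small helper before they are handed to the client. The following is a minimal sketch under stated assumptions: ServiceUrlSketch and its methods are hypothetical names, not part of the Pulsar client API, and the sketch only recognizes the two schemes shown in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class ServiceUrlSketch {

    /** Returns true when the URL uses the pulsar+ssl:// scheme. */
    static boolean usesTls(String serviceUrl) {
        return serviceUrl.startsWith("pulsar+ssl://");
    }

    /** Splits "scheme://host1:port1,host2:port2" into its "host:port" entries. */
    static List<String> brokerAddresses(String serviceUrl) {
        int schemeEnd = serviceUrl.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing scheme: " + serviceUrl);
        }
        String scheme = serviceUrl.substring(0, schemeEnd);
        // Assumption: only the two schemes shown in this section are accepted here.
        if (!scheme.equals("pulsar") && !scheme.equals("pulsar+ssl")) {
            throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
        List<String> addresses = new ArrayList<>();
        for (String address : serviceUrl.substring(schemeEnd + 3).split(",")) {
            addresses.add(address.trim());
        }
        return addresses;
    }

    public static void main(String[] args) {
        String url = "pulsar://localhost:6650,localhost:6651,localhost:6652";
        System.out.println(brokerAddresses(url));
        System.out.println(usesTls("pulsar+ssl://pulsar.us-west.example.com:6651"));
    }
}
```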

Client

You can instantiate a PulsarClient object using just a URL for the target Pulsar cluster like this:

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

If you have multiple brokers, you can instantiate a PulsarClient like this:

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
            .build();

Note

If you run a cluster in standalone mode, the broker is available at the pulsar://localhost:6650 URL by default.

When you create a client, you can use the loadConf configuration. The following parameters are available in loadConf.

Name | Type | Description | Default
serviceUrl | String | Service URL provider for Pulsar service | None
authPluginClassName | String | Name of the authentication plugin | None
authParams | String | Parameters for the authentication plugin. Example: key1:val1,key2:val2 | None
operationTimeoutMs | long | Operation timeout | 30000
statsIntervalSeconds | long | Interval between each stats information. Stats is activated with a positive statsInterval. Set statsIntervalSeconds to at least 1 second. | 60
numIoThreads | int | The number of threads used for handling connections to brokers | 1
numListenerThreads | int | The number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers using the "listener" model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering. If you want multiple threads to process a single topic, you need to create a shared subscription and multiple consumers for this subscription. This does not ensure ordering. | 1
useTcpNoDelay | boolean | Whether to use the TCP no-delay flag on the connection to disable the Nagle algorithm | true
enableTls | boolean | Whether to use TLS encryption on the connection. Note that this parameter is deprecated. If you want to enable TLS, use pulsar+ssl:// in serviceUrl instead. | false
tlsTrustCertsFilePath | String | Path to the trusted TLS certificate file | None
tlsAllowInsecureConnection | boolean | Whether the Pulsar client accepts an untrusted TLS certificate from the broker | false
tlsHostnameVerificationEnable | boolean | Whether to enable TLS hostname verification | false
concurrentLookupRequest | int | The number of concurrent lookup requests allowed on each broker connection to prevent overload on the broker | 5000
maxLookupRequest | int | The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker | 50000
maxNumberOfRejectedRequestPerConnection | int | The maximum number of rejected requests of a broker in a certain time frame (60 seconds) after which the current connection is closed and the client creates a new connection to a different broker | 50
keepAliveIntervalSeconds | int | Keep-alive interval in seconds for each client-broker connection | 30
connectionTimeoutMs | int | Duration of waiting for a connection to a broker to be established. If the duration passes without a response from a broker, the connection attempt is dropped. | 10000
requestTimeoutMs | int | Maximum duration for completing a request | 60000
defaultBackoffIntervalNanos | int | Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100)
maxBackoffIntervalNanos | long | Maximum duration for a backoff interval | TimeUnit.SECONDS.toNanos(30)
socks5ProxyAddress | SocketAddress | SOCKS5 proxy address | None
socks5ProxyUsername | String | SOCKS5 proxy username | None
socks5ProxyPassword | String | SOCKS5 proxy password | None
connectionMaxIdleSeconds | int | Release the connection if it is not used for more than connectionMaxIdleSeconds seconds. If connectionMaxIdleSeconds < 0, automatic release of idle connections is disabled. | 180

Check out the Javadoc for the PulsarClient class for a full list of configurable parameters.

In addition to client-level configuration, you can also apply producer and consumer specific configuration as described in sections below.

Client memory allocator configuration

You can set the client memory allocator configurations through Java properties.

Property | Type | Description | Default | Available values
pulsar.allocator.pooled | String | If set to true, the client uses a direct memory pool. If set to false, the client uses heap memory without a pool. | true | true, false
pulsar.allocator.exit_on_oom | String | Whether to exit the JVM when OOM happens | false | true, false
pulsar.allocator.leak_detection | String | The leak detection policy for the Pulsar bytebuf allocator. Disabled: no leak detection and no overhead. Simple: instruments 1% of the allocated buffers to track for leaks. Advanced: instruments 1% of the allocated buffers to track for leaks, reporting stack traces of places where the buffer is used. Paranoid: instruments 100% of the allocated buffers to track for leaks, reporting stack traces of places where the buffer is used, and introduces a significant overhead. | Disabled | Disabled, Simple, Advanced, Paranoid
pulsar.allocator.out_of_memory_policy | String | When an OOM occurs, the client throws an exception or falls back to heap | FallbackToHeap | ThrowException, FallbackToHeap
Example

    -Dpulsar.allocator.pooled=true
    -Dpulsar.allocator.exit_on_oom=false
    -Dpulsar.allocator.leak_detection=Disabled
    -Dpulsar.allocator.out_of_memory_policy=ThrowException
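
Flags passed with -D surface inside the JVM as system properties. The following minimal sketch shows one way to confirm which allocator values a process was started with. AllocatorPropertiesSketch is a hypothetical name, the fallback values are the defaults listed in the table above, and this is not how the client itself reads the properties.

```java
// Illustrative only: shows how -D flags surface as JVM system properties.
final class AllocatorPropertiesSketch {

    /** Reads a property, falling back to the documented default when it is not set. */
    static String read(String name, String documentedDefault) {
        return System.getProperty(name, documentedDefault);
    }

    public static void main(String[] args) {
        System.out.println("pooled = " + read("pulsar.allocator.pooled", "true"));
        System.out.println("exit_on_oom = " + read("pulsar.allocator.exit_on_oom", "false"));
        System.out.println("leak_detection = " + read("pulsar.allocator.leak_detection", "Disabled"));
        System.out.println("out_of_memory_policy = "
                + read("pulsar.allocator.out_of_memory_policy", "FallbackToHeap"));
    }
}
```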

    Producer

    In Pulsar, producers write messages to topics. Once you’ve instantiated a PulsarClient object (as in the section above), you can create a Producer for a specific Pulsar topic.

    Producer<byte[]> producer = client.newProducer()
            .topic("my-topic")
            .create();

    // You can then send messages to the broker and topic you specified:
    producer.send("My message".getBytes());

    By default, producers produce messages that consist of byte arrays. You can produce different types by specifying a message schema.

    Producer<String> stringProducer = client.newProducer(Schema.STRING)
            .topic("my-topic")
            .create();
    stringProducer.send("My message");

    Make sure that you close your producers, consumers, and clients when you do not need them.

    producer.close();
    consumer.close();
    client.close();

    Close operations can also be asynchronous:

    producer.closeAsync()
            .thenRun(() -> System.out.println("Producer closed"))
            .exceptionally((ex) -> {
                System.err.println("Failed to close producer: " + ex);
                return null;
            });

    Configure producer

    If you instantiate a Producer object by specifying only a topic name, as in the example above, the producer uses the default configuration.

    When you create a producer, you can use the loadConf configuration. The following parameters are available in loadConf.

    Name | Type | Description | Default
    topicName | String | Topic name | null
    producerName | String | Producer name | null
    sendTimeoutMs | long | Message send timeout in ms. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs. | 30000
    blockIfQueueFull | boolean | If set to true, when the outgoing message queue is full, the send and sendAsync methods of the producer block, rather than failing and throwing errors. If set to false, when the outgoing message queue is full, the send and sendAsync methods of the producer fail and ProducerQueueIsFullError exceptions occur. The maxPendingMessages parameter determines the size of the outgoing message queue. | false
    maxPendingMessages | int | The maximum size of a queue holding pending messages, that is, messages waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the send and sendAsync methods fail unless you set blockIfQueueFull to true. | 1000
    maxPendingMessagesAcrossPartitions | int | Deprecated, use memoryLimit instead. The maximum number of pending messages across partitions. Use this setting to lower the max pending messages for each partition (maxPendingMessages) if the total number exceeds the configured value. maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages. | 50000
    messageRoutingMode | MessageRoutingMode | Message routing logic for producers on partitioned topics. The logic applies only when no key is set on messages. Available options: pulsar.RoundRobinDistribution (round robin), pulsar.UseSinglePartition (publish all messages to a single partition), pulsar.CustomPartition (a custom partitioning scheme). | pulsar.RoundRobinDistribution
    hashingScheme | HashingScheme | Hashing function determining the partition where you publish a particular message (partitioned topics only). Available options: pulsar.JavaStringHash (the equivalent of String.hashCode() in Java), pulsar.Murmur3_32Hash (applies the Murmur3 hashing function), pulsar.BoostHash (applies the hashing function from C++'s Boost library). | HashingScheme.JavaStringHash
    cryptoFailureAction | ProducerCryptoFailureAction | The action the producer takes when encryption fails. FAIL: if encryption fails, unencrypted messages fail to send. SEND: if encryption fails, unencrypted messages are sent. | ProducerCryptoFailureAction.FAIL
    batchingMaxPublishDelayMicros | long | Batching time period of sending messages. | TimeUnit.MILLISECONDS.toMicros(1)
    batchingMaxMessages | int | The maximum number of messages permitted in a batch. | 1000
    batchingEnabled | boolean | Enable batching of messages. | true
    chunkingEnabled | boolean | Enable chunking of messages. | false
    compressionType | CompressionType | Message data compression type used by a producer. Available options: LZ4, ZLIB, ZSTD, SNAPPY. | No compression
    initialSubscriptionName | String | Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. | null

    You can configure parameters if you do not want to use the default configuration.

    For a full list, see the Javadoc for the ProducerBuilder class. The following is an example.

    Producer<byte[]> producer = client.newProducer()
            .topic("my-topic")
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .sendTimeout(10, TimeUnit.SECONDS)
            .blockIfQueueFull(true)
            .create();

    Publish to partitioned topics

    By default, Pulsar topics are served by a single broker, which limits the maximum throughput of a topic. Partitioned topics can span multiple brokers and thus allow for higher throughput.

    You can publish messages to partitioned topics using Pulsar client libraries. When publishing messages to partitioned topics, you must specify a routing mode. If you do not specify any routing mode when you create a new producer, the round-robin routing mode is used.

    Routing mode

    You can specify the routing mode on the ProducerBuilder used to configure your producer. The routing mode determines which partition (internal topic) each message is published to.

    The following MessageRoutingMode options are available.

    Mode | Description
    RoundRobinPartition | If no key is provided, the producer publishes messages across all partitions in the round-robin policy to achieve the maximum throughput. Round-robin is not done per individual message. It is set to the same boundary as the batching delay to ensure that batching is effective. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition. This is the default mode.
    SinglePartition | If no key is provided, the producer picks a single partition randomly and publishes all messages into that partition. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition.
    CustomPartition | Use a custom message router implementation that is called to determine the partition for a particular message. You can create a custom routing mode by using the Java client and implementing the MessageRouter interface.

    The following is an example:

    String pulsarBrokerRootUrl = "pulsar://localhost:6650";
    String topic = "persistent://my-tenant/my-namespace/my-topic";

    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    producer.send("Partitioned topic message".getBytes());

    Custom message router

    To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:

    public interface MessageRouter extends Serializable {
        int choosePartition(Message msg);
    }

    The following router routes every message to partition 10:

    public class AlwaysTenRouter implements MessageRouter {
        public int choosePartition(Message msg) {
            return 10;
        }
    }

    With that implementation, you can send messages to partitioned topics as below.

    String pulsarBrokerRootUrl = "pulsar://localhost:6650";
    String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";

    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .messageRouter(new AlwaysTenRouter())
            .create();
    producer.send("Partitioned topic message".getBytes());

    How to choose partitions when using a key

    If a message has a key, it supersedes the round robin routing policy. The following example illustrates how to choose the partition when using a key.

    // If the message has a key, it supersedes the round robin routing policy
    if (msg.hasKey()) {
        return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
    }

    if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
        long currentMs = clock.millis();
        return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
    } else {
        return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
    }
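
The key branch of the excerpt above can be tried in isolation. The following is a simplified model, not the client's implementation: KeyPartitionSketch and partitionForKey are hypothetical names, and String.hashCode() stands in for whichever hashing scheme the producer is configured with.

```java
// Simplified model of key-based partition selection; not the client's implementation.
final class KeyPartitionSketch {

    /** Modulo that never returns a negative index, even for a negative hash. */
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        return mod < 0 ? mod + divisor : mod;
    }

    /** Assumption: String.hashCode() stands in for the configured hashing scheme. */
    static int partitionForKey(String key, int numPartitions) {
        return signSafeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition index.
        System.out.println(partitionForKey("order-42", 8));
        System.out.println(partitionForKey("order-42", 8));
    }
}
```

Because the index depends only on the key hash and the partition count, all messages with the same key land on the same partition as long as the number of partitions does not change.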

    Async send

    You can publish messages asynchronously using the Java client. With async send, the producer puts the message in a blocking queue and returns immediately. The client library then sends the message to the broker in the background. If the queue is full (the maximum size is configurable), the producer either blocks or fails immediately when calling the API, depending on the arguments passed to the producer.

    The following is an example.

    producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
        System.out.println("Message with ID " + msgId + " successfully sent");
    });

    As you can see from the example above, async send operations return a MessageId wrapped in a CompletableFuture.
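
The queue behavior described for async send (block or fail when the queue is full) can be modeled with a bounded queue from the standard library. This is a sketch under stated assumptions, not the producer's internals: PendingQueueSketch, enqueue, and drainOne are hypothetical names, and the two constructor arguments only mirror the maxPendingMessages and blockIfQueueFull settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of a producer's pending-message queue; not the client's implementation.
final class PendingQueueSketch {
    private final BlockingQueue<String> pending;
    private final boolean blockIfQueueFull;

    PendingQueueSketch(int maxPendingMessages, boolean blockIfQueueFull) {
        this.pending = new ArrayBlockingQueue<>(maxPendingMessages);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    /** Returns false when the queue is full and blocking is disabled. */
    boolean enqueue(String message) {
        if (!blockIfQueueFull) {
            return pending.offer(message); // fail fast when the queue is full
        }
        try {
            pending.put(message); // wait until the background sender frees a slot
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Models the background sender taking one message off the queue. */
    String drainOne() {
        return pending.poll();
    }
}
```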

    Configure messages

    In addition to a value, you can set additional items on a given message:

    producer.newMessage()
            .key("my-message-key")
            .value("my-async-message".getBytes())
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();

    You can also terminate the builder chain with sendAsync() to get a future in return.

    Enable chunking

    Message chunking enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages on the consumer side.

    The message chunking feature is OFF by default. The following is an example of how to enable message chunking when creating a producer.

    Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .enableChunking(true)
            .enableBatching(false)
            .create();

    By default, the producer chunks a large message based on the maximum message size (maxMessageSize) configured on the broker (for example, 5 MB). However, the client can also configure the maximum chunk size using the producer configuration chunkMaxMessageSize.

    Note

    To enable chunking, you need to disable batching (enableBatching=false) at the same time.
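
The split-and-aggregate idea behind chunking can be illustrated without a broker. The following is a minimal sketch, assuming chunks are plain byte slices that arrive in order. ChunkingSketch, split, and reassemble are hypothetical names, and the sketch does not reproduce the chunk metadata or wire format that Pulsar uses.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of producer-side splitting and consumer-side aggregation.
final class ChunkingSketch {

    /** Splits a payload into slices of at most maxChunkSize bytes. */
    static List<byte[]> split(byte[] payload, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += maxChunkSize) {
            int end = Math.min(payload.length, offset + maxChunkSize);
            chunks.add(Arrays.copyOfRange(payload, offset, end));
        }
        return chunks;
    }

    /** Concatenates chunks, in order, back into the original payload. */
    static byte[] reassemble(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "hello world!".getBytes();
        List<byte[]> chunks = split(payload, 5);
        System.out.println(chunks.size() + " chunks");
        System.out.println(new String(reassemble(chunks)));
    }
}
```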

    Intercept messages

    ProducerInterceptor intercepts and possibly mutates messages received by the producer before they are published to the brokers.

    The interface has three main events:

    • eligible checks if the interceptor can be applied to the message.
    • beforeSend is triggered before the producer sends the message to the broker. You can modify messages within this event.
    • onSendAcknowledgement is triggered when the message is acknowledged by the broker or the sending failed.

    To intercept messages, you can add a ProducerInterceptor or multiple ones when creating a Producer as follows.

    // ProducerInterceptor here is org.apache.pulsar.client.api.interceptor.ProducerInterceptor
    Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .intercept(new ProducerInterceptor() {
                @Override
                public void close() {
                    // release any resources held by the interceptor
                }

                @Override
                public boolean eligible(Message message) {
                    return true; // process all messages
                }

                @Override
                public Message beforeSend(Producer producer, Message message) {
                    // user-defined processing logic
                    return message;
                }

                @Override
                public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
                    // user-defined processing logic
                }
            })
            .create();

    Note

    Multiple interceptors apply in the order they are passed to the intercept method.
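
The ordering rule in the note can be pictured as a simple chain in which each interceptor receives the output of the previous one. This is a sketch only: InterceptorOrderSketch and applyAll are hypothetical names, and plain string functions stand in for ProducerInterceptor instances.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model of interceptors applied in registration order.
final class InterceptorOrderSketch {

    /** Applies each interceptor to the result of the previous one, in list order. */
    static String applyAll(List<UnaryOperator<String>> interceptors, String message) {
        String current = message;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain =
                Arrays.asList(m -> m + " [first]", m -> m + " [second]");
        System.out.println(applyAll(chain, "payload"));
    }
}
```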

    Consumer

    In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new consumer by first instantiating a PulsarClient object and passing it a URL for a Pulsar broker (as above).

    Once you’ve instantiated a PulsarClient object, you can create a Consumer by specifying a topic and a subscription.

    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscribe();

    The subscribe method will auto-subscribe the consumer to the specified topic and subscription. One way to make the consumer listen to the topic is to set up a while loop. In this example loop, the consumer listens for messages, prints the contents of any received message, and then acknowledges that the message has been processed. If the processing logic fails, you can use negative acknowledgment to redeliver the message later.

    while (true) {
        // Wait for a message
        Message msg = consumer.receive();

        try {
            // Do something with the message
            System.out.println("Message received: " + new String(msg.getData()));

            // Acknowledge the message
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }

    If you don’t want to block your main thread but constantly listen for new messages, consider using a MessageListener. The MessageListener uses a thread pool inside the client. You can set the number of threads for message listeners in the ClientBuilder.

    MessageListener myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };

    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .messageListener(myMessageListener)
            .subscribe();

    Configure consumer

    If you instantiate a Consumer object by specifying only a topic and subscription name as in the example above, the consumer uses the default configuration.

    When you create a consumer, you can use the loadConf configuration. The following parameters are available in loadConf.

    Name | Type | Description | Default
    topicNames | Set<String> | Topic name | Sets.newTreeSet()
    topicsPattern | Pattern | Topic pattern | None
    subscriptionName | String | Subscription name | None
    subscriptionType | SubscriptionType | Subscription type. Four subscription types are available: Exclusive, Failover, Shared, Key_Shared. | SubscriptionType.Exclusive
    receiverQueueSize | int | Size of a consumer's receiver queue, that is, the number of messages accumulated by a consumer before an application calls receive. A value higher than the default value increases consumer throughput, though at the expense of more memory utilization. | 1000
    acknowledgementsGroupTimeMicros | long | Group consumer acknowledgments for a specified time. By default, a consumer uses a 100 ms grouping time to send out acknowledgments to a broker. Setting a group time of 0 sends out acknowledgments immediately. A longer ack group time is more efficient at the expense of a slight increase in message redeliveries after a failure. | TimeUnit.MILLISECONDS.toMicros(100)
    negativeAckRedeliveryDelayMicros | long | Delay to wait before redelivering messages that failed to be processed. When an application uses Consumer#negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. | TimeUnit.MINUTES.toMicros(1)
    maxTotalReceiverQueueSizeAcrossPartitions | int | The max total receiver queue size across partitions. This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value. | 50000
    consumerName | String | Consumer name | null
    ackTimeoutMillis | long | Timeout of unacked messages | 0
    tickDurationMillis | long | Granularity of the ack-timeout redelivery. Using a higher tickDurationMillis reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour). | 1000
    priorityLevel | int | Priority level for a consumer to which a broker gives more priority while dispatching messages in the Shared subscription type. It can be set at the consumer level so all topics being consumed have the same priority level, or each topic being consumed can be given a different priority level. The broker follows descending priorities, for example, 0=max-priority, 1, 2,… In the Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers the next priority level consumers. Example 1: if a subscription has consumerA with priorityLevel 0 and consumerB with priorityLevel 1, the broker only dispatches messages to consumerA until it runs out of permits and then starts dispatching messages to consumerB. Example 2: given consumers (name, priority level, permits) C1, 0, 2; C2, 0, 1; C3, 0, 1; C4, 1, 2; C5, 1, 1, the order in which the broker dispatches messages to consumers is C1, C2, C3, C1, C4, C5, C4. | 0
    cryptoFailureAction | ConsumerCryptoFailureAction | The action the consumer takes when it receives a message that cannot be decrypted. FAIL: this is the default option to fail messages until crypto succeeds. DISCARD: silently acknowledge the message and do not deliver it to the application. CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message. In this case, decompression of the message fails, and if the message contains batch messages, the client is not able to retrieve individual messages in the batch. The delivered encrypted message contains an EncryptionContext, which holds the encryption and compression information that the application can use to decrypt the consumed message payload. | ConsumerCryptoFailureAction.FAIL
    properties | SortedMap<String, String> | A name or value property of this consumer. properties is application-defined metadata attached to a consumer. When getting topic stats, this metadata is associated with the consumer stats for easier identification. | new TreeMap()
    readCompacted | boolean | If readCompacted is enabled, a consumer reads messages from a compacted topic rather than reading the full message backlog of a topic. A consumer only sees the latest value for each key in the compacted topic, up to the point in the topic backlog that has been compacted. Beyond that point, messages are sent as normal. readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (such as failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to the subscription call throwing a PulsarClientException. | false
    subscriptionInitialPosition | SubscriptionInitialPosition | Initial position at which to set the cursor when subscribing to a topic for the first time. | SubscriptionInitialPosition.Latest
    patternAutoDiscoveryPeriod | int | Topic auto discovery period when using a pattern for the topic's consumer. The default and minimum value is 1 minute. | 1
    regexSubscriptionMode | RegexSubscriptionMode | When subscribing to a topic using a regular expression, you can pick a certain type of topics. PersistentOnly: only subscribe to persistent topics. NonPersistentOnly: only subscribe to non-persistent topics. AllTopics: subscribe to both persistent and non-persistent topics. | RegexSubscriptionMode.PersistentOnly
    deadLetterPolicy | DeadLetterPolicy | Dead letter policy for consumers. By default, some messages may be redelivered many times, possibly without ever stopping. With the dead letter mechanism, messages have a max redelivery count. When the maximum number of redeliveries is exceeded, messages are sent to the Dead Letter Topic and acknowledged automatically. You can enable the dead letter mechanism by setting deadLetterPolicy. Example: client.newConsumer().deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build()).subscribe(); The default dead letter topic name is {TopicName}-{Subscription}-DLQ. To set a custom dead letter topic name: client.newConsumer().deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build()).subscribe(); When specifying the dead letter policy while not specifying ackTimeoutMillis, you can set the ack timeout to 30000 milliseconds. | None
    autoUpdatePartitions | boolean | If autoUpdatePartitions is enabled, a consumer subscribes to newly added partitions automatically. Note: this is only for partitioned consumers. | true
    replicateSubscriptionState | boolean | If replicateSubscriptionState is enabled, the subscription state is replicated to geo-replicated clusters. | false
    negativeAckRedeliveryBackoff | RedeliveryBackoff | Interface for a custom redelivery policy for negatively acknowledged messages. You can specify a RedeliveryBackoff for a consumer. | MultiplierRedeliveryBackoff
    ackTimeoutRedeliveryBackoff | RedeliveryBackoff | Interface for a custom redelivery policy for messages whose ack timeout expired. You can specify a RedeliveryBackoff for a consumer. | MultiplierRedeliveryBackoff
    autoAckOldestChunkedMessageOnQueueFull | boolean | Whether to automatically acknowledge pending chunked messages when the threshold of maxPendingChunkedMessage is reached. If set to false, these messages are redelivered by their broker. | true
    maxPendingChunkedMessage | int | The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. | 10
    expireTimeOfIncompleteChunkedMessageMillis | long | The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. | 60000
    ackReceiptEnabled | boolean | If ackReceiptEnabled is enabled, an ACK returns a receipt. Getting the ack receipt means that the broker has processed the ack request. Without a transaction, however, the broker does not guarantee persistence of acknowledgments, so the messages may still be redelivered after the broker crashes. With a transaction, the client only gets the receipt after the acknowledgments have been persisted. | false
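
The dispatch order in Example 2 of the priorityLevel description can be reproduced with a small simulation. This is a simplified model, not the broker's dispatcher: PriorityDispatchSketch, dispatchOrder, and example are hypothetical names, and the sketch assumes the broker rotates through the consumers of one level, one message at a time, until that level has no permits left.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of priority-level dispatch in a Shared subscription.
final class PriorityDispatchSketch {

    /** Consumers from Example 2: name -> {priorityLevel, permits}. */
    static Map<String, int[]> example() {
        Map<String, int[]> consumers = new LinkedHashMap<>();
        consumers.put("C1", new int[] {0, 2});
        consumers.put("C2", new int[] {0, 1});
        consumers.put("C3", new int[] {0, 1});
        consumers.put("C4", new int[] {1, 2});
        consumers.put("C5", new int[] {1, 1});
        return consumers;
    }

    /** Returns the consumer chosen for each of the given number of messages. */
    static List<String> dispatchOrder(Map<String, int[]> consumers, int messages) {
        // Group consumers by level; TreeMap iterates from level 0 (max priority) upward.
        TreeMap<Integer, List<String>> byLevel = new TreeMap<>();
        Map<String, Integer> permits = new LinkedHashMap<>();
        for (Map.Entry<String, int[]> e : consumers.entrySet()) {
            byLevel.computeIfAbsent(e.getValue()[0], k -> new ArrayList<>()).add(e.getKey());
            permits.put(e.getKey(), e.getValue()[1]);
        }
        List<String> order = new ArrayList<>();
        for (List<String> level : byLevel.values()) {
            boolean dispatched = true;
            // Rotate through this level until nobody in it has permits left.
            while (dispatched && order.size() < messages) {
                dispatched = false;
                for (String name : level) {
                    if (order.size() < messages && permits.get(name) > 0) {
                        permits.merge(name, -1, Integer::sum);
                        order.add(name);
                        dispatched = true;
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(dispatchOrder(example(), 7));
    }
}
```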

    You can configure parameters if you do not want to use the default configuration. For a full list, see the Javadoc for the ConsumerBuilder class.

    The following is an example.

    Consumer consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .ackTimeout(10, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();

    Async receive

    The receive method receives messages synchronously (the consumer process is blocked until a message is available). You can also use async receive, which returns a CompletableFuture object immediately. The future completes once a new message is available.

    The following is an example.

    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

    Async receive operations return a Message wrapped inside of a CompletableFuture.

    Batch receive

    Use batchReceive to receive multiple messages for each call.

    The following is an example.

    Messages messages = consumer.batchReceive();
    for (Object message : messages) {
        // do something
    }
    consumer.acknowledge(messages);

    Java - 图5note

    Batch receive policy limits the number and bytes of messages in a single batch. You can specify a timeout to wait for enough messages. The batch receive is completed if any of the following conditions are met: enough number of messages, bytes of messages, wait timeout.

    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .batchReceivePolicy(BatchReceivePolicy.builder()
                    .maxNumMessages(100)
                    .maxNumBytes(1024 * 1024)
                    .timeout(200, TimeUnit.MILLISECONDS)
                    .build())
            .subscribe();

    The default batch receive policy is:

    BatchReceivePolicy.builder()
            .maxNumMessages(-1)
            .maxNumBytes(10 * 1024 * 1024)
            .timeout(100, TimeUnit.MILLISECONDS)
            .build();
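The completion rule described in the note above can be sketched as a plain predicate. This is a minimal illustration and not the client's implementation: the class name BatchCompletionSketch is hypothetical, and treating a non-positive limit as "no limit" is an assumption based on the -1 in the default policy.

```java
/** Illustrative sketch of the batch-receive completion rule; not part of the Pulsar API. */
final class BatchCompletionSketch {
    private final int maxNumMessages; // <= 0 is treated as "no limit" (assumption)
    private final long maxNumBytes;   // <= 0 is treated as "no limit" (assumption)
    private final long timeoutMs;     // <= 0 is treated as "no timeout" (assumption)

    BatchCompletionSketch(int maxNumMessages, long maxNumBytes, long timeoutMs) {
        this.maxNumMessages = maxNumMessages;
        this.maxNumBytes = maxNumBytes;
        this.timeoutMs = timeoutMs;
    }

    /** Returns true as soon as any one of the three conditions is met. */
    boolean isComplete(int numMessages, long numBytes, long waitedMs) {
        boolean enoughMessages = maxNumMessages > 0 && numMessages >= maxNumMessages;
        boolean enoughBytes = maxNumBytes > 0 && numBytes >= maxNumBytes;
        boolean timedOut = timeoutMs > 0 && waitedMs >= timeoutMs;
        return enoughMessages || enoughBytes || timedOut;
    }

    public static void main(String[] args) {
        // Same limits as the custom policy above: 100 messages, 1 MiB, 200 ms.
        BatchCompletionSketch policy = new BatchCompletionSketch(100, 1024 * 1024, 200);
        System.out.println(policy.isComplete(10, 2048, 50));  // no limit reached yet
        System.out.println(policy.isComplete(10, 2048, 200)); // timeout reached
    }
}
```

With the default policy values, only the byte limit and the timeout can complete a batch under this reading, because the message-count limit is disabled.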

    Configure chunking

    You can limit the maximum number of chunked messages a consumer maintains concurrently by configuring the maxPendingChunkedMessage and autoAckOldestChunkedMessageOnQueueFull parameters. When the threshold is reached, the consumer drops pending messages by silently acknowledging them or asking the broker to redeliver them later. The expireTimeOfIncompleteChunkedMessage parameter decides the time interval to expire incomplete chunks if the consumer fails to receive all chunks of a message within the specified time period.

    The following is an example of how to configure message chunking.

    Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionName("test")
            .autoAckOldestChunkedMessageOnQueueFull(true)
            .maxPendingChunkedMessage(100)
            .expireTimeOfIncompleteChunkedMessage(10, TimeUnit.MINUTES)
            .subscribe();

    Negative acknowledgment redelivery backoff

    RedeliveryBackoff introduces a redelivery backoff mechanism for negatively acknowledged messages. The redelivery delay depends on the redeliveryCount of each message, so messages that are retried repeatedly can be redelivered after increasingly long delays.

    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                    .minDelayMs(1000)
                    .maxDelayMs(60 * 1000)
                    .build())
            .subscribe();

    Acknowledgment timeout redelivery backoff

    RedeliveryBackoff also applies to messages that are redelivered after an acknowledgment timeout. The redelivery delay depends on the number of times a message has already been retried.

    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .ackTimeout(10, TimeUnit.SECONDS)
            .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                    .minDelayMs(1000)
                    .maxDelayMs(60000)
                    .multiplier(2)
                    .build())
            .subscribe();

    The message redelivery behavior should be as follows.

    Redelivery count | Redelivery delay
    1 | 10 + 1 seconds
    2 | 10 + 2 seconds
    3 | 10 + 4 seconds
    4 | 10 + 8 seconds
    5 | 10 + 16 seconds
    6 | 10 + 32 seconds
    7 | 10 + 60 seconds
    8 | 10 + 60 seconds
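The delays in the table above follow from the configured values (minimum delay 1 second, multiplier 2, maximum delay 60 seconds) on top of the 10-second acknowledgment timeout. The following is a minimal sketch of that arithmetic, written to reproduce the table rather than to mirror the library's internals; the class and method names are hypothetical, and the way the library itself indexes the redelivery count may differ.

```java
/** Illustrative arithmetic for the redelivery delay table; not part of the Pulsar API. */
final class RedeliveryDelaySketch {

    /** Backoff for the n-th redelivery (n starts at 1), capped at maxDelayMs. */
    static long backoffMs(int redeliveryCount, long minDelayMs, long maxDelayMs, double multiplier) {
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount - 1);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        long ackTimeoutSeconds = 10;
        for (int n = 1; n <= 8; n++) {
            long backoffSeconds = backoffMs(n, 1000, 60000, 2) / 1000;
            System.out.println(n + ": " + ackTimeoutSeconds + " + " + backoffSeconds + " seconds");
        }
    }
}
```

The seventh redelivery would be 64 seconds without the cap, which is why the table levels off at 60 seconds.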

    Note

    • The negativeAckRedeliveryBackoff does not work with consumer.negativeAcknowledge(MessageId messageId) because you are not able to get the redelivery count from the message ID.
    • If a consumer crashes, it triggers the redelivery of unacked messages. In this case, RedeliveryBackoff does not take effect and the messages might get redelivered earlier than the delay time from the backoff.

    Multi-topic subscriptions

    In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using multi-topic subscriptions. To use multi-topic subscriptions you can supply either a regular expression (regex) or a List of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.

    The following are some examples.

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.ConsumerBuilder;
    import org.apache.pulsar.client.api.PulsarClient;
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
            .subscriptionName(subscription);

    // Subscribe to all topics in a namespace
    Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
    Consumer<byte[]> allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .subscribe();

    // Subscribe to a subset of topics in a namespace, based on regex
    Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
    Consumer<byte[]> someTopicsConsumer = consumerBuilder
            .topicsPattern(someTopicsInNamespace)
            .subscribe();

    In the above example, the consumer subscribes to the persistent topics that match the topic name pattern. If you want the consumer to subscribe to all persistent and non-persistent topics that match the topic name pattern, set subscriptionTopicsMode to RegexSubscriptionMode.AllTopics.

    Pattern pattern = Pattern.compile("public/default/.*");
    pulsarClient.newConsumer()
            .subscriptionName("my-sub")
            .topicsPattern(pattern)
            .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
            .subscribe();

    Note

    By default, the subscriptionTopicsMode of the consumer is PersistentOnly. Available options of subscriptionTopicsMode are PersistentOnly, NonPersistentOnly, and AllTopics.
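To see which topic names a given pattern selects, it can help to test the regular expression on its own with java.util.regex. The following sketch assumes that a pattern must match the whole tenant/namespace/topic string; the class name and the topic names are hypothetical, and the snippet does not contact a broker.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Illustrative check of a topics pattern against candidate names; not part of the Pulsar API. */
final class TopicPatternSketch {

    /** Keeps the topic names that the pattern matches in full. */
    static List<String> matching(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names used only for illustration.
        List<String> topics = List.of(
                "public/default/foo-orders",
                "public/default/foo",
                "public/default/bar",
                "public/other/foo-orders");
        System.out.println(matching(Pattern.compile("public/default/.*"), topics));
        System.out.println(matching(Pattern.compile("public/default/foo.*"), topics));
    }
}
```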

    You can also subscribe to an explicit list of topics (across namespaces if you wish):

    List<String> topics = Arrays.asList(
            "topic-1",
            "topic-2",
            "topic-3"
    );

    Consumer<byte[]> multiTopicConsumer = consumerBuilder
            .topics(topics)
            .subscribe();

    // Alternatively:
    Consumer<byte[]> multiTopicConsumer2 = consumerBuilder
            .topic(
                    "topic-1",
                    "topic-2",
                    "topic-3"
            )
            .subscribe();

    You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. The following is an example.

    Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
    consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .subscribeAsync()
            .thenAccept(this::receiveMessageFromConsumer);

    private void receiveMessageFromConsumer(Object consumer) {
        ((Consumer<?>) consumer).receiveAsync().thenAccept(message -> {
            // Do something with the received message
            receiveMessageFromConsumer(consumer);
        });
    }

    Subscription types

    Pulsar has various subscription types to match different scenarios. A topic can have multiple subscriptions with different subscription types. However, a subscription can only have one subscription type at a time.

    A subscription is identified by its subscription name, and a subscription name can have only one subscription type at a time. To change the subscription type, first stop all consumers of this subscription.

    Different subscription types have different message distribution types. This section describes the differences between subscription types and how to use them.

    To better describe their differences, assume you have a topic named “my-topic”, and the producer has published 10 messages.

    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("my-topic")
            .enableBatching(false)
            .create();
    // 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
    producer.newMessage().key("key-1").value("message-1-1").send();
    producer.newMessage().key("key-1").value("message-1-2").send();
    producer.newMessage().key("key-1").value("message-1-3").send();
    producer.newMessage().key("key-2").value("message-2-1").send();
    producer.newMessage().key("key-2").value("message-2-2").send();
    producer.newMessage().key("key-2").value("message-2-3").send();
    producer.newMessage().key("key-3").value("message-3-1").send();
    producer.newMessage().key("key-3").value("message-3-2").send();
    producer.newMessage().key("key-4").value("message-4-1").send();
    producer.newMessage().key("key-4").value("message-4-2").send();

    Exclusive

    Create a new consumer and subscribe with the Exclusive subscription type.

    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();

    Only the first consumer is allowed to attach to the subscription; other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.

    Note

    If the topic is a partitioned topic, the first consumer subscribes to all partitions; other consumers are not assigned any partitions and receive an error.

    Failover

    Create new consumers and subscribe with the Failover subscription type.

    Consumer<byte[]> consumer1 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .subscribe();
    Consumer<byte[]> consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Failover)
            .subscribe();
    // consumer1 is the active consumer, consumer2 is the standby consumer.
    // consumer1 receives 5 messages and then crashes, consumer2 takes over as the active consumer.

    Multiple consumers can attach to the same subscription, yet only the first consumer is active, and the others are standby. When the active consumer is disconnected, messages are dispatched to one of the standby consumers, which then becomes the active consumer.

    If the first active consumer is disconnected after receiving 5 messages, the standby consumer becomes the active consumer. consumer1 receives:

    ("key-1", "message-1-1")
    ("key-1", "message-1-2")
    ("key-1", "message-1-3")
    ("key-2", "message-2-1")
    ("key-2", "message-2-2")

    consumer2 receives:

    ("key-2", "message-2-3")
    ("key-3", "message-3-1")
    ("key-3", "message-3-2")
    ("key-4", "message-4-1")
    ("key-4", "message-4-2")

    Note

    If a topic is a partitioned topic, each partition has only one active consumer, messages of one partition are distributed to only one consumer, and messages of multiple partitions are distributed to multiple consumers.

    Shared

    Create new consumers and subscribe with the Shared subscription type.

    Consumer<byte[]> consumer1 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    Consumer<byte[]> consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    // Both consumer1 and consumer2 are active consumers.

    In Shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round-robin distribution across consumers.

    If a broker dispatches only one message at a time, consumer1 receives the following messages.

    ("key-1", "message-1-1")
    ("key-1", "message-1-3")
    ("key-2", "message-2-2")
    ("key-3", "message-3-1")
    ("key-4", "message-4-1")

    consumer2 receives the following messages.

    ("key-1", "message-1-2")
    ("key-2", "message-2-1")
    ("key-2", "message-2-3")
    ("key-3", "message-3-2")
    ("key-4", "message-4-2")

    The Shared subscription is different from the Exclusive and Failover subscription types. Shared subscription has better flexibility, but cannot provide an ordering guarantee.
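The distribution listed above can be reproduced with a small simulation. This is a minimal sketch that assumes strict one-message-at-a-time round-robin dispatch, as in the example; real brokers dispatch according to consumer permits, so actual distributions vary. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative round-robin dispatch simulation; not part of the Pulsar API. */
final class RoundRobinSketch {

    /** Hands message i to consumer (i mod consumerCount) and returns one list per consumer. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = List.of(
                "message-1-1", "message-1-2", "message-1-3",
                "message-2-1", "message-2-2", "message-2-3",
                "message-3-1", "message-3-2",
                "message-4-1", "message-4-2");
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("consumer1: " + result.get(0));
        System.out.println("consumer2: " + result.get(1));
    }
}
```

Messages that share a key end up on different consumers here, which illustrates why the Shared type cannot provide a per-key ordering guarantee.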

    Key_Shared

    This subscription type is available since the 2.4.0 release. Create new consumers and subscribe with the Key_Shared subscription type.

    Consumer<byte[]> consumer1 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    Consumer<byte[]> consumer2 = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    // Both consumer1 and consumer2 are active consumers.

    Just like in the Shared subscription type, multiple consumers can attach to the same subscription in the Key_Shared subscription type. Unlike Shared, however, messages with the same key are delivered to only one consumer, in order. The following is one possible distribution of messages between the consumers. By default, you cannot know in advance which keys are assigned to which consumer, but a key is assigned to only one consumer at any given time.

    consumer1 receives the following messages.

    ("key-1", "message-1-1")
    ("key-1", "message-1-2")
    ("key-1", "message-1-3")
    ("key-3", "message-3-1")
    ("key-3", "message-3-2")

    consumer2 receives the following messages.

    ("key-2", "message-2-1")
    ("key-2", "message-2-2")
    ("key-2", "message-2-3")
    ("key-4", "message-4-1")
    ("key-4", "message-4-2")
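The property that matters here, that every message with a given key goes to the same consumer and keeps its order, can be illustrated with a small simulation. This sketch uses String.hashCode() as a stand-in for the broker's key hashing, which is an assumption made only for illustration, so the resulting assignment of keys to consumers will generally differ from the listing above. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative key-based dispatch simulation; not part of the Pulsar API. */
final class KeySharedSketch {

    /** Stand-in key-to-consumer mapping (assumption: the broker uses its own hash function). */
    static int consumerFor(String key, int consumerCount) {
        return Math.floorMod(key.hashCode(), consumerCount);
    }

    /** Each row is {key, value}; returns the values received by each consumer, in order. */
    static List<List<String>> dispatch(String[][] keyedMessages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (String[] message : keyedMessages) {
            perConsumer.get(consumerFor(message[0], consumerCount)).add(message[1]);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        String[][] messages = {
            {"key-1", "message-1-1"}, {"key-1", "message-1-2"}, {"key-1", "message-1-3"},
            {"key-2", "message-2-1"}, {"key-2", "message-2-2"}, {"key-2", "message-2-3"},
            {"key-3", "message-3-1"}, {"key-3", "message-3-2"},
            {"key-4", "message-4-1"}, {"key-4", "message-4-2"}
        };
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("consumer1: " + result.get(0));
        System.out.println("consumer2: " + result.get(1));
    }
}
```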

    If batching is enabled at the producer side, messages with different keys are added to the same batch by default. The broker dispatches the whole batch to one consumer, so the default batching mechanism may break the message distribution semantics guaranteed by the Key_Shared subscription type. To avoid this, the producer needs to use the key-based batcher.

    Producer<byte[]> producer = client.newProducer()
            .topic("my-topic")
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .create();

    Or the producer can disable batching.

    Producer<byte[]> producer = client.newProducer()
            .topic("my-topic")
            .enableBatching(false)
            .create();

    Note

    If the message key is not specified, messages without keys are dispatched to one consumer in order by default.

    Intercept messages

    ConsumerInterceptors intercept and possibly mutate messages received by the consumer.

    The interface has six main events:

    • beforeConsume is triggered before the message is returned by receive() or receiveAsync(). You can modify messages within this event.
    • onAcknowledge is triggered before the consumer sends the acknowledgement to the broker.
    • onAcknowledgeCumulative is triggered before the consumer sends the cumulative acknowledgement to the broker.
    • onNegativeAcksSend is triggered when a redelivery from a negative acknowledgement occurs.
    • onAckTimeoutSend is triggered when a redelivery from an acknowledgement timeout occurs.
    • onPartitionsChange is triggered when the partitions of the (partitioned) topic change.

    To intercept messages, you can add one or multiple ConsumerInterceptors when creating a Consumer as follows.

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .intercept(new ConsumerInterceptor<String>() {
                @Override
                public void close() {
                    // release any resources held by the interceptor
                }

                @Override
                public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                    // user-defined processing logic
                    return message;
                }

                @Override
                public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                    // user-defined processing logic
                }

                @Override
                public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                    // user-defined processing logic
                }

                @Override
                public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                    // user-defined processing logic
                }

                @Override
                public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                    // user-defined processing logic
                }

                @Override
                public void onPartitionsChange(String topicName, int partitions) {
                    // user-defined processing logic
                }
            })
            .subscribe();

    Note

    If you are using multiple interceptors, they apply in the order they are passed to the intercept method.
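The ordering rule in the note above can be pictured as a simple chain in which each interceptor receives the result of the previous one. This is a conceptual sketch with plain functions standing in for interceptors; the class name is hypothetical and the snippet does not use the ConsumerInterceptor interface.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Illustrative interceptor chain; not part of the Pulsar API. */
final class InterceptorChainSketch {

    /** Applies each interceptor in list order, feeding each one the previous result. */
    static String beforeConsume(String message, List<UnaryOperator<String>> interceptors) {
        String current = message;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = List.of(
                payload -> payload.trim(),         // first interceptor
                payload -> payload.toUpperCase()); // second interceptor sees the trimmed payload
        System.out.println(beforeConsume("  hello  ", chain));
    }
}
```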

    Reader

    With the reader interface, Pulsar clients can “manually position” themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create Reader objects by specifying a topic and a MessageId.

    The following is an example.

    byte[] msgIdBytes = // Some message ID byte array
    MessageId id = MessageId.fromByteArray(msgIdBytes);
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topic)
            .startMessageId(id)
            .create();

    while (true) {
        Message<byte[]> message = reader.readNext();
        // Process message
    }

    In the example above, a Reader object is instantiated for a specific topic and message (by ID); the reader iterates over each message in the topic after the message identified by msgIdBytes (how that value is obtained depends on the application).

    The code sample above shows pointing the Reader object to a specific message (by ID), but you can also use MessageId.earliest to point to the earliest available message on the topic or MessageId.latest to point to the most recent available message.

    Configure reader

    When you create a reader, you can use the loadConf configuration. The following parameters are available in loadConf.

    Name | Type | Description | Default
    topicName | String | Topic name. | None
    receiverQueueSize | int | Size of a consumer's receiver queue, that is, the number of messages that can be accumulated by a consumer before an application calls receive. A value higher than the default value increases consumer throughput, though at the expense of more memory utilization. | 1000
    readerListener | ReaderListener<T> | A listener that is called for each message received. | None
    readerName | String | Reader name. | null
    subscriptionName | String | Subscription name. | When there is a single topic, the default subscription name is "reader-" + 10-digit UUID. When there are multiple topics, the default subscription name is "multiTopicsReader-" + 10-digit UUID.
    subscriptionRolePrefix | String | Prefix of subscription role. | null
    cryptoKeyReader | CryptoKeyReader | Interface that abstracts the access to a key store. | null
    cryptoFailureAction | ConsumerCryptoFailureAction | The action a consumer takes when it receives a message that cannot be decrypted. FAIL: the default option, which fails messages until crypto succeeds. DISCARD: silently acknowledge the message and do not deliver it to the application. CONSUME: deliver encrypted messages to the application, which is then responsible for decrypting them. With CONSUME, message decompression fails, and if a message contains batched messages, the client is not able to retrieve the individual messages in the batch. A delivered encrypted message contains an EncryptionContext, which holds the encryption and compression information that the application can use to decrypt the consumed message payload. | ConsumerCryptoFailureAction.FAIL
    readCompacted | boolean | If readCompacted is enabled, a consumer reads messages from a compacted topic rather than the full message backlog of a topic. The consumer only sees the latest value for each key in the compacted topic, up to the point in the topic backlog that has been compacted. Beyond that point, messages are sent as normal. readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (for example, failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to the subscription call throwing a PulsarClientException. | false
    resetIncludeHead | boolean | If set to true, the first message to be returned is the one specified by messageId. If set to false, the first message to be returned is the one next to the message specified by messageId. | false

    Sticky key range reader

    With a sticky key range reader, the broker only dispatches messages whose message key hash falls within the specified key hash ranges. Multiple key hash ranges can be specified on a reader.

    The following is an example of how to create a sticky key range reader.

    pulsarClient.newReader()
            .topic(topic)
            .startMessageId(MessageId.earliest)
            .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
            .create();

    The total hash range size is 65536, so the max end of the range should be less than or equal to 65535.
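The filtering rule can be sketched as a range check over the 0 to 65535 hash space. The following is a minimal illustration that assumes both ends of a range are inclusive; the class name is hypothetical, and slotOf uses String.hashCode() purely as a stand-in for the hash function that the broker applies to message keys.

```java
/** Illustrative key hash range check; not part of the Pulsar API. */
final class KeyHashRangeSketch {
    static final int HASH_RANGE_SIZE = 65536;

    /** A range is usable only if 0 <= start <= end <= 65535. */
    static boolean isValidRange(int start, int end) {
        return start >= 0 && start <= end && end < HASH_RANGE_SIZE;
    }

    /** True if the hash falls inside any of the {start, end} ranges (both ends inclusive). */
    static boolean accepts(int keyHash, int[][] ranges) {
        for (int[] range : ranges) {
            if (keyHash >= range[0] && keyHash <= range[1]) {
                return true;
            }
        }
        return false;
    }

    /** Stand-in hash (assumption): maps a key into the 0..65535 hash space. */
    static int slotOf(String key) {
        return Math.floorMod(key.hashCode(), HASH_RANGE_SIZE);
    }

    public static void main(String[] args) {
        int[][] ranges = {{0, 10000}, {20001, 30000}}; // same ranges as the reader above
        int slot = slotOf("key-1");
        System.out.println("slot " + slot + " accepted: " + accepts(slot, ranges));
    }
}
```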

    Configure chunking

    Configuring chunking for readers is similar to that for consumers. See configure chunking for consumers for more information.

    The following is an example of how to configure message chunking for a reader.

    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topicName)
            .startMessageId(MessageId.earliest)
            .maxPendingChunkedMessage(12)
            .autoAckOldestChunkedMessageOnQueueFull(true)
            .expireTimeOfIncompleteChunkedMessage(12, TimeUnit.MILLISECONDS)
            .create();

    Create reader with interceptor

    A Pulsar reader interceptor intercepts and possibly mutates messages with user-defined processing before the Pulsar reader reads them. With reader interceptors, you can apply unified processing before messages are read, such as modifying messages, adding properties, and collecting statistics, without building a separate mechanism for each of these tasks.

    Reader interceptor

    Pulsar reader interceptor works on top of Pulsar consumer interceptor. The plugin interface ReaderInterceptor can be treated as a subset of ConsumerInterceptor and it has two main events.

    • beforeRead is triggered before readers read messages. You can modify messages within this event.
    • onPartitionsChange is triggered when changes on partitions have been detected.

    To perceive triggered events and perform customized processing, you can add ReaderInterceptor when creating a Reader as follows.

    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic("t1")
            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
            .intercept(new ReaderInterceptor<byte[]>() {
                @Override
                public void close() {
                }

                @Override
                public Message<byte[]> beforeRead(Reader<byte[]> reader, Message<byte[]> message) {
                    // user-defined processing logic
                    return message;
                }

                @Override
                public void onPartitionsChange(String topicName, int partitions) {
                    // user-defined processing logic
                }
            })
            .startMessageId(MessageId.earliest)
            .create();

    TableView

    The TableView interface serves an encapsulated access pattern, providing a continuously updated key-value map view of the compacted topic data. Messages without keys will be ignored.

    With TableView, Pulsar clients can fetch all the message updates from a topic and construct a map with the latest values of each key. These values can then be used to build a local cache of data. In addition, you can register consumers with the TableView by specifying a listener to perform a scan of the map and then receive notifications when new messages are received. Consequently, event handling can be triggered to serve use cases, such as event-driven applications and message monitoring.

    Note

    Each TableView uses one Reader instance per partition, and reads the topic starting from the compacted view by default. It is highly recommended to enable automatic compaction by configuring the topic compaction policies for the given topic or namespace. More frequent compaction results in shorter startup times because less data is replayed to reconstruct the TableView of the topic. Starting from Pulsar 2.11.0, TableView also supports reading non-persistent topics, but it does not guarantee data consistency.

    The following figure illustrates the dynamic construction of a TableView updated with newer values of each key.

    [Figure: TableView]
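Conceptually, the view is a map that keeps only the most recent value for each key. The following sketch shows that construction over an in-memory sequence of keyed messages; it is an illustration of the idea only, the class name is hypothetical, and it does not use the TableView API or read from a topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative latest-value-per-key view; not part of the Pulsar API. */
final class LatestValueViewSketch {

    /** Each row is {key, value}; later rows overwrite earlier rows that have the same key. */
    static Map<String, String> build(String[][] keyedMessages) {
        Map<String, String> view = new LinkedHashMap<>();
        for (String[] message : keyedMessages) {
            String key = message[0];
            if (key == null) {
                continue; // messages without keys are ignored
            }
            view.put(key, message[1]);
        }
        return view;
    }

    public static void main(String[] args) {
        String[][] messages = {
            {"k1", "v1"}, {null, "ignored"}, {"k2", "v2"}, {"k1", "v3"}
        };
        System.out.println(build(messages));
    }
}
```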

    Configure TableView

    The following is an example of how to configure a TableView.

    TableView<String> tv = client.newTableViewBuilder(Schema.STRING)
            .topic("my-tableview")
            .create();

    You can use the available parameters in the loadConf configuration or related API to customize your TableView.

    Name | Type | Required? | Description | Default
    topic | string | yes | The topic name of the TableView. | N/A
    autoUpdatePartitionInterval | int | no | The interval to check for newly added partitions. | 60 (seconds)
    subscriptionName | string | no | The subscription name of the TableView. | null

    Register listeners

    You can register listeners for both existing messages on a topic and new messages coming into the topic by using forEachAndListen, and specify to perform operations for all existing messages by using forEach.

    The following is an example of how to register listeners with TableView.

    // Register listeners for all existing and incoming messages
    tv.forEachAndListen((key, value) -> { /* operations on all existing and incoming messages */ });

    // Register action for all existing messages
    tv.forEach((key, value) -> { /* operations on all existing messages */ });

    Schema

    In Pulsar, all message data consists of byte arrays “under the hood.” Message schemas enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a producer without specifying a schema, then the producer can only produce messages of type byte[]. The following is an example.

    Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .create();

    The producer above is equivalent to a Producer<byte[]> (in fact, you should always explicitly specify the type). If you’d like to use a producer for a different type of data, you need to specify a schema that informs Pulsar which data type will be transmitted over the topic. For more examples, see Schema - Get started.

    Authentication

    Pulsar Java clients currently support the following authentication mechanisms:

    Cluster-level failover

    For concepts and reference information about cluster-level failover, including benefits, use cases, constraints, usage, and working principles, see Cluster-level failover.

    Tip

    • You should configure cluster-level failover only when the cluster contains sufficient resources to handle all possible consequences. Workload intensity on the backup cluster may increase significantly.

    • Connect clusters to an uninterruptible power supply (UPS) unit to reduce the risk of unexpected power loss.

    Requirements

    • Pulsar client 2.10 or later versions.

    • For backup clusters:

      • The number of BookKeeper nodes should be equal to or greater than the ensemble quorum.

      • The number of ZooKeeper nodes should be equal to or greater than 3.

    • Turn on geo-replication between the primary cluster and any dependent cluster (primary to backup or backup to backup) to prevent data loss.

    • Enable replicated subscription.

    • Automatic cluster-level failover

    • Controlled cluster-level failover

    This is an example of how to construct a Java Pulsar client to use automatic cluster-level failover. The switchover is triggered automatically.

    private PulsarClient getAutoFailoverClient() throws PulsarClientException {
        String primaryUrl = "pulsar+ssl://localhost:6651";
        String secondaryUrl = "pulsar+ssl://localhost:6661";

        String primaryTlsTrustCertsFilePath = "primary/path";
        Authentication primaryAuthentication = AuthenticationFactory.create(
                "org.apache.pulsar.client.impl.auth.AuthenticationTls",
                "tlsCertFile:/path/to/primary-my-role.cert.pem,"
                        + "tlsKeyFile:/path/to/primary-role.key-pk8.pem");

        String secondaryTlsTrustCertsFilePath = "secondary/path";
        Authentication secondaryAuthentication = AuthenticationFactory.create(
                "org.apache.pulsar.client.impl.auth.AuthenticationTls",
                "tlsCertFile:/path/to/secondary-my-role.cert.pem,"
                        + "tlsKeyFile:/path/to/secondary-role.key-pk8.pem");

        // You can put more failover cluster configurations into the maps
        Map<String, String> secondaryTlsTrustCertsFilePaths = new HashMap<>();
        secondaryTlsTrustCertsFilePaths.put(secondaryUrl, secondaryTlsTrustCertsFilePath);
        Map<String, Authentication> secondaryAuthentications = new HashMap<>();
        secondaryAuthentications.put(secondaryUrl, secondaryAuthentication);

        ServiceUrlProvider failover = AutoClusterFailover.builder()
                .primary(primaryUrl)
                .secondary(List.of(secondaryUrl))
                .failoverDelay(30, TimeUnit.SECONDS)
                .switchBackDelay(60, TimeUnit.SECONDS)
                .checkInterval(1000, TimeUnit.MILLISECONDS)
                .secondaryTlsTrustCertsFilePath(secondaryTlsTrustCertsFilePaths)
                .secondaryAuthentication(secondaryAuthentications)
                .build();

        // The client needs the provider to resolve its service URL;
        // build() initializes the provider with the created client.
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrlProvider(failover)
                .authentication(primaryAuthentication)
                .tlsTrustCertsFilePath(primaryTlsTrustCertsFilePath)
                .build();

        return pulsarClient;
    }

    Configure the following parameters:

    Parameter | Default value | Required? | Description
    primary | N/A | Yes | Service URL of the primary cluster.
    secondary | N/A | Yes | Service URL(s) of one or several backup clusters. You can specify several backup clusters using a comma-separated list. Note that the backup cluster is chosen in the sequence shown in the list, and if all backup clusters are available, the Pulsar client chooses the first backup cluster.
    failoverDelay | N/A | Yes | The delay before the Pulsar client switches from the primary cluster to the backup cluster. Automatic failover is controlled by a probe task: 1) The probe task first checks the health status of the primary cluster. 2) If the probe task finds that the continuous failure time of the primary cluster exceeds failoverDelay, it switches the Pulsar client to the backup cluster.
    switchBackDelay | N/A | Yes | The delay before the Pulsar client switches from the backup cluster back to the primary cluster. Automatic switchback is controlled by a probe task: 1) After the Pulsar client switches from the primary cluster to the backup cluster, the probe task continues to check the status of the primary cluster. 2) If the primary cluster functions well and continuously remains active longer than switchBackDelay, the Pulsar client switches back to the primary cluster.
    checkInterval | 30s | No | Frequency of performing a probe task (in seconds).
    secondaryTlsTrustCertsFilePath | N/A | No | A map of certificate files. Keys are the service URLs of the backup clusters. Values are the paths to the trusted TLS certificate files of the backup clusters.
    secondaryAuthentication | N/A | No | A map of authentication configurations. Keys are the service URLs of the backup clusters. Values are the Authentication objects of the backup clusters.
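The probe behavior described for failoverDelay and switchBackDelay can be sketched as a small state machine. This is a simplified illustration under stated assumptions, not the AutoClusterFailover implementation: the class name is hypothetical, health results and timestamps are passed in by the caller, and the choice among several backup clusters is left out.

```java
/** Illustrative failover/switchback decision logic; not part of the Pulsar API. */
final class FailoverProbeSketch {
    private final long failoverDelayMs;
    private final long switchBackDelayMs;
    private boolean usingPrimary = true;
    private long primaryFailedSince = -1;  // -1: primary is not in a failure streak
    private long primaryHealthySince = -1; // -1: primary is not in a recovery streak

    FailoverProbeSketch(long failoverDelayMs, long switchBackDelayMs) {
        this.failoverDelayMs = failoverDelayMs;
        this.switchBackDelayMs = switchBackDelayMs;
    }

    /** Records one probe result and returns true if the client should use the primary cluster. */
    boolean probe(boolean primaryHealthy, long nowMs) {
        if (usingPrimary) {
            if (primaryHealthy) {
                primaryFailedSince = -1; // a healthy probe ends the failure streak
            } else {
                if (primaryFailedSince < 0) {
                    primaryFailedSince = nowMs;
                }
                if (nowMs - primaryFailedSince > failoverDelayMs) {
                    usingPrimary = false; // continuous failure exceeded failoverDelay
                    primaryFailedSince = -1;
                    primaryHealthySince = -1;
                }
            }
        } else {
            if (!primaryHealthy) {
                primaryHealthySince = -1; // a failed probe ends the recovery streak
            } else {
                if (primaryHealthySince < 0) {
                    primaryHealthySince = nowMs;
                }
                if (nowMs - primaryHealthySince > switchBackDelayMs) {
                    usingPrimary = true; // primary stayed healthy longer than switchBackDelay
                    primaryHealthySince = -1;
                }
            }
        }
        return usingPrimary;
    }

    public static void main(String[] args) {
        FailoverProbeSketch probe = new FailoverProbeSketch(30_000, 60_000);
        System.out.println(probe.probe(false, 1_000));  // failure streak starts
        System.out.println(probe.probe(false, 32_000)); // streak exceeds failoverDelay
    }
}
```

A single healthy or failed probe resets the corresponding streak, which matches the "continuous" wording in the table: only an uninterrupted run of failures or successes triggers a switch.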

    The following example shows how to construct a Java Pulsar client that uses controlled cluster-level failover. With controlled failover, administrators trigger the switchover manually.

    Note: you can have one or several backup clusters, but you can specify only one.

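    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.ServiceUrlProvider;
    import org.apache.pulsar.client.impl.ControlledClusterFailover;

    public PulsarClient getControlledFailoverClient() throws IOException {
        // Headers sent to the URL provider service (tokens and credentials).
        Map<String, String> header = new HashMap<>();
        header.put("service_user_id", "my-user");
        header.put("service_password", "tiger");
        header.put("clusterA", "tokenA");
        header.put("clusterB", "tokenB");

        ServiceUrlProvider provider = ControlledClusterFailover.builder()
                .defaultServiceUrl("pulsar://localhost:6650")
                .checkInterval(1, TimeUnit.MINUTES)
                .urlProvider("http://localhost:8080/test")
                .urlProviderHeader(header)
                .build();

        // The client builder needs a service URL or a service URL provider;
        // build() passes the new client to provider.initialize(...).
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrlProvider(provider)
                .build();
        return pulsarClient;
    }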

    Configure the following parameters:

    | Parameter | Default value | Required? | Description |
    | --- | --- | --- | --- |
    | defaultServiceUrl | N/A | Yes | Pulsar service URL. |
    | checkInterval | 30s | No | How often the probe task runs, in seconds. |
    | urlProvider | N/A | Yes | URL provider service. |
    | urlProviderHeader | N/A | No | A map containing tokens and credentials. |

    If you enable authentication or authorization between Pulsar clients and primary and backup clusters, you need to provide urlProviderHeader.

    Here is an example of how urlProviderHeader works.

    Assume that you want to connect Pulsar client 1 to cluster A.

    1. Pulsar client 1 sends the token t1 to the URL provider service.

    2. The URL provider service returns the credential c1 and the cluster A URL to the Pulsar client.

      The URL provider service manages all tokens and credentials. It chooses the credential to return based on the token it receives and the target cluster URL, so different Pulsar clients can receive different credentials.

      Note: the credential must be in a JSON file and contain the parameters shown below.

      {
        "serviceUrl": "pulsar+ssl://target:6651",
        "tlsTrustCertsFilePath": "/security/ca.cert.pem",
        "authPluginClassName": "org.apache.pulsar.client.impl.auth.AuthenticationTls",
        "authParamsString": "{\"tlsCertFile\": \"/security/client.cert.pem\", \"tlsKeyFile\": \"/security/client-pk8.pem\"}"
      }

    3. Pulsar client 1 connects to cluster A using credential c1.
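
The exchange in the steps above can be illustrated with a minimal, hypothetical URL provider service built on the JDK's built-in HTTP server. Everything here is an assumption made for illustration: the class name, the in-memory token table, the choice to read the token from the `clusterA` header (mirroring the header map in the controlled failover example), the 403 response for unknown tokens, and the cluster URL. A production service would store tokens and credentials securely and would serve HTTPS.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical URL provider service; names and the token scheme are illustrative. */
final class UrlProviderSketch {
    // Hypothetical token -> target cluster configuration table.
    private static final Map<String, String> CONFIG_BY_TOKEN = Map.of(
            "tokenA", clusterConfig(
                    "pulsar+ssl://cluster-a:6651",
                    "/security/ca.cert.pem",
                    "org.apache.pulsar.client.impl.auth.AuthenticationTls",
                    "{\"tlsCertFile\": \"/security/client.cert.pem\", "
                            + "\"tlsKeyFile\": \"/security/client-pk8.pem\"}"));

    /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds a JSON document with the four fields the credential must contain. */
    static String clusterConfig(String serviceUrl, String trustCertsPath,
                                String authPlugin, String authParams) {
        return "{"
                + "\"serviceUrl\": " + quote(serviceUrl) + ", "
                + "\"tlsTrustCertsFilePath\": " + quote(trustCertsPath) + ", "
                + "\"authPluginClassName\": " + quote(authPlugin) + ", "
                + "\"authParamsString\": " + quote(authParams)
                + "}";
    }

    /** Returns the configuration for a token, or null when the token is unknown. */
    static String configFor(String token) {
        return token == null ? null : CONFIG_BY_TOKEN.get(token);
    }

    /** Starts the service on the given port and serves configurations under /test. */
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/test", exchange -> {
            // Hypothetical header name; a real service decides which headers it reads.
            String body = configFor(exchange.getRequestHeaders().getFirst("clusterA"));
            byte[] bytes = (body == null ? "{}" : body).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(body == null ? 403 : 200, bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        // Port and path match urlProvider("http://localhost:8080/test") in the example above.
        HttpServer server = start(8080);
        System.out.println("URL provider sketch listening on " + server.getAddress());
    }
}
```

Keeping the token lookup in `configFor` separate from the HTTP handler makes the mapping easy to test and to replace with a real credential store.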