Messaging Concepts

Pulsar is built on the publish-subscribe pattern, also known as pub-sub. In this pattern, producers publish messages to topics. Consumers can then subscribe to those topics, process incoming messages, and send an acknowledgement when processing is complete.

Once a subscription has been created, all messages will be retained by Pulsar, even if the consumer gets disconnected. Retained messages will be discarded only when a consumer acknowledges that they've been successfully processed.

Messages

Messages are the basic "unit" of Pulsar. They're what producers publish to topics and what consumers then consume from topics (and acknowledge when the message has been processed). Messages are the analogue of letters in a postal service system.

| Component | Purpose |
| --- | --- |
| Value / data payload | The data carried by the message. All Pulsar messages carry raw bytes, although message data can also conform to data schemas. |
| Key | Messages can optionally be tagged with keys, which can be useful for things like topic compaction. |
| Properties | An optional key/value map of user-defined properties. |
| Producer name | The name of the producer that produced the message (producers are automatically given default names, but you can apply your own explicitly as well). |
| Sequence ID | Each Pulsar message belongs to an ordered sequence on its topic. A message's sequence ID is its ordering in that sequence. |
| Publish time | The timestamp of when the message was published (automatically applied by the producer). |
| Event time | An optional timestamp that applications can attach to the message representing when something happened, e.g. when the message was processed. The event time of a message is 0 if none is explicitly set. |

For a more in-depth breakdown of Pulsar message contents, see the documentation on Pulsar's binary protocol.

Producers

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker for processing.

Send modes

Producers can send messages to brokers either synchronously (sync) or asynchronously (async).

| Mode | Description |
| --- | --- |
| Sync send | The producer waits for an acknowledgement from the broker after sending each message. If the acknowledgement isn't received, the producer considers the send operation a failure. |
| Async send | The producer puts the message in a blocking queue and returns immediately. The client library then sends the message to the broker in the background. If the queue is full (the maximum size is configurable), the producer is either blocked or fails immediately when calling the API, depending on the arguments passed to the producer. |
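The queue behaviour described for async send can be illustrated with plain JDK classes. The following is a minimal sketch, not Pulsar client code: the class `PendingSendQueue` and its `blockIfFull` flag are hypothetical names chosen for illustration, and they only model the "block or fail immediately when the queue is full" choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of a producer's bounded pending-message queue. */
final class PendingSendQueue {
    private final BlockingQueue<String> pending;
    private final boolean blockIfFull;

    PendingSendQueue(int maxPending, boolean blockIfFull) {
        this.pending = new ArrayBlockingQueue<>(maxPending);
        this.blockIfFull = blockIfFull;
    }

    /**
     * Queues a message for background sending.
     * Returns false if the queue is full and this queue fails fast,
     * or if the calling thread is interrupted while waiting for space.
     */
    boolean enqueue(String message) {
        if (!blockIfFull) {
            return pending.offer(message); // fails immediately when full
        }
        try {
            pending.put(message); // waits until space is available
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Simulates the background sender taking the oldest pending message (null if none). */
    String drainOne() {
        return pending.poll();
    }

    int size() {
        return pending.size();
    }
}
```

With a capacity of two and fail-fast behaviour, a third `enqueue` call is rejected until `drainOne` frees a slot.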

Compression

Messages published by producers can be compressed during transportation in order to save bandwidth. Pulsar currently supports the following types of compression:

Batching

If batching is enabled, the producer will accumulate and send a batch of messages in a single request. Batching size is defined by the maximum number of messages and maximum publish latency.
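To make the two batching limits concrete, here is a small sketch of a batcher that flushes when either the maximum number of messages or the maximum delay is reached. It is an illustration under stated assumptions, not the client library's implementation: `MessageBatcher`, `add`, and `flushIfDue` are hypothetical names, and the clock is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical batcher limited by message count and by publish delay. */
final class MessageBatcher {
    private final int maxMessages;
    private final long maxDelayMillis;
    private final List<String> current = new ArrayList<>();
    private long firstMessageAtMillis;

    MessageBatcher(int maxMessages, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Adds a message; returns the batch to send if a limit was reached, else an empty list. */
    List<String> add(String message, long nowMillis) {
        if (current.isEmpty()) {
            firstMessageAtMillis = nowMillis; // the delay is measured from the first message
        }
        current.add(message);
        return flushIfDue(nowMillis);
    }

    /** Called periodically so that a partially filled batch is still sent after the maximum delay. */
    List<String> flushIfDue(long nowMillis) {
        boolean full = current.size() >= maxMessages;
        boolean tooOld = !current.isEmpty()
                && nowMillis - firstMessageAtMillis >= maxDelayMillis;
        if (!full && !tooOld) {
            return Collections.emptyList();
        }
        List<String> batch = new ArrayList<>(current);
        current.clear();
        return batch;
    }
}
```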

Consumers

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Receive modes

Messages can be received from brokers either synchronously (sync) or asynchronously (async).

| Mode | Description |
| --- | --- |
| Sync receive | A sync receive blocks until a message is available. |
| Async receive | An async receive returns immediately with a future value (a CompletableFuture in Java, for example) that completes once a new message is available. |
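The difference between the two receive modes can be sketched with a `CompletableFuture` and an in-memory queue. This is a simplified model, not the Pulsar consumer: `InMemoryInbox`, `deliver`, `receive`, and `receiveAsync` here are hypothetical stand-ins used only to show that the async call returns a future immediately while the sync call waits.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory inbox illustrating sync and async receive. */
final class InMemoryInbox {
    private final Queue<String> messages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> waiters = new ArrayDeque<>();

    /** Returns immediately; the future completes once a message is available. */
    synchronized CompletableFuture<String> receiveAsync() {
        String ready = messages.poll();
        if (ready != null) {
            return CompletableFuture.completedFuture(ready);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiters.add(future);
        return future;
    }

    /** Blocks the caller until a message is available. */
    String receive() {
        return receiveAsync().join();
    }

    /** Simulates the broker pushing a message to this consumer. */
    void deliver(String message) {
        CompletableFuture<String> waiter;
        synchronized (this) {
            waiter = waiters.poll();
            if (waiter == null) {
                messages.add(message); // nobody is waiting, so buffer it
                return;
            }
        }
        waiter.complete(message); // complete outside the lock
    }
}
```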

Listeners

Client libraries provide a listener implementation for consumers. For example, the Java client provides a MessageListener interface. In this interface, the received method is called whenever a new message is received.

Acknowledgement

When a consumer has consumed a message successfully, it sends an acknowledgement request to the broker so that the broker can discard the message. Until the message is acknowledged, the broker stores it.

Messages can be acknowledged either one by one or cumulatively. With cumulative acknowledgement, the consumer only needs to acknowledge the last message it received. All messages in the stream up to (and including) the provided message will not be re-delivered to that consumer.

Cumulative acknowledgement cannot be used with shared subscription mode, because shared mode involves multiple consumers having access to the same subscription.

In the shared subscription mode, messages can be acknowledged individually.
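The difference between individual and cumulative acknowledgement can be sketched as operations on the set of unacknowledged message IDs. This is an illustrative model only: `AckTracker` is a hypothetical class, and plain `long` values stand in for Pulsar message IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical tracker of delivered-but-unacknowledged message IDs. */
final class AckTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();

    void delivered(long id) {
        unacked.add(id);
    }

    /** Acknowledges exactly one message. */
    void ackIndividual(long id) {
        unacked.remove(id);
    }

    /** Acknowledges every message up to and including the given ID. */
    void ackCumulative(long id) {
        unacked.headSet(id, true).clear();
    }

    /** Unacknowledged IDs in ascending order. */
    List<Long> pending() {
        return new ArrayList<>(unacked);
    }
}
```

In this model, a single `ackCumulative(3)` call has the same effect as acknowledging messages 1, 2, and 3 one by one.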

Negative acknowledgement

When a consumer fails to consume a message and wants to consume it again, the consumer can send a negative acknowledgement to the broker, and the broker will then redeliver the message.

Messages can be negatively acknowledged either one by one or cumulatively, depending on the subscription mode.

In the exclusive and failover subscription modes, consumers only negatively acknowledge the last message they have received.

In the shared and Key_Shared subscription modes, you can negatively acknowledge messages individually.

Acknowledgement timeout

If a message is not consumed successfully and you want the broker to redeliver it automatically, you can use the automatic redelivery mechanism for unacknowledged messages. When an acknowledgement timeout is specified, the client tracks unacknowledged messages for the duration of the timeout and automatically sends a redeliver-unacknowledged-messages request to the broker once the timeout has elapsed.

Note: Prefer negative acknowledgement to acknowledgement timeout. Negative acknowledgement controls the redelivery of individual messages more precisely and avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.
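The acknowledgement timeout mechanism amounts to remembering when each message was delivered and requesting redelivery for those that stay unacknowledged too long. The sketch below is a simplified model with hypothetical names (`AckTimeoutTracker`, `collectTimedOut`); it does not reflect how the client library is implemented internally, and it takes the current time as a parameter to keep the logic explicit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical client-side tracker for acknowledgement timeouts. */
final class AckTimeoutTracker {
    private final long ackTimeoutMillis;
    private final Map<String, Long> deliveredAt = new LinkedHashMap<>();

    AckTimeoutTracker(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    void delivered(String messageId, long nowMillis) {
        deliveredAt.put(messageId, nowMillis);
    }

    void acknowledged(String messageId) {
        deliveredAt.remove(messageId);
    }

    /** Returns the messages whose timeout has elapsed and stops tracking them. */
    List<String> collectTimedOut(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deliveredAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= ackTimeoutMillis) {
                timedOut.add(entry.getKey()); // would be sent in a redelivery request
                it.remove();
            }
        }
        return timedOut;
    }
}
```

The model also shows why a slow consumer can trigger an unwanted redelivery: a message that is still being processed when the timeout elapses is reported as timed out.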

Dead letter topic

A dead letter topic lets you keep consuming new messages when some messages cannot be consumed successfully. In this mechanism, messages that fail to be consumed are stored in a separate topic, called the dead letter topic. You can decide how to handle the messages in the dead letter topic.

The following example shows how to enable the dead letter topic in the Java client.

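    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.DeadLetterPolicy;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionType;

    // Assumes pulsarClient, topic, and maxRedeliveryCount are defined elsewhere.
    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .build())
            .subscribe();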

The dead letter topic depends on message redelivery. Decide which redelivery method to rely on: negative acknowledgement or acknowledgement timeout. Prefer negative acknowledgement to acknowledgement timeout.

Note: Currently, the dead letter topic is enabled only in the shared subscription mode.
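How redelivery and the dead letter topic interact can be modelled without the Pulsar client. The following is a minimal sketch under stated assumptions: `RedeliveryTracker` and its methods are hypothetical, and the rule that a message is moved once its redelivery count would exceed the configured maximum is an assumption made for illustration rather than a statement of the broker's exact behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of redelivery counting with a dead letter topic. */
final class RedeliveryTracker {
    private final int maxRedeliverCount;
    private final Map<String, Integer> redeliveries = new HashMap<>();
    private final List<String> deadLetters = new ArrayList<>();

    RedeliveryTracker(int maxRedeliverCount) {
        this.maxRedeliverCount = maxRedeliverCount;
    }

    /**
     * Records a failed processing attempt.
     * Returns true if the message should be redelivered,
     * false if it was moved to the dead letter list instead.
     */
    boolean onNegativeAck(String messageId) {
        int count = redeliveries.merge(messageId, 1, Integer::sum);
        if (count > maxRedeliverCount) {
            redeliveries.remove(messageId);
            deadLetters.add(messageId); // stands in for publishing to the dead letter topic
            return false;
        }
        return true;
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }
}
```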

Topics

As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from producers to consumers. Topic names are URLs that have a well-defined structure:

    {persistent|non-persistent}://tenant/namespace/topic

| Topic name component | Description |
| --- | --- |
| persistent / non-persistent | This identifies the type of topic. Pulsar supports two kinds of topics: persistent and non-persistent (persistent is the default, so if you don't specify a type the topic will be persistent). With persistent topics, all messages are durably persisted on disk (that means on multiple disks unless the broker is standalone), whereas data for non-persistent topics isn't persisted to storage disks. |
| tenant | The topic's tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters. |
| namespace | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Most topic configuration is performed at the namespace level. Each tenant can have multiple namespaces. |
| topic | The final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance. |
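The topic name structure can be illustrated with a small parser. This is a sketch for illustration, not the client library's own name handling: `TopicNameParts` is a hypothetical class, and it only accepts the full tenant/namespace/topic form, defaulting to persistent when no type prefix is given.

```java
/** Hypothetical parser for names of the form {persistent|non-persistent}://tenant/namespace/topic. */
final class TopicNameParts {
    final String domain;
    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameParts(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicNameParts parse(String name) {
        String domain = "persistent"; // default when no type is specified
        String rest = name;
        int sep = name.indexOf("://");
        if (sep >= 0) {
            domain = name.substring(0, sep);
            rest = name.substring(sep + 3);
        }
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown topic type: " + domain);
        }
        // Split into at most three parts so the topic part stays freeform.
        String[] parts = rest.split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + name);
        }
        return new TopicNameParts(domain, parts[0], parts[1], parts[2]);
    }
}
```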

No need to explicitly create new topics

You don't need to explicitly create topics in Pulsar. If a client attempts to write or receive messages to/from a topic that does not yet exist, Pulsar will automatically create that topic under the namespace provided in the topic name.

Namespaces

A namespace is a logical grouping within a tenant. A tenant can create multiple namespaces via the admin API. For instance, a tenant with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For example, my-tenant/app1 is the namespace for the application app1 of the tenant my-tenant. You can create any number of topics under the namespace.

Subscription modes

A subscription is a named configuration rule that determines how messages are delivered to consumers. Pulsar offers four subscription modes: exclusive, failover, shared, and Key_Shared. The figure below illustrates the exclusive, failover, and shared modes.

[Figure: Subscription modes]

Exclusive

In exclusive mode, only a single consumer is allowed to attach to the subscription. If a second consumer attempts to subscribe to the topic using the same subscription, that consumer receives an error.

In the diagram above, only Consumer-A is allowed to consume messages.

Exclusive mode is the default subscription mode.

[Figure: Exclusive subscriptions]

Failover

In failover mode, multiple consumers can attach to the same subscription. The consumers will be lexically sorted by the consumer's name and the first consumer will initially be the only one receiving messages. This consumer is called the master consumer.

When the master consumer disconnects, all (non-acked and subsequent) messages will be delivered to the next consumer in line.

In the diagram above, Consumer-C-1 is the master consumer while Consumer-C-2 would be the next in line to receive messages if Consumer-C-1 disconnected.

[Figure: Failover subscriptions]
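The selection of the master consumer can be sketched as picking the lexically first name among the connected consumers. This is a simplified model with a hypothetical class name (`FailoverSubscription`); in particular, what happens when a consumer with a lexically smaller name connects later is an assumption of this sketch, not something the text above specifies.

```java
import java.util.TreeSet;

/** Hypothetical model of master-consumer selection in failover mode. */
final class FailoverSubscription {
    // A TreeSet keeps consumer names in lexical order.
    private final TreeSet<String> consumers = new TreeSet<>();

    void connect(String consumerName) {
        consumers.add(consumerName);
    }

    void disconnect(String consumerName) {
        consumers.remove(consumerName);
    }

    /** The consumer currently receiving messages, or null if none is connected. */
    String master() {
        return consumers.isEmpty() ? null : consumers.first();
    }
}
```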

Shared

In shared or round robin mode, multiple consumers can attach to the same subscription. Messages are delivered in a round robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged will be rescheduled for sending to the remaining consumers.

In the diagram above, Consumer-B-1 and Consumer-B-2 are able to subscribe to the topic, but Consumer-C-1 and others could as well.

Limitations of shared mode

There are two important things to be aware of when using shared mode:

  • Message ordering is not guaranteed.
  • You cannot use cumulative acknowledgment with shared mode.

[Figure: Shared subscriptions]
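Round-robin delivery and the rescheduling of unacknowledged messages after a disconnect can be modelled in a few lines. The sketch below is illustrative only: `SharedSubscription` is a hypothetical class, messages are plain strings, and the broker's real dispatcher is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of round-robin dispatch in shared mode. */
final class SharedSubscription {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, List<String>> unacked = new HashMap<>();
    private int next = 0;

    void connect(String consumer) {
        consumers.add(consumer);
        unacked.put(consumer, new ArrayList<>());
    }

    /** Delivers the message to exactly one consumer and returns that consumer's name. */
    String dispatch(String message) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("No consumers connected");
        }
        next = next % consumers.size();
        String chosen = consumers.get(next);
        next++;
        unacked.get(chosen).add(message);
        return chosen;
    }

    void ack(String consumer, String message) {
        List<String> pending = unacked.get(consumer);
        if (pending != null) {
            pending.remove(message);
        }
    }

    /** Removes the consumer and returns its unacknowledged messages for rescheduling. */
    List<String> disconnect(String consumer) {
        consumers.remove(consumer);
        List<String> pending = unacked.remove(consumer);
        return pending == null ? new ArrayList<>() : pending;
    }
}
```

Because consecutive messages go to different consumers that process them independently, this model also shows why ordering is not guaranteed in shared mode.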

Key_Shared

In Key_Shared mode, multiple consumers can attach to the same subscription. Messages are distributed across consumers, and messages with the same key or the same ordering key are delivered to only one consumer. No matter how many times a message is redelivered, it is delivered to the same consumer. When a consumer connects or disconnects, the consumer that serves some message keys changes.

Limitations of Key_Shared mode

There are two important things to be aware of when using Key_Shared mode:

  • You need to specify a key or orderingKey for messages.
  • You cannot use cumulative acknowledgment with Key_Shared mode.

[Figure: Key_Shared subscriptions]

The Key_Shared subscription mode is a beta feature. You can disable it in broker.conf.
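The idea that a key always maps to one consumer, and that this mapping can shift when the set of consumers changes, can be sketched with a simple hash. This is a deliberately simplified assumption: `KeySharedSelector` is hypothetical, and hashing the key modulo the number of consumers is used here only for illustration and is not claimed to be the scheme the broker uses.

```java
import java.util.List;

/** Hypothetical key-to-consumer mapping for a Key_Shared style subscription. */
final class KeySharedSelector {
    private KeySharedSelector() {
    }

    /** Picks the consumer for a key; the same key and consumer list always give the same result. */
    static String consumerFor(String key, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("No consumers connected");
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(key.hashCode(), consumers.size());
        return consumers.get(index);
    }
}
```

In this model, adding a third consumer leaves some keys on their current consumer and moves others, which mirrors the note above that the serving consumer changes for some keys when consumers connect or disconnect.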

Multi-topic subscriptions

When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as persistent://public/default/my-topic. As of Pulsar version 1.23.0-incubating, however, Pulsar consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways:

  • On the basis of a regular expression (regex), for example persistent://public/default/finance-.*
  • By explicitly defining a list of topics

When subscribing to multiple topics by regex, all topics must be in the same namespace.

When subscribing to multiple topics, the Pulsar client will automatically make a call to the Pulsar API to discover the topics that match the regex pattern/list and then subscribe to all of them. If any of the topics don't currently exist, the consumer will auto-subscribe to them once the topics are created.
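The discovery step can be pictured as filtering the known topic names with the regular expression. The sketch below uses only `java.util.regex` and a hypothetical helper, `TopicMatcher`; the list of existing topics is passed in by hand, whereas the real client obtains it from Pulsar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper that selects the topics matching a subscription pattern. */
final class TopicMatcher {
    private TopicMatcher() {
    }

    /** Returns the topics whose full name matches the pattern, in their original order. */
    static List<String> matching(Pattern pattern, List<String> existingTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : existingTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

Running the same filter again after new topics appear would pick up topics that did not exist at subscription time.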

No ordering guarantees

When a consumer subscribes to multiple topics, all ordering guarantees normally provided by Pulsar on single topics do not hold. If your use case for Pulsar involves any strict ordering requirements, we would strongly recommend against using this feature.

Here are some multi-topic subscription examples for Java:

    import java.util.regex.Pattern;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.PulsarClient;

    // Instantiate the Pulsar client (the service URL is an example value)
    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

    // Subscribe to all topics in a namespace
    Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
    Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
            .topicsPattern(allTopicsInNamespace)
            .subscriptionName("subscription-1")
            .subscribe();

    // Subscribe to a subset of topics in a namespace, based on regex
    Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
    Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
            .topicsPattern(someTopicsInNamespace)
            .subscriptionName("subscription-1")
            .subscribe();


Partitioned topics

Normal topics can be served only by a single broker, which limits the topic's maximum throughput. Partitioned topics are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.

Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of several brokers. The distribution of partitions across brokers is handled automatically by Pulsar.

The diagram below illustrates this:

[Figure 6: partitioned topic diagram]

Here, the topic Topic1 has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions apiece, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).

Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to, while the subscription mode determines which messages go to which consumers.

Decisions about routing and subscription modes can be made separately in most cases. In general, throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.

There is no difference between partitioned topics and normal topics in terms of how subscription modes work, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer.

Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.

Routing modes

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition (that is, which internal topic) each message should be published to.

Three MessageRoutingMode values are available:

| Mode | Description |
| --- | --- |
| RoundRobinPartition | If no key is provided, the producer publishes messages across all partitions in round-robin fashion to achieve maximum throughput. Note that round-robin is not applied per individual message; the producer switches partitions on the batching-delay boundary so that batching stays effective. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition. This is the default mode. |
| SinglePartition | If no key is provided, the producer randomly picks one single partition and publishes all messages to that partition. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition. |
| CustomPartition | Uses a custom message router implementation that is called to determine the partition for a particular message. You can create a custom routing mode by using the Java client and implementing the MessageRouter interface. |
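The keyed and keyless branches of round-robin routing can be sketched as follows. This is an illustration, not the client's router: `PartitionRouter` is a hypothetical class, the hash is Java's `String.hashCode` reduced modulo the partition count, and, as a simplification, the round-robin counter advances on every keyless message rather than on the batching-delay boundary.

```java
/** Hypothetical router illustrating keyed hashing with a round-robin fallback. */
final class PartitionRouter {
    private final int numPartitions;
    private int roundRobinCounter = 0;

    PartitionRouter(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Chooses a partition index; pass null when the message has no key. */
    int choosePartition(String key) {
        if (key != null) {
            // Same key, same partition: this is what gives per-key ordering.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        int partition = roundRobinCounter;
        roundRobinCounter = (roundRobinCounter + 1) % numPartitions;
        return partition;
    }
}
```

A SinglePartition-style variant would differ only in the keyless branch, where it would pick one partition once and keep returning it.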

Ordering guarantee

Message ordering depends on the MessageRoutingMode and the message key. Usually, users want a per-key-partition ordering guarantee.

If there is a key attached to message, the messages will be routed to corresponding partitions based on the hashing scheme specified by HashingScheme in ProducerBuilder, when using either SinglePartition or RoundRobinPartition mode.

| Ordering guarantee | Description | Routing mode and key |
| --- | --- | --- |
| Per-key-partition | All the messages with the same key will be in order and be placed in the same partition. | Use either SinglePartition or RoundRobinPartition mode, and provide a key with each message. |
| Per-producer | All the messages from the same producer will be in order. | Use SinglePartition mode, and do not provide a key with any message. |

Hashing scheme

HashingScheme is an enum that represents the set of standard hashing functions available when choosing the partition to use for a particular message.

Two standard hashing functions are available: JavaStringHash and Murmur3_32Hash. The default hashing function for producers is JavaStringHash. Note that JavaStringHash is not useful when producers may use clients written in different languages; in that case, Murmur3_32Hash is recommended.

Non-persistent topics

By default, Pulsar persistently stores all unacknowledged messages on multiple BookKeeper bookies (storage nodes). Data for messages on persistent topics can thus survive broker restarts and subscriber failover.

Pulsar also, however, supports non-persistent topics, which are topics on which messages are never persisted to disk and live only in memory. When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.

Non-persistent topics have names of this form (note the non-persistent in the name):

    non-persistent://tenant/namespace/topic

For more info on using non-persistent topics, see the Non-persistent messaging cookbook.

In non-persistent topics, brokers immediately deliver messages to all connected subscribers without persisting them in BookKeeper. If a subscriber is disconnected, the broker will not be able to deliver those in-transit messages, and subscribers will never be able to receive those messages again. Eliminating the persistent storage step makes messaging on non-persistent topics slightly faster than on persistent topics in some cases, but with the caveat that some of the core benefits of Pulsar are lost.

With non-persistent topics, message data lives only in memory. If a message broker fails or message data can otherwise not be retrieved from memory, your message data may be lost. Use non-persistent topics only if you're certain that your use case requires it and can sustain it.

By default, non-persistent topics are enabled on Pulsar brokers. You can disable them in the broker's configuration. You can manage non-persistent topics using the pulsar-admin topics interface.

Performance

Non-persistent messaging is usually faster than persistent messaging because brokers don't persist messages and send acks back to the producer as soon as a message has been delivered to all connected subscribers. Producers thus see comparatively low publish latency with non-persistent topics.

Client API

Producers and consumers can connect to non-persistent topics in the same way as persistent topics, with the crucial difference that the topic name must start with non-persistent. All three subscription modes (exclusive, shared, and failover) are supported for non-persistent topics.

Here's an example Java consumer for a non-persistent topic:

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
    String npTopic = "non-persistent://public/default/my-topic";
    String subscriptionName = "my-subscription-name";
    Consumer<byte[]> consumer = client.newConsumer()
            .topic(npTopic)
            .subscriptionName(subscriptionName)
            .subscribe();

Here's an example Java producer for the same non-persistent topic:

    Producer<byte[]> producer = client.newProducer()
            .topic(npTopic)
            .create();

Message retention and expiry

By default, Pulsar message brokers:

  • immediately delete all messages that have been acknowledged by a consumer, and
  • persistently store all unacknowledged messages in a message backlog.

Pulsar has two features, however, that enable you to override this default behavior:

  • Message retention enables you to store messages that have been acknowledged by a consumer
  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

All message retention and expiry is managed at the namespace level. For a how-to, see the Message retention and expiry cookbook.

The diagram below illustrates both concepts:

[Figure: Message retention and expiry]

With message retention, shown at the top, a retention policy applied to all topics in a namespace dictates that some messages are durably stored in Pulsar even though they've already been acknowledged. Acknowledged messages that are not covered by the retention policy are deleted. Without a retention policy, all of the acknowledged messages would be deleted.

With message expiry, shown at the bottom, some messages are deleted, even though they haven't been acknowledged, because they've expired according to the TTL applied to the namespace (for example because a TTL of 5 minutes has been applied and the messages haven't been acknowledged but are 10 minutes old).
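The two policies can be summarised as a decision on each message based on whether it has been acknowledged and how old it is. The sketch below is a simplified model with hypothetical names (`BacklogPolicy`, `decide`); it measures a single age per message and uses only time-based limits, which is an assumption made for illustration rather than a description of how Pulsar evaluates its policies.

```java
/** Hypothetical model of retention (acknowledged) and expiry (unacknowledged) decisions. */
final class BacklogPolicy {
    enum Decision { KEEP, DELETE }

    private final long retentionMillis; // 0 means no retention policy
    private final long ttlMillis;       // 0 means no TTL

    BacklogPolicy(long retentionMillis, long ttlMillis) {
        this.retentionMillis = retentionMillis;
        this.ttlMillis = ttlMillis;
    }

    Decision decide(boolean acknowledged, long ageMillis) {
        if (acknowledged) {
            // Acknowledged messages survive only while covered by the retention policy.
            return ageMillis < retentionMillis ? Decision.KEEP : Decision.DELETE;
        }
        // Unacknowledged messages stay in the backlog unless a TTL has expired.
        if (ttlMillis > 0 && ageMillis >= ttlMillis) {
            return Decision.DELETE;
        }
        return Decision.KEEP;
    }
}
```

With a TTL of 5 minutes, an unacknowledged message that is 10 minutes old is deleted, matching the example in the text above.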

Message deduplication

Message duplication occurs when a message is persisted by Pulsar more than once. Message deduplication is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, even if the message is received more than once.

The following diagram illustrates what happens when message deduplication is disabled vs. enabled:

[Figure: Pulsar message deduplication]

Message deduplication is disabled in the scenario shown at the top. Here, a producer publishes message 1 on a topic; the message reaches a Pulsar broker and is persisted to BookKeeper. The producer then sends message 1 again (in this case due to some retry logic), and the message is received by the broker and stored in BookKeeper again, which means that duplication has occurred.

In the second scenario at the bottom, the producer publishes message 1, which is received by the broker and persisted, as in the first scenario. When the producer attempts to publish the message again, however, the broker knows that it has already seen message 1 and thus does not persist the message.
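The second scenario can be sketched as a broker that remembers the highest sequence ID it has persisted for each producer and ignores anything at or below it. This is a minimal model under stated assumptions: `DeduplicatingTopic` is a hypothetical class, and tracking by producer name and sequence ID is used here as an illustration based on the message fields described earlier, not as a full account of the broker's mechanism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of broker-side deduplication by producer name and sequence ID. */
final class DeduplicatingTopic {
    private final Map<String, Long> lastSequenceId = new HashMap<>();
    private final List<String> persisted = new ArrayList<>();

    /** Persists the payload unless it was already seen; returns true if it was persisted. */
    boolean publish(String producerName, long sequenceId, String payload) {
        Long last = lastSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // duplicate, for example a retry of an already stored message
        }
        lastSequenceId.put(producerName, sequenceId);
        persisted.add(payload);
        return true;
    }

    List<String> persisted() {
        return new ArrayList<>(persisted);
    }
}
```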

Message deduplication is handled at the namespace level. For more instructions, see the message deduplication cookbook.

Producer idempotency

The other available approach to message deduplication is to ensure that each message is only produced once. This approach is typically called producer idempotency. The drawback of this approach is that it defers the work of message deduplication to the application. In Pulsar, this is handled at the broker level, which means that you don't need to modify your Pulsar client code. Instead, you only need to make administrative changes (see the Managing message deduplication cookbook for a guide).

Deduplication and effectively-once semantics

Message deduplication makes Pulsar an ideal messaging system to be used in conjunction with stream processing engines (SPEs) and other systems seeking to provide effectively-once processing semantics. Messaging systems that don't offer automatic message deduplication require the SPE or other system to guarantee deduplication, which means that strict message ordering comes at the cost of burdening the application with the responsibility of deduplication. With Pulsar, strict ordering guarantees come at no application-level cost.

More in-depth information can be found in this post on the Streamlio blog