Messaging

Pulsar 采用 发布-订阅的设计模式(简称 pub-sub), 该设计模式中,producer 发布消息到 topicConsumers subscribe to those topics, process incoming messages, and send an acknowledgement when processing is complete.

一旦创建订阅,即使 consumer 断开连接,Pulsar 仍然可以保存所有消息。 在 consumer 确认消息已处理成功后,才会删除消息。

消息

消息是 Pulsar 的基础“单元”。 The following table lists the components of messages.

组件说明
Value / data payloadThe data carried by the message. All Pulsar messages contain raw bytes, although message data can also conform to data schemas.
Key消息可以选择用键进行标记,这在 topic 压缩 等操作很有用。
Properties用户自定义属性的键值对(可选)。
Producer 名称The name of the producer who produces the message. If you do not specify a producer name, the default name is used.
Sequence IDEach Pulsar message belongs to an ordered sequence on its topic. The sequence ID of the message is its order in that sequence.
Publish timeThe timestamp of when the message is published. The timestamp is automatically applied by the producer.
Event time应用程序可以附加到消息的时间戳(可选), 例如处理消息的时间。 如果没有明确设置,则消息的事件时间为 0
TypedMessageBuilder用于构造消息。 您可以使用 TypedMessageBuilder 设置消息的键值对属性。
在设置 TypedMessageBuilder 时,最佳的选择是将 key 设置为字符串。 如果将 key 设置为其他类型(例如,AVRO 对象),则 key 会以字节形式发送,这时 consumer 就很难使用了。

The default size of a message is 5 MB. You can configure the max size of a message with the following configurations.

  • broker.conf 文件中

    1. # The max size of a message (in bytes).
    2. maxMessageSize=5242880
  • bookkeeper.conf 配置文件中

    1. # The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
    2. nettyMaxFrameSizeBytes=5253120

更多关于 Pulsar 消息的信息,参阅二进制协议

Producers

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker. The Pulsar broker process the messages.

发送模式

Producer 可以以同步(sync) 或 异步(async) 的方式发布消息到 broker。

发送模式说明
Sync sendThe producer waits for an acknowledgement from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure.
异步发送Producer 将把消息放于阻塞队列中,并立即返回 然后,客户端将在后台将消息发送给 broker。 如果队列已满(最大大小可配置),则调用 API 时,producer 可能会立即被阻止或失败,具体取决于传递给 producer 的参数。

访问模式

对于消息生产者来说在主题上你可以有不同类型的访问模式。

访问模式说明

Shared|Multiple producers can publish on a topic.

This is the default setting. Exclusive|仅有一个生产者可以在主题上发布消息。

如果已经有一个生产者连接了此主题,其他试图在此主题上发布消息的生产者会马上得到错误信息。

如果旧的生产者发生了与broker的网络分区,旧的生产者被驱逐后接着会有一个新的生产者被选举出来成为下一个排他性生产者。 WaitForExclusive|如果已经有一个生产者连接了主题,生产者创建过程被挂起(而不是超时) 直到这个生产者获得了 Exclusive 访问权限。

成功成为独家生产者的生产者被视为领导者。 因此,如果你打算为自己的应用实现领导者选举方案,你可以使用这种访问模式。

Note

Once an application creates a producer with the Exclusive or WaitForExclusive access mode successfully, the instance of the application is guaranteed to be the only one writer on the topic. Other producers trying to produce on this topic get errors immediately or have to wait until they get the Exclusive access.

想了解更多信息,请参阅PIP 68:Exclusive Producer

You can set producer access mode through Java Client API. For more information, see ProducerAccessMode in ProducerBuilder.java.

Compression

You can compress messages published by producers during transportation. Pulsar currently supports the following types of compression:

批量处理

当批量处理启用时,producer 会在单个请求中积累并发送一批消息。 批量处理的量大小由最大消息数和最大发布延迟定义。 因此,积压数量是分批处理的总数,而不是信息总数。

在 Pulsar 中,批次被跟踪并存储为单个单元,而不是单个消息。 Consumer 将批量处理的消息拆分成单个消息。 但即使启用了批量处理,也始终将计划中的消息(通过 deliverAt 或者 deliverAfter 进行配置) 作为单个消息发送。

一般来说,当 consumer 确认了一个批的所有消息,该批才会被认定为确认。 这意味着当发生不可预料的失败、否定的确认(negative acknowledgements)或确认超时,都可能导致批中的所有消息都被重新发送,即使其中一些消息已经被确认了。

To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar introduces batch index acknowledgement since Pulsar 2.6.0. When batch index acknowledgement is enabled, the consumer filters out the batch index that has been acknowledged and sends the batch index acknowledgement request to the broker. Broker 维护批量索引的确认状态并跟踪每批索引的确认状态,以避免向 consumer 发送已确认的消息。 当某一批消息的所有索引都被确认时,该批消息将被删除。

By default, batch index acknowledgement is disabled (acknowledgmentAtBatchIndexLevelEnabled=false). You can enable batch index acknowledgement by setting the acknowledgmentAtBatchIndexLevelEnabled parameter to true at the broker side. 启用批量索引确认将会导致更多内存开销。

分块

当你想要启用分块(chunking) 时,请阅读以下说明。

  • Batching and chunking cannot be enabled simultaneously. 如果想要启用分块(chunking) ,您必须提前禁用批量处理。
  • Chunking is only supported for persisted topics.
  • Chunking is only supported for the exclusive and failover subscription types.

当启用分块(chunking) 时(chunkingEnabled=true) ,如果消息大小大于允许的最大发布有效载荷大小,则 producer 将原始消息分割成分块的消息,并将它们与块状的元数据一起单独和按顺序发布到 broker。 在 broker 中,分块的消息将和普通的消息以相同的方式存储在 Managed Ledger 上。 唯一的区别是,consumer 需要缓冲分块消息,并在收集完所有分块消息后将其合并成真正的消息。 Managed Ledger 上的分块消息可以和普通消息交织在一起。 如果 producer 未能发布消息的所有分块,则当 consumer 未能在过期时间(expire time) 内接收所有分块时,consumer 可以过期未完成的分块。 By default, the expire time is set to one minute.

Consumer 会缓存收到的块状消息,直到收到消息的所有分块为止。 然后 consumer 将分块的消息拼接在一起,并将它们放入接收器队列中。 客户端从接收器队列中消费消息。 一旦 consumer 使用整个大消息并确认,consumer 就会在内部发送与该大消息关联的所有分块消息的确认。 You can set the maxPendingChunkedMessage parameter on the consumer. 当达到阈值时,consumer 通过静默确认未分块的消息或通过将其标记为未确认,要求 broker 稍后重新发送这些消息。

The broker does not require any changes to support chunking for non-shared subscription. The broker only uses chunkedMessageRate to record chunked message rate on the topic.

处理一个 producer 和一个订阅 consumer 的分块消息

如下图所示,当生产者向主题发送一批大的分块消息和普通的非分块消息时。 假设生产者发送的消息为 M1,M1 有三个分块 M1-C1,M1-C2 和 M1-C3。 这个 broker 在其管理的ledger里面保存所有的三个块消息,然后以相同的顺序分发给消费者(独占/灾备模式)。 消费者将在内存缓存所有的块消息,直到收到所有的消息块。将这些消息合并成为原始的消息M1,发送给处理进程。

Messaging - 图1

多个生产者和一个生产者处理块消息。

当多个生产者发布块消息到单个主题,这个 Broker 在同一个 Ledger 里面保存来自不同生产者的所有块消息。 如下所示,生产者1发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。 生产者2发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。 这些特定消息的所有分块是顺序排列的,但是其在 ledger 里面可能不是连续的。 这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。

Messaging - 图2

消费者

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Consumer 向 broker 发送消息流获取申请(flow permit request)以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 你能够通过receiverQueueSize参数配置队列的长度 (队列的默认长度是1000) 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。

接收模式

可以通过同步(sync) 或者异步(async)的方式从brokers接受消息。

发送模式说明
同步接收同步模式,在收到消息之前都是被阻塞的。
异步接收异步接收模式会立即返回一个 future 值(如 Java 中的 CompletableFuture),一旦收到新的消息就立刻完成。

监听

Client libraries provide listener implementation for consumers. For example, the Java client provides a MesssageListener interface. 在这个接口中,一旦接受到新的消息,received方法将被调用。

确认

当消费者成功的消费了一条消息,这个消费者会发送一个确认信息给broker。 这个消息时是永久保存的,只有在收到订阅者消费成功的消息确认后才会被删除。 如果希望消息被 Consumer 确认后仍然保留下来,可配置 消息保留策略实现。

For a batch message, if batch index acknowledgement is enabled, the broker maintains the batch index acknowledgement status and tracks the acknowledgement status of each batch index to avoid dispatching acknowledged messages to the consumer. 当某一批消息的所有索引都被确认时,该批消息将被删除。 For details about the batch index acknowledgement, see batching.

Messages can be acknowledged in the following two ways:

  • Messages are acknowledged individually. With individual acknowledgement, the consumer needs to acknowledge each message and sends an acknowledgement request to the broker.
  • 累积确认模式 累积确认时,消费者只需要确认最后一条他收到的消息。 所有之前(包含此条)的消息,都不会被再次发送给那个消费者。

Note Cumulative acknowledgement cannot be used in Shared subscription type, because this subscription type involves multiple consumers which have access to the same subscription. 在共享订阅模式,消息都是单条确认模式。

取消确认

当消费者在某个时间没有成功的消费某条消息,消费者想重新消费到这条消息,这个消费者可以发送一条取消确认消息到 broker,broker 会将这条消息重新发给消费者。

Messages are negatively acknowledged either individually or cumulatively, depending on the consumption subscription type.

In the exclusive and failover subscription types, consumers only negatively acknowledge the last message they receive.

In the shared and Key_Shared subscription types, you can negatively acknowledge messages individually.

Be aware that negative acknowledgment on ordered subscription types, such as Exclusive, Failover and Key_Shared, can cause failed messages to arrive consumers out of the original order.

Note 如果启用批处理,则同一批次中的其他消息和已确认的取消消息将重新传递给消费者。

确认超时

如果消息没有被成功消费,你想去让 broker 自动重新交付这个消息, 你可以采用未确认消息自动重新交付机制。 客户端会跟踪 超时 时间范围内所有未确认的消息。 并且在指定超时时间后会发送一个 重发未确认的消息 请求到 broker。

Note 如果启用批处理,则同一批次中的其他消息和未确认消息将重新传递给消费者。

Note
Prefer negative acknowledgements over acknowledgement timeout. 确认取消是以更高的精度在控制单条消息的重新传递。当消息处理时间超过确认超时时间时,要避免无效的消息重传。

死信主题

Dead letter topic enables you to consume new messages when some messages cannot be consumed successfully by a consumer. In this mechanism, messages that are failed to be consumed are stored in a separate topic, which is called dead letter topic. You can decide how to handle messages in the dead letter topic.

The following example shows how to enable dead letter topic in a Java client using the default dead letter topic:

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2. .topic(topic)
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .deadLetterPolicy(DeadLetterPolicy.builder()
  6. .maxRedeliverCount(maxRedeliveryCount)
  7. .build())
  8. .subscribe();

The default dead letter topic uses this format:

  1. <topicname>-<subscriptionname>-DLQ

If you want to specify the name of the dead letter topic, use this Java client example:

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2. .topic(topic)
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .deadLetterPolicy(DeadLetterPolicy.builder()
  6. .maxRedeliverCount(maxRedeliveryCount)
  7. .deadLetterTopic("your-topic-name")
  8. .build())
  9. .subscribe();

Dead letter topic depends on message re-delivery. Messages are redelivered either due to acknowledgement timeout or negative acknowledgement. If you are going to use negative acknowledgement on a message, make sure it is negatively acknowledged before the acknowledgement timeout.

Note
Currently, dead letter topic is enabled In the shared and Key_Shared subscription types.

Retry letter topic

很多在线的业务系统,由于业务逻辑处理出现异常,消息一般需要被重新消费。 若需要允许延时重新消费失败的消息,你可以配置生产者同时发送消息到业务主题和重试主题,并允许消费者自动重试消费。 配置了允许消费者自动重试。如果消息没有被消费成功,它将被保存到重试主题当中。并在指定延时时间后,自动重新消费重试主题里面的消费失败消息。

By default, automatic retry is disabled. You can set enableRetry to true to enable automatic retry on the consumer.

如下例子所示,消费者会从重试主题消费消息。

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2. .topic(topic)
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .enableRetry(true)
  6. .receiverQueueSize(100)
  7. .deadLetterPolicy(DeadLetterPolicy.builder()
  8. .maxRedeliverCount(maxRedeliveryCount)
  9. .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
  10. .build())
  11. .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  12. .subscribe();

Topic

As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from producers to consumers. Topic的名称为符合良好结构的URL:

  1. {persistent|non-persistent}://tenant/namespace/topic
Topic名称组成说明
持久化 / 非持久化用来标识 topic 的类型。 Pulsar 支持两种主题类型:持久化非持久化。 主题默认是持久化类型,如果不特殊指定主题类型,那主题就是持久化的。 对于持久化的主题,所有的消息都会被持久化的保存到磁盘当中(如果 broker 不是单机模式,消息会被持久化到多块磁盘),而非持久化的主题的数据不会被保存到磁盘里面。
租户The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters.
命名空间将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。 大多数对 topic 的管理都是对命名空间的一项配置。 每个租户里面可以有一个或者多个命名空间。
topicThe final part of the name. Topic names have no special meaning in a Pulsar instance.

No need to explicitly create new topics You do not need to explicitly create topics in Pulsar. 如果客户端尝试从不存在的主题当中生产消息或消费消息,Pulsar 将会自动使用该主题名称在该命名空间下创建同名的主题。 If no tenant or namespace is specified when a client creates a topic, the topic is created in the default tenant and namespace. You can also create a topic in a specified tenant and namespace, such as persistent://my-tenant/my-namespace/my-topic. persistent://my-tenant/my-namespace/my-topic means the my-topic topic is created in the my-namespace namespace of the my-tenant tenant.

命名空间

命名空间是租户内部逻辑上的命名术语。 可以通过admin API在租户下创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1 ,它的namespace是app1这个应用,对应的租户是 my-tenant。 你可以在namespace下创建任意数量的topic

订阅

订阅是命名好的配置规则,指导消息如何投递给消费者。 Four subscription types are available in Pulsar: exclusive, shared, failover, and key_shared. These types are illustrated in the figure below.

Subscription types

Pub-Sub or Queuing In Pulsar, you can use different subscriptions flexibly. 如果你想在消费者当中使用传统的”发布-订阅消息“,你可以为每个消费者指定一个特定的订阅名称, 这就是独占模式。 如果你想在消费者当中实现”消息队列“的效果,则多个消费者会拥有相同的订阅名称(如共享模式,灾备模式,key共享模式)。 如果你想同时实现两种效果,则可以将订阅模式和其他的订阅模式结合起来使用。

Subscription types

When a subscription has no consumers, its subscription type is undefined. The type of a subscription is defined when a consumer connects to it, and the type can be changed by restarting all consumers with a different configuration.

Exclusive

In exclusive type, only a single consumer is allowed to attach to the subscription. If multiple consumers subscribe to a topic using the same subscription, an error occurs.

In the diagram below, only Consumer A-0 is allowed to consume messages.

Exclusive模式为默认订阅模式。

独占订阅

Failover(灾备)

In Failover type, multiple consumers can attach to the same subscription. 主消费者会消费非分区主题或者分区主题中的每个分区的消息。 When the master consumer disconnects, all (non-acknowledged and subsequent) messages are delivered to the next consumer in line.

对于分区主题来说,Broker 将按照消费者的优先级和消费者名称的词汇表顺序对消费者进行排序。 然后试图将主题均匀的分配给优先级最高的消费者。

对于非分区主题来说,broker 会根据消费者订阅非分区主题的顺序选择消费者。

In the diagram below, Consumer-B-0 is the master consumer while Consumer-B-1 would be the next consumer in line to receive messages if Consumer-B-0 is disconnected.

灾备订阅

Shared(共享)

In shared or round robin mode, multiple consumers can attach to the same subscription. 消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

In the diagram below, Consumer-C-1 and Consumer-C-2 are able to subscribe to the topic, but Consumer-C-3 and others could as well.

Limitations of Shared type When using Shared type, be aware that: * Message ordering is not guaranteed. * You cannot use cumulative acknowledgment with Shared type.

共享订阅

Key_Shared

In Key_Shared type, multiple consumers can attach to the same subscription. Messages are delivered in a distribution across consumers and message with same key or same ordering key are delivered to only one consumer. No matter how many times the message is re-delivered, it is delivered to the same consumer. When a consumer connected or disconnected will cause served consumer change for some key of message.

Limitations of Key_Shared mode When you use Key_Shared type, be aware that: * You need to specify a key or orderingKey for messages. * 你不能为Key_Shared类型使用累积确认。 * Your producers should disable batching or use a key-based batch builder.

Key_Shared subscriptions

可以在 broker.config 中禁用 Key_Shared 模式。

订阅模式

什么是订阅模式

订阅模式指示游标类型。

  • 创建订阅时,会创建一个关联的游标来记录上次消费的位置。
  • 当订阅的消费者重新启动时,它可以从它消费的最后一条消息继续消费。

订阅模式 | 描述 | 笔记 |—-|—-|—- Durable|游标是持久的,它保留消息并持久化当前位置。

If a broker restarts from a failure, it can recover the cursor from the persistent storage (BookKeeper), so that messages can continue to be consumed from the last consumed position.|Durable is the default subscription mode. NonDurable|游标是非持久的。

Once a broker stops, the cursor is lost and can never be recovered, so that messages can not continue to be consumed from the last consumed position.|Reader’s subscription mode is NonDurable in nature and it does not prevent data in a topic from being deleted. Reader’s subscription mode can not be changed.

订阅可以有一个或多个消费者。 当消费者订阅主题时,必须指定订阅名称。 持久订阅和非持久订阅可以具有相同的名称,它们彼此是独立的。 如果消费者指定了之前不存在的订阅,该订阅会自动创建。

When to use

默认情况下,没有任何持久订阅的主题的消息被标记为已删除。 如果要防止消息被标记为已删除,可以为该主题创建持久订阅。 在这种情况下,只有已确认的消息被标记为已删除。 更多详细信息,请参阅消息保留和过期

如何使用

创建消费者后,消费者的默认订阅模式为 Durable。 你可以通过更改消费者的配置将订阅模式更改为 NonDurable

Durable

Non-durable

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("my-topic") .subscriptionName("my-sub") .subscriptionMode(SubscriptionMode.Durable) .subscribe();
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("my-topic") .subscriptionName("my-sub") .subscriptionMode(SubscriptionMode.NonDurable) .subscribe();

有关如何创建、检查或删除持久订阅,详见订阅管理

多主题订阅

当consumer订阅pulsar的主题时,它默认指定订阅了一个主题,例如:persistent://public/default/my-topic。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:

  • On the basis of a regular expression (regex), for example persistent://public/default/finance-.*
  • 通过明确指定的topic列表

当使用正则匹配订阅多个主题的时候,所有的主题必须是在同一个命名空间里面的。

当订阅多个主题的时候,Pulsar 客户端将自动调用 Pulsar API 找到符合匹配规则的主题列表,然后订阅这些主题。 如果此时有暂不存在的主题,那么一旦这些主题被创建,消费者会自动订阅这些主题。

No ordering guarantees across multiple topics When a producer sends messages to a single topic, all messages are guaranteed to be read from that topic in the same order. However, these guarantees do not hold across multiple topics. So when a producer sends message to multiple topics, the order in which messages are read from those topics is not guaranteed to be the same.

如下是 Java 订阅多个主题的代码示例:

  1. import java.util.regex.Pattern;
  2. import org.apache.pulsar.client.api.Consumer;
  3. import org.apache.pulsar.client.api.PulsarClient;
  4. PulsarClient pulsarClient = // Instantiate Pulsar client object
  5. // Subscribe to all topics in a namespace
  6. Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
  7. Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
  8. .topicsPattern(allTopicsInNamespace)
  9. .subscriptionName("subscription-1")
  10. .subscribe();
  11. // Subscribe to a subsets of topics in a namespace, based on regex
  12. Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
  13. Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
  14. .topicsPattern(someTopicsInNamespace)
  15. .subscriptionName("subscription-1")
  16. .subscribe();

关于代码示例,请参阅 Java

分区 topic

普通的主题仅仅被保存在单个 broker中,这限制了主题的最大吞吐量。 Partitioned topics are a special type of topic that are handled by multiple brokers, thus allowing for higher throughput.

分区主题实际是通过在底层拥有 N 个内部主题来实现的,这个 N 的数量就是等于分区的数量。 当向分区的topic发送消息,每条消息被路由到其中一个broker。 Pulsar自动处理跨broker的分区分布。

下图对此做了阐明:

Messaging - 图8

The Topic1 topic has five partitions (P0 through P4) split across three brokers. 因为分区多于broker数量,其中有两个broker要处理两个分区。第三个broker则只处理一个。(再次强调,分区的分布是Pulsar自动处理的)。

这个topic的消息被广播给两个consumer。 The routing mode determines each message should be published to which partition, while the subscription type determines which messages go to which consumers.

Decisions about routing and subscription types can be made separately in most cases. 通常来讲,吞吐能力的要求,决定了 分区/路由 的方式。订阅模式则应该由应用的语义来做决定。

There is no difference between partitioned topics and normal topics in terms of how subscription types work, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer.

Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.

路由模式

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—-that is, which internal topic—-each message should be published to.

有三种 MessageRoutingMode 可用:

发送模式说明
RoundRobinPartition如果消息没有指定 key,为了达到最大吞吐量,生产者会以 round-robin 方式将消息发布到所有分区。 请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果消息指定了key,分区生产者会根据key的hash值将该消息分配到对应的分区。 这是默认的模式。
SinglePartition如果消息没有指定 key,生产者将会随机选择一个分区,并发布所有消息到这个分区。 如果消息指定了key,分区生产者会根据key的hash值将该消息分配到对应的分区。
CustomPartition使用自定义消息路由器实现来决定特定消息的分区。 用户可以创建自定义路由模式:使用 Java client 并实现MessageRouter 接口。

顺序保证

The ordering of messages is related to MessageRoutingMode and Message Key. Usually, user would want an ordering of Per-key-partition guarantee.

当使用 SinglePartition或者RoundRobinPartition模式时,如果消息有key,消息将会被路由到匹配的分区,这是基于ProducerBuilderHashingScheme 指定的散列shema。

顺序保证说明路由策略与消息Key
按键分区所有具有相同 key 的消息将按顺序排列并放置在相同的分区(Partition)中。使用 SinglePartitionRoundRobinPartition 模式,每条消息都需要有key。
生产者排序来自同一生产者的所有消息都是有序的路由策略为SinglePartition, 且每条消息都没有key。

散列scheme

HashingScheme 是代表一组标准散列函数的枚举。为一个指定消息选择分区时使用。

有两种可用的散列函数: JavaStringHashMurmur3_32Hash. The default hashing function for producer is JavaStringHash. 请注意,当producer可能来自于不同语言客户端时,JavaStringHash是不起作用的。建议使用Murmur3_32Hash

非持久topic

By default, Pulsar persistently stores all unacknowledged messages on multiple BookKeeper bookies (storage nodes). 因此,持久性主题上的消息数据可以在 broker 重启和订阅者故障转移之后继续存在。

Pulsar also, however, supports non-persistent topics, which are topics on which messages are never persisted to disk and live only in memory. Pulsar也提供了非持久topic。非持久topic的消息不会被保存在硬盘上,只存活于内存中。当使用非持久topic分发时,杀掉Pulsar的broker或者关闭订阅者,此topic( non-persistent))上所有的瞬时消息都会丢失,意味着客户端可能会遇到消息缺失。

非持久性主题具有这种形式的名称(注意名称中的 non-persistent):

  1. non-persistent://tenant/namespace/topic

如何使用非持久topic的更多信息,请参考 Non-persistent messaging cookbook

In non-persistent topics, brokers immediately deliver messages to all connected subscribers without persisting them in BookKeeper. 如果有一个订阅者断开连接,broker将无法重发这些瞬时消息,订阅者将永远也不能收到这些消息了。 去掉持久化存储的步骤,在某些情况下,使得非持久topic的消息比持久topic稍微变快。但是同时,Pulsar的一些核心优势也丧失掉了。

非持久topic,消息数据仅存活在内存。 如果broker挂掉或者因其他情况不能从内存取到,你的消息数据就可能丢失。 Use non-persistent topics only if you’re certain that your use case requires it and can sustain it.

默认非持久topic在broker上是开启的。 你可以通过broker的配置关闭。 You can manage non-persistent topics using the pulsar-admin topics command. For more information, see pulsar-admin.

性能

Non-persistent messaging is usually faster than persistent messaging because brokers don’t persist messages and immediately send acks back to the producer as soon as that message is delivered to connected brokers. 非持久topic让producer有更低的发布延迟。

客户端API

Producer和consumer以连接持久topic同样的方式连接到非持久topic。重要的区别是,topic的名称必须以non-persistent开头。 All three subscription types—-exclusive, shared, and failover-—are supported for non-persistent topics.

下面是一个非持久topic的java consumer例子:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .build();
  4. String npTopic = "non-persistent://public/default/my-topic";
  5. String subscriptionName = "my-subscription-name";
  6. Consumer<byte[]> consumer = client.newConsumer()
  7. .topic(npTopic)
  8. .subscriptionName(subscriptionName)
  9. .subscribe();

这里还有一个非持久topic的java producer例子:

  1. Producer<byte[]> producer = client.newProducer()
  2. .topic(npTopic)
  3. .create();

消息保留和过期

Pulsar broker默认如下:

  • immediately delete all messages that have been acknowledged by a consumer, and
  • 以消息backlog的形式,持久保存所有的未被确认消息

Pulsar有两个特性,让你可以覆盖上面的默认行为。

  • Message retention enables you to store messages that have been acknowledged by a consumer
  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

All message retention and expiry is managed at the namespace level. For a how-to, see the Message retention and expiry cookbook.

下图说明了这两种概念:

消息保留和过期

With message retention, shown at the top, a retention policy applied to all topics in a namespace dictates that some messages are durably stored in Pulsar even though they’ve already been acknowledged. 没有被留存规则覆盖的消息将会被删除。 Without a retention policy, all of the acknowledged messages would be deleted.

图中下面的是消息过期,有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)

消息去重

消息去重保证了一条消息只能在 Pulsar 服务端被持久化一次。 消息去重是一个 Pulsar 可选的特性,它能够阻止不必要的消息重复,它保证了即使消息被消费了多次,也只会被保存一次。

下图展示了开启和关闭消息去重的场景:

Pulsar消息去重

最上面的场景中,消息去重被关闭。 Producer发布消息1到一个topic,消息到达broker后,被持久化到BookKeeper。 然后producer又发送了消息1(可能因为某些重试逻辑),然后消息被接收后又持久化在BookKeeper,这意味着消息重复发生了。

在第二个场景中,producer发送了消息1,消息被broker接收然后持久化,和第一个场景是一样的。 当producer再次发送消息时,broker知道已经收到个消息1,所以不会再持久化消息1.

Message deduplication is handled at the namespace level or the topic level. For more instructions, see the message deduplication cookbook.

生产者幂等

The other available approach to message deduplication is to ensure that each message is only produced once. This approach is typically called producer idempotency. 这种方式的缺点是,把消息去重的工作推给了应用去做。 在 Pulsar 中,消息去重是在 broker上处理的,用户不需要去修改客户端的代码。 相反,你只需要通过修改配置就可以实现。 可以通过查看消息去重指南去了解更多详细信息。

去重和实际一次语义

消息去重,使 Pulsar 成为了流处理引擎(SPE)或者其他寻求 “仅仅一次” 语义的连接系统所需的理想消息系统。 如果消息系统没有提供自动去重能力,那么 SPE (流处理引擎) 或者其他连接系统就必须自己实现去重语义,这意味着需要应用去承担这部分的去重工作。 使用Pulsar,严格的顺序保证不会带来任何应用层面的代价。

你能够在这篇博客上获得更详细的信息。

消息延迟传递

延时消息功能允许你能够过一段时间才能消费到这条消息,而不是消息发布后,就马上可以消费到。 In this mechanism, a message is stored in BookKeeper, DelayedDeliveryTracker maintains the time index(time -> messageId) in memory after published to a broker, and it is delivered to a consumer once the specific delayed time is passed.

Delayed message delivery only works in Shared subscription type. In Exclusive and Failover subscription types, the delayed message is dispatched immediately.

如下图所示,说明了延时消息的实现机制:

延时消息

Broker 保存消息是不经过任何检查的。 当消费者消费一条消息时,如果这条消息是延时消息,那么这条消息会被加入到DelayedDeliveryTracker当中。 订阅检查机制会从DelayedDeliveryTracker获取到超时的消息,并交付给消费者。

Broker

Delayed message delivery is enabled by default. You can change it in the broker configuration file as below:

  1. # Whether to enable the delayed delivery for messages.
  2. # If disabled, messages are immediately delivered and there is no tracking overhead.
  3. delayedDeliveryEnabled=true
  4. # Control the ticking time for the retry of delayed message delivery,
  5. # affecting the accuracy of the delivery time compared to the scheduled time.
  6. # Default is 1 second.
  7. delayedDeliveryTickTimeMillis=1000

生产者(Producer)

下面是 Java 当中生产延时消息一个例子:

  1. // message to be delivered at the configured delay interval
  2. producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();