Pulsar 采用发布-订阅的设计模式,也称作 pub-sub。 该设计模式中,producer 发布消息到 topic, Consumer 可以订阅这些 topic,处理发布过来的消息,在处理完成后发送确认。
一旦创建订阅,即使 consumer 已断开连接,Pulsar 仍然可以保存所有消息。 只有在 consumer 确认消息被成功处理后,保留下来的消息才会被丢弃。
Messages
消息是 Pulsar 的基础“单元”。 消息就是producer发给topic的内容,以及consumer从topic消费的内容(消息处理完成后发送确认)。 消息类似于邮政服务系统中的信件。
Component | 作用 |
---|---|
Value / data payload | The data carried by the message. All Pulsar messages carry raw bytes, although message data can also conform to data schemas |
Key | 消息可以被Key打标签。这对诸如topic压缩之类的事情有作用 |
属性 | 可选的,用户定义属性 的key/value map |
Producer 名称 | 生产消息的producer名称(producer被自动赋予默认名称,但你也可以自己指定) |
序列 ID | Each Pulsar message belongs to an ordered sequence on its topic. A message’s sequence ID is its ordering in that sequence. |
发布时间 | 消息发布的时间戳(producer自动附上) |
事件时间 | An optional timestamp that applications can attach to the message representing when something happened, e.g. when the message was processed. The event time of a message is 0 if none is explicitly set. |
Pulsar消息内容的更深入分解,请参考Pulsar的 binary protocol文档
Producers
生产者是连接 topic 的程序,它将消息发布到一个 Pulsar broker 上。
发送模式
Producer可以以同步(sync)或者异步(async)的方式发布消息到broker。
发送模式 | Description |
---|---|
同步发送 | The producer will wait for acknowledgement from the broker after sending each message. If acknowledgment isn’t received then the producer will consider the send operation a failure. |
异步发送 | Producer 将消息发送到一个阻塞队列(blocking queue)后就立刻返回。 然后,客户端将在后台将消息发送给broker。 如果队列已满( 配置的最大数量),根据传入producer的参数,producer可能阻塞或者直接返回失败。 |
压缩
Messages published by producers can be compressed during transportation in order to save bandwidth. Pulsar currently supports the following types of compression:
批量处理
If batching is enabled, the producer will accumulate and send a batch of messages in a single request. Batching size is defined by the maximum number of messages and maximum publish latency.
Consumers
A consumer is a process that attaches to a topic via a subscription and then receives messages.
接收模式
Messages can be received from brokers either synchronously (sync) or asynchronously (async).
发送模式 | Description |
---|---|
同步接收 | 同步方式会在获取到消息前呈阻塞状态。 |
异步接收 | 异步方式会立即返回 future 值(如java中的 CompletableFuture ),一旦有新消息,会立刻完成。 |
监听
Client libraries provide listener implementation for consumers. For example, the Java client provides a MesssageListener interface. 在这个接口中,一旦接受到新的消息,received
方法将被调用。
确认
When a consumer has consumed a message successfully, the consumer sends an acknowledgement request to the broker, so that the broker will discard the message. Otherwise, it stores the message.
消息的确认可以一个接一个,也可以累积一起。 累积确认时,消费者只需要确认最后一条他收到的消息。 所有之前(包含此条)的消息,都不会被再次重发给那个消费者。
Cumulative acknowledgement cannot be used with shared subscription mode, because shared mode involves multiple consumers having access to the same subscription.
In the shared subscription mode, messages can be acknowledged individually.
取消确认
When a consumer does not consume a message successfully at a time, and wants to consume the message again, the consumer can send a negative acknowledgement to the broker, and then the broker will redeliver the message.
Messages can be negatively acknowledged one by one or cumulatively, which depends on the consumption subscription mode.
In the exclusive and failover subscription modes, consumers only negatively acknowledge the last message they have received.
In the shared and Key_Shared subscription modes, you can negatively acknowledge messages individually.
确认超时
When a message is not consumed successfully, and you want to trigger the broker to redeliver the message automatically, you can adopt the unacknowledged message automatic re-delivery mechanism. Client will track the unacknowledged messages within the entire acktimeout
time range, and send a redeliver unacknowledged messages
request to the broker automatically when the acknowledgement timeout is specified.
Note
Use negative acknowledgement prior to acknowledgement timeout. Negative acknowledgement controls re-delivery of individual messages with more precise, and avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.
死信主题
Dead letter topic enables you to consume new messages when some messages cannot be consumed successfully by a consumer. In this mechanism, messages that are failed to be consumed are stored in a separate topic, which is called dead letter topic. You can decide how to handle messages in the dead letter topic.
The following example shows how to enable dead letter topic in Java client.
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
Dead letter topic depends on message re-delivery. You need to confirm message re-delivery method: negative acknowledgement or acknowledgement timeout. Use negative acknowledgement prior to acknowledgement timeout.
Note
Currently, dead letter topic is enabled only in the shared subscription mode.
Topic
和其他的发布订阅系统一样,Pulsar 中的 topic 是被命名的通道,用做从producer到 consumer传输消息。 Topic的名称为符合良好结构的URL:
{persistent|non-persistent}://tenant/namespace/topic
Topic名称组成 | Description |
---|---|
persistent / non-persistent | 用来标识 topic 的类型。 Pulsar 支持两种不同 topic:持久化和 非持久 型(如果你没有明确指定,topic 将会是默认的持久化类型)。 持久化 topic 的所有消息都会存储到硬盘上(除非是单机模式的broker,否则都是会在多块磁盘上)。非持久 topic 的数据将不会存储到硬盘上。 |
租户 | The topic’s tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters. |
命名空间 | 将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。 大多数对 topic 的管理都是对命名空间的一项配置。 每个租户可以有多个命名空间。 |
topic | The final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance. |
不需要显式的创建topic
你并不需要显式的创建topic。 如果客户端尝试从一个还不存在的topic写或者接受消息,pulsar将会按在topic名称提供的namespace下自动创建topic。 If no tenant or namespace is specified when a client creates a topic, the topic is created in the default tenant and namespace. You can also create a topic in a specified tenant and namespace, such as
persistent://my-tenant/my-namespace/my-topic
.persistent://my-tenant/my-namespace/my-topic
means themy-topic
topic is created in themy-namespace
namespace of themy-tenant
tenant.
命名空间
命名空间是租户内部逻辑上的命名术语。 一个租户可以通过admin API创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1
,它的namespace是app1
这个应用,对应的租户是 my-tenant
。 你可以在namespace下创建任意数量的topic。
订阅模型
订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar有三种订阅模式:exclusive,shared,failover。 下图展示了这三种模式:
Exclusive
独占模式,只能有一个消费者绑定到订阅上。 如果多于一个消费者尝试以同样方式去订阅主题,消费者将会收到错误。
In the diagram above, only Consumer A-0 is allowed to consume messages.
Exclusive模式为默认订阅模式。
Failover(灾备)
Failover模式中,多个consumer可以绑定到同一个subscription。 Consumer将会按字典顺序排序,第一个consumer被初始化为唯一接受消息的消费者。 这个consumer被称为master consumer。
当master consumer断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个consumer。
第一幅图中,Consumer-C-1是master consumer,当Consumer-C-1断开连接时,由于Consumer-C-2在队列中下一个位置,那么它将会开始接收消息。
Shared(共享)
shared或者round robin模式中,多个消费者可以绑定到同一个订阅上。 消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
第一幅图中,Consumer-B-1和Consumer-B-2都可以订阅主题,其实Consumer-C-1或者其它Consumer也可以订阅。
Shared模式的限制
There are two important things to be aware of when using shared mode: * Message ordering is not guaranteed. * You cannot use cumulative acknowledgment with shared mode.
Key_shared
In Key_Shared mode, multiple consumers can attach to the same subscription. Messages are delivered in a distribution across consumers and message with same key or same ordering key are delivered to only one consumer. No matter how many times the message is re-delivered, it is delivered to the same consumer. When a consumer connected or disconnected will cause served consumer change for some key of message.
Limitations of Key_Shared mode
There are two important things to be aware of when using Key_Shared mode: * You need to specify a key or orderingKey for messages * You cannot use cumulative acknowledgment with Key_Shared mode.
Key_Shared subscription is a beta feature. You can disable it at broker.config.
多主题订阅
当consumer订阅pulsar的主题时,它默认指定订阅了一个主题,例如:persistent://public/default/my-topic
。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:
- 通过最基础的 正则表达式(regex),例如
persistent://public/default/finance-.*
- 通过明确指定的topic列表
通过正则订阅多主题时,所有的主题必须在同一个namespace。
当订阅多主题时,Pulsar客户端会自动调用Pulsar的API来发现匹配表达式或者列表的所有topic,然后全部订阅。 如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。
No ordering guarantees across multiple topics
When a producer sends messages to a single topic, all messages are guaranteed to be read from that topic in the same order. However, these guarantees do not hold across multiple topics. So when a producer sends message to multiple topics, the order in which messages are read from those topics is not guaranteed to be the same.
下面是多主题订阅在java中的例子:
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
PulsarClient pulsarClient = // Instantiate Pulsar client object
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = pulsarClient.subscribe(allTopicsInNamespace, "subscription-1");
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer someTopicsConsumer = pulsarClient.subscribe(someTopicsInNamespace, "subscription-1");
代码例子,请见:
分区 topic
通常一个topic仅被一个broker服务,这限制了topic的最大吞吐量。 分区topic是特殊的topic类型,他可以被多个broker处理,这让topic有更高的吞吐量。
其实在背后,分区的topic通过N个内部topic实现,N是分区的数量。 当向分区的topic发送消息,每条消息被路由到其中一个broker。 Pulsar自动处理跨broker的分区分布。
下图对此做了阐明:
此处,Topic1有5个分区(P0到P4),分布在三个broker上。 因为分区多于broker数量,其中有两个broker要处理两个分区。第三个broker则只处理一个。(再次强调,分区的分布是Pulsar自动处理的)。
这个topic的消息被广播给两个consumer。 路由模式决定哪个broker处理哪个partition,订阅模式决定哪条消息送到哪个consumer。
大多数境况下,路由和订阅模式可以分开制定。 通常来讲,吞吐能力的要求,决定了 分区/路由 的方式。订阅模式则应该由应用的语义来做决定。
分区topic和普通topic,对于订阅模式如何工作,没有任何不同。分区只是决定了从生产者生产消息到消费者处理及确认消息过程中发生的事情。
Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.
路由模式
When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—-that is, which internal topic—-each message should be published to.
有三种 MessageRoutingMode 可用:
发送模式 | Description |
---|---|
RoundRobinPartition | 如果消息没有指定 key,为了达到最大吞吐量,消息会以 round-robin 方式被路由所有分区。 请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果为消息指定了key,发往分区的消息会被分区生产者根据 key 做 hash,然后分散到对应的分区上。 这是默认的模式。 |
SinglePartition | 如果消息没有指定 key,生产者将会随机选择一个分区,并发送所有消息。 如果为消息指定了key,发往分区的消息会被分区生产者根据 key 做 hash,然后分散到对应的分区上。 |
CustomPartition | 使用自定义消息路由,可以定制消息如何进入特定的分区。 可以使用 Java client 或实现MessageRouter 接口来实现自定义的路由模式。 |
顺序保证
The ordering of messages is related to MessageRoutingMode and Message Key. Usually, user would want an ordering of Per-key-partition guarantee.
当使用 SinglePartition
或者RoundRobinPartition
模式时,如果消息有key,消息将会被路由到匹配的分区,这是基于ProducerBuilder 中HashingScheme 指定的散列shema。
顺序保证 | Description | 路由策略与消息Key |
---|---|---|
每个 key 分区 | 所有具有相同 key 的消息将按顺序排列并放置在相同的分区(Partition)中。 | 使用 SinglePartition 或 RoundRobinPartition 模式,每条消息都需要有key。 |
同一个生产者 | 来自同一生产者的所有消息都是有序的 | 路由策略为SinglePartition , 且每条消息都没有key。 |
散列scheme
HashingScheme 是代表一组标准散列函数的枚举。为一个指定消息选择分区时使用。
有两种可用的散列函数: JavaStringHash
和Murmur3_32Hash
. The default hashing function for producer is JavaStringHash
. 请注意,当producer可能来自于不同语言客户端时,JavaStringHash
是不起作用的。建议使用Murmur3_32Hash
。
非持久topic
默认的,Pulsar保存所有没有确认的消息到多个BookKeeper的bookies中(存储节点)。持久topic的消息数据可以在broker重启或者订阅者出问题的情况下存活下来。 因此,持久性主题上的消息数据可以在 broker 重启和订阅者故障转移之后继续存在。
但是,Pulsar还支持非持久性主题,这些主题的消息从不持久存储到磁盘,只存在于内存中。 Pulsar也提供了非持久topic。非持久topic的消息不会被保存在硬盘上,只存活于内存中。当使用非持久topic分发时,杀掉Pulsar的broker或者关闭订阅者,此topic( non-persistent))上所有的瞬时消息都会丢失,意味着客户端可能会遇到消息缺失。
非持久性主题具有这种形式的名称(注意名称中的 non-persistent
):
non-persistent://tenant/namespace/topic
如何使用非持久topic的更多信息,请参考 Non-persistent messaging cookbook
非持久topic中,broker会立即发布消息给所有连接的订阅者,而不会在BookKeeper中存储。 如果有一个订阅者断开连接,broker将无法重发这些瞬时消息,订阅者将永远也不能收到这些消息了。 去掉持久化存储的步骤,在某些情况下,使得非持久topic的消息比持久topic稍微变快。但是同时,Pulsar的一些核心优势也丧失掉了。
非持久topic,消息数据仅存活在内存。 如果broker挂掉或者因其他情况不能从内存取到,你的消息数据就可能丢失。 只有在真的确信你的使用场景符合,并且你可以忍受时,才可去使用非持久topic。
默认非持久topic在broker上是开启的。 你可以通过broker的配置关闭。 你可以通过使用pulsar-admin-topics
接口管理非持久topic。
性能
非持久消息通常比持久消息更快,因为broker无须持久化消息,当消息被分发给所有订阅者时,会立即发送ack给producer。 非持久topic让producer有更低的发布延迟。
客户端API
Producer和consumer以连接持久topic同样的方式连接到非持久topic。重要的区别是,topic的名称必须以non-persistent
开头。 三种订阅模式—exclusive,shared,failover对于非持久topic都是支持的。
下面是一个非持久topic的java consumer例子:
PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";
Consumer consumer = client.subscribe(npTopic, subscriptionName);
这里还有一个非持久topic的java producer例子:
Producer producer = client.createProducer(npTopic);
消息保留和到期
Pulsar broker默认如下:
- 立即删除所有已经被cunsumer确认过的的消息
- 以消息backlog的形式,持久保存所有的未被确认消息
Pulsar有两个特性,让你可以覆盖上面的默认行为。
- 消息存留让你可以保存consumer确认过的消息
- 消息过期让你可以给未被确认的消息设置存活时长(TTL)
All message retention and expiry is managed at the namespace level. For a how-to, see the Message retention and expiry cookbook.
下图说明了这两种概念:
图中上面的是消息存留,存留规则会被用于某namespace下所有的topic,指明哪些消息会被持久存储,即使已经被确认过。 没有被留存规则覆盖的消息将会被删除。 没有留存规则的话,所有被确认的消息都会被删除。
图中下面的是消息过期,有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)
Message deduplication
当消息被Pulsar持久化多于一次的时候,消息就会重复。 消息去重是Pulsar可选的特性,阻止不必要的消息重复,每条消息仅处理一次,即使消息被接收多次
The following diagram illustrates what happens when message deduplication is disabled vs. enabled:
最上面的场景中,消息去重被关闭。 Producer发布消息1到一个topic,消息到达broker后,被持久化到BookKeeper。 然后producer又发送了消息1(可能因为某些重试逻辑),然后消息被接收后又持久化在BookKeeper,这意味着消息重复发生了。
在第二个场景中,producer发送了消息1,消息被broker接收然后持久化,和第一个场景是一样的。 当producer再次发送消息时,broker知道已经收到个消息1,所以不会再持久化消息1.
Message deduplication is handled at the namespace level. For more instructions, see the message deduplication cookbook.
生产者幂等
消息去重的另外一种方法是确保每条消息仅生产一次。 这种方法通常被叫做生产者幂等。 这种方式的缺点是,把消息去重的工作推给了应用去做。 在Pulsar中,去重被broker处理的,这意味着你不需要修改你的客户端代码。 你只需要做一些管理上的变化(参考Managing message deduplication )
去重和实际一次语义
消息去重,使Pulsar成为与流处理引擎(SPE)或者其他寻求实际一次处理语义的系统连接的完美消息系统。 消息系统若不提供自动消息去重,则需要SPE或者其他系统保证去重。这意味着严格的消息顺序来自于让程序承担额外的去重工作。 使用Pulsar,严格的顺序保证不会带来任何应用层面的代价。
更深入的信息可以参考 Streamlio blog上的此篇博文