At the highest level, a Pulsar instance is composed of one or more Pulsar clusters. Clusters within an instance can replicate data amongst themselves.

单个 Pulsar 集群由以下三部分组成:

  • 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。
  • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储
  • A ZooKeeper cluster specific to that cluster handles coordination tasks between Pulsar clusters.

下图为一个 Pulsar 集群:

Pulsar架构图

在更细粒度的实例级别, 有一个能访问到全部实例的ZooKeeper群集处理涉及多个pulsar集群的配置协调任务, 例如 异地复制

Brokers

Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:

  • 一个 HTTP 服务器, 它暴露了 REST 系统管理接口以及在生产者和消费者之间进行 Topic查找的API。
  • 一个调度分发器, 它是异步的TCP服务器,通过自定义 二进制协议应用于所有相关的数据传输。

Messages are typically dispatched out of a managed ledger cache for the sake of performance, unless the backlog exceeds the cache size. 如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。

最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重新发布到其他区域

如何管理Pulsar Brokers, 请参考 brokers 指南

集群

A Pulsar instance consists of one or more Pulsar clusters. Clusters, in turn, consist of:

  • 一个或者多个Pulsar brokers
  • 一个ZooKeeper协调器,用于集群级别的配置和协调
  • 一组BookKeeper的Bookies用于消息的 持久化存储

集群间可以通过异地复制进行消息同步

如何管理Pulsar集群,请参考clusters指南

元数据存储

Pulsar uses Apache Zookeeper for metadata storage, cluster configuration, and coordination. In a Pulsar instance:

  • 配置与仲裁存储: 存储租户,命名域和其他需要全局一致的配置项
  • 每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的),等等

持久化存储

Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.

为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达 This mode of messaging is commonly called persistent messaging. 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储

Apache BookKeeper

Pulsar用 Apache BookKeeper作为持久化存储。 BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:

  • It enables Pulsar to utilize many independent logs, called ledgers. Multiple ledgers can be created for topics over time.
  • 为按条目复制的顺序数据提供了非常高效的存储。
  • 保证了多系统挂掉时ledgers的读取一致性。
  • 提供不同的Bookies之间均匀的IO分布的特性。
  • It’s horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
  • Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

In addition to message data, cursors are also persistently stored in BookKeeper. Cursors是消费端订阅消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。

At the moment, Pulsar supports persistent message storage. This accounts for the persistent in all topic names. Here’s an example:

  1. persistent://tenant/namespace/topic

Pulsar也支持临时消息( (non-persistent) )存储。

下图展示了brokers和bookies是如何交互的

Brokers和bookies

Ledgers

Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:

  • Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。
  • 当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,这个ledger只会以只读模式打开。
  • 最后,当ledger中的条目不再有用的时候,整个legder可以被删除(ledger分布是跨Bookies的)。

Ledger读一致性

BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。

Managed ledgers

Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。

Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:

  1. 在故障之后,原有的某个ledger不能再写了,需要创建一个新的。
  2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.

日志存储

In BookKeeper, journal files contain BookKeeper transaction logs. 在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 journalMaxSizeMB 参数配置)的时候,新的日志文件会被创建。

Pulsar proxy

Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers 。 然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。

The Pulsar proxy provides a solution to this problem by acting as a single gateway for all of the brokers in a cluster. 如果你选择运行Pulsar Proxy(这是可选的),所有的客户端连接将会通过这个代理而不是直接与brokers通信。

为了性能和容错,你可以运行任意个Pulsar proxy。

架构上来看,Pulsar Proxy从ZooKeeper上面读取他所需要的所有信息。 当启动代理时,你只需要提供用于集群独有和实例范围的配置存储的ZooKeeper连接串。 Here’s an example:

  1. $ bin/pulsar proxy \
  2. --zookeeper-servers zk-0,zk-1,zk-2 \
  3. --configuration-store-servers zk-0,zk-1,zk-2

Pulsar proxy 文档

如何使用Pulsar proxy,参考 Pulsar proxy 管理指南

关于Pulsar proxy有一些比较重要的注意点:

  • Connecting clients don’t need to provide any specific configuration to use the Pulsar proxy. 除了更新用于服务URL的IP之外,你不需要为现有的应用更新客户端配置(例如你在Pulsar proxy上层架设运行了负载均衡器)。
  • Pulsar proxy支持TLS 加密认证

Service discovery

Clients connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar内部提供了服务发现的机制,你可以通过 配置Pulsar实例指南设置。

你也可以用你自己的服务发现系统。 If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as http://pulsar.us-west.example.com:8080, the client needs to be redirected to some active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.

下面这张图展示了Pulsar服务发现机制:

alt-text

图中,Pulsar集群可以通过一个DNS名称寻址:pulsar-cluster.acme.com。 例如Python客户端,可以像这样访问这个Pulsar集群:

  1. from pulsar import Client
  2. client = Client('pulsar://pulsar-cluster.acme.com:6650')