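Pulsar relies on two external systems for essential tasks:

  • ZooKeeper is responsible for configuration-related and coordination-related tasks.
  • BookKeeper is responsible for durable storage of message data.

ZooKeeper and BookKeeper are both open-source Apache projects.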

Skip to the How Pulsar uses ZooKeeper and BookKeeper section below for a more schematic explanation of the role of these two systems in Pulsar.

ZooKeeper

Each Pulsar instance requires two separate ZooKeeper clusters.

  • Local ZooKeeper operates at the cluster level and provides cluster-specific configuration management and coordination. Each Pulsar cluster needs a dedicated ZooKeeper cluster.
  • Configuration Store operates at the instance level and provides configuration management for the entire system (and thus across clusters). An independent cluster of machines or the same machines that local ZooKeeper uses can provide the configuration store quorum.

Deploy local ZooKeeper

ZooKeeper manages a variety of essential coordination-related and configuration-related tasks for Pulsar.

To deploy a Pulsar instance, you need to stand up one local ZooKeeper cluster per Pulsar cluster.

First, add all ZooKeeper servers to the quorum configuration specified in conf/zookeeper.conf. Add a server.N line for each node in the cluster, where N is the number of the ZooKeeper node. The following is an example for a three-node cluster:

    server.1=zk1.us-west.example.com:2888:3888
    server.2=zk2.us-west.example.com:2888:3888
    server.3=zk3.us-west.example.com:2888:3888
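
For larger quorums, the server.N lines can be generated instead of typed by hand. The following Python sketch reproduces the example above; the helper name quorum_lines is hypothetical and is not part of Pulsar or ZooKeeper, and the ports are simply the ones used in the example.

```python
def quorum_lines(hosts, peer_port=2888, election_port=3888):
    """Return one server.N line per host, numbering nodes from 1."""
    return [
        f"server.{n}={host}:{peer_port}:{election_port}"
        for n, host in enumerate(hosts, start=1)
    ]


# Hostnames taken from the three-node example above.
hosts = [f"zk{i}.us-west.example.com" for i in range(1, 4)]
lines = quorum_lines(hosts)
print("\n".join(lines))
```

The same list must be written to conf/zookeeper.conf on every node, so generating it once and copying it out keeps the hosts consistent.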

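On each host, you need to specify the node ID in the myid file, which is located in the server's data/zookeeper directory by default (you can change the file location through the dataDir parameter).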

See the Multi-server setup guide in the ZooKeeper documentation for detailed information on myid and more.

On a ZooKeeper server at zk1.us-west.example.com, for example, you can set the myid value like this:

    $ mkdir -p data/zookeeper
    $ echo 1 > data/zookeeper/myid

On zk2.us-west.example.com the command is echo 2 > data/zookeeper/myid and so on.
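
Because the myid value must match the N of the host's own server.N line, it can be derived from the quorum configuration rather than tracked separately. The following is a minimal Python sketch of that idea; node_id_for and write_myid are hypothetical helper names, and a temporary directory stands in for data/zookeeper so that the sketch is safe to run anywhere.

```python
import tempfile
from pathlib import Path


def node_id_for(hostname, conf_text):
    """Find N in the 'server.N=host:...' line that names this host."""
    for line in conf_text.splitlines():
        line = line.strip()
        if not line.startswith("server."):
            continue
        key, value = line.split("=", 1)
        if value.split(":")[0] == hostname:
            return int(key[len("server."):])
    raise ValueError(f"{hostname} is not listed in the quorum configuration")


def write_myid(data_dir, node_id):
    """Equivalent of: mkdir -p <data_dir> && echo <node_id> > <data_dir>/myid"""
    path = Path(data_dir)
    path.mkdir(parents=True, exist_ok=True)
    myid = path / "myid"
    myid.write_text(f"{node_id}\n")
    return myid


conf = """\
server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888
server.3=zk3.us-west.example.com:2888:3888
"""

with tempfile.TemporaryDirectory() as tmp:
    node_id = node_id_for("zk2.us-west.example.com", conf)
    myid_path = write_myid(Path(tmp) / "data" / "zookeeper", node_id)
    written = myid_path.read_text()

print(node_id)
```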

Once you add each server to the zookeeper.conf configuration and each server has the appropriate myid entry, you can start ZooKeeper on all hosts (in the background, using nohup) with the pulsar-daemon CLI tool:

    $ bin/pulsar-daemon start zookeeper

Deploy configuration store

The ZooKeeper cluster configured and started up in the section above is a local ZooKeeper cluster that you can use to manage a single Pulsar cluster. In addition to the local cluster, however, a full Pulsar instance also needs a configuration store to handle some instance-level configuration and coordination tasks.

If you deploy a single-cluster instance, you do not need a separate cluster for the configuration store. If you deploy a multi-cluster instance, however, you need to stand up a separate ZooKeeper cluster for configuration tasks.

Single-cluster Pulsar instance

If your Pulsar instance consists of just one cluster, you can deploy the configuration store on the same machines as the local ZooKeeper quorum, running on different TCP ports.

To deploy a ZooKeeper configuration store in a single-cluster instance, add the same ZooKeeper servers that the local quorum uses to the configuration file conf/global_zookeeper.conf, using the same method as for local ZooKeeper, but make sure to use a different port (2181 is the default for ZooKeeper). The following is an example that uses port 2184 for a three-node ZooKeeper cluster:

    clientPort=2184
    server.1=zk1.us-west.example.com:2185:2186
    server.2=zk2.us-west.example.com:2185:2186
    server.3=zk3.us-west.example.com:2185:2186
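
When both services share the same hosts, the two configuration files must not name the same port. The following Python sketch checks this for the two examples in this guide; ports_in is a hypothetical helper, and the clientPort=2181 line in the local configuration is assumed from the ZooKeeper default mentioned above.

```python
def ports_in(conf_text):
    """Collect every TCP port named by clientPort and server.N lines."""
    ports = set()
    for line in conf_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key == "clientPort":
            ports.add(int(value))
        elif key.startswith("server."):
            # Value layout: host:peerPort:electionPort[:observer]
            fields = value.split(":")
            ports.update(int(p) for p in fields[1:3])
    return ports


local_conf = """\
clientPort=2181
server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888
server.3=zk3.us-west.example.com:2888:3888
"""

store_conf = """\
clientPort=2184
server.1=zk1.us-west.example.com:2185:2186
server.2=zk2.us-west.example.com:2185:2186
server.3=zk3.us-west.example.com:2185:2186
"""

clashes = ports_in(local_conf) & ports_in(store_conf)
print(sorted(clashes))  # an empty list means the two services can coexist
```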

As before, create the myid file for each server in data/global-zookeeper/myid.

Multi-cluster Pulsar instance

When you deploy a global Pulsar instance, with clusters distributed across different geographical regions, the configuration store serves as a highly available and strongly consistent metadata store that can tolerate failures and partitions spanning whole regions.

The key here is to make sure the ZK quorum members are spread across at least 3 regions and that other regions run as observers.

Again, given the very low expected load on the configuration store servers, you can share the same hosts used for the local ZooKeeper quorum.

For example, assume a Pulsar instance with the following clusters: us-west, us-east, us-central, eu-central, and ap-south. Also assume that each cluster has its own local ZK servers, named as follows:

    zk[1-3].${CLUSTER}.example.com

In this scenario, you want to pick the quorum participants from a few clusters and let all the others be ZK observers. For example, to form a 7-server quorum, you can pick 3 servers from us-west, 2 from us-central, and 2 from us-east.

This guarantees that writes to the configuration store are possible even if one of these regions is unreachable.
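
The guarantee follows from majority arithmetic: a ZooKeeper quorum of 7 voting servers needs 4 of them to accept a write, and losing the largest region (3 servers) still leaves 4. The following Python sketch checks that condition for any distribution of voters; the function name survives_region_loss is hypothetical, and the second distribution is an invented counterexample.

```python
def survives_region_loss(voters_per_region):
    """True if a majority of voters remains after losing any single region."""
    total = sum(voters_per_region.values())
    majority = total // 2 + 1
    return all(total - lost >= majority for lost in voters_per_region.values())


# The distribution suggested above: 3 + 2 + 2 = 7 voters, majority = 4.
spread = survives_region_loss({"us-west": 3, "us-central": 2, "us-east": 2})

# Counterexample: with 4 of 7 voters in one region, losing it leaves only 3.
concentrated = survives_region_loss({"us-west": 4, "us-central": 2, "us-east": 1})

print(spread, concentrated)  # True False
```

Observers do not vote, so they are left out of the count.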

The ZK configuration in all the servers looks like:

    clientPort=2184
    server.1=zk1.us-west.example.com:2185:2186
    server.2=zk2.us-west.example.com:2185:2186
    server.3=zk3.us-west.example.com:2185:2186
    server.4=zk1.us-central.example.com:2185:2186
    server.5=zk2.us-central.example.com:2185:2186
    server.6=zk3.us-central.example.com:2185:2186:observer
    server.7=zk1.us-east.example.com:2185:2186
    server.8=zk2.us-east.example.com:2185:2186
    server.9=zk3.us-east.example.com:2185:2186:observer
    server.10=zk1.eu-central.example.com:2185:2186:observer
    server.11=zk2.eu-central.example.com:2185:2186:observer
    server.12=zk3.eu-central.example.com:2185:2186:observer
    server.13=zk1.ap-south.example.com:2185:2186:observer
    server.14=zk2.ap-south.example.com:2185:2186:observer
    server.15=zk3.ap-south.example.com:2185:2186:observer
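
The listing above follows a regular pattern, so it can be generated from the cluster names and the number of voters chosen per cluster. The following Python sketch does that and also collects the hostnames that run as observers; the function and constant names are hypothetical, and it assumes three ZK servers per cluster as in the example.

```python
CLUSTERS = ["us-west", "us-central", "us-east", "eu-central", "ap-south"]
VOTERS = {"us-west": 3, "us-central": 2, "us-east": 2}  # all other servers observe


def configuration_store_servers(clusters, voters, per_cluster=3):
    """Return (lines, observers): server.N lines and the observer hostnames."""
    lines, observers = [], []
    n = 0
    for cluster in clusters:
        for i in range(1, per_cluster + 1):
            n += 1
            host = f"zk{i}.{cluster}.example.com"
            line = f"server.{n}={host}:2185:2186"
            # Servers beyond the cluster's voter count join as observers.
            if i > voters.get(cluster, 0):
                line += ":observer"
                observers.append(host)
            lines.append(line)
    return lines, observers


lines, observers = configuration_store_servers(CLUSTERS, VOTERS)
print("\n".join(lines))
```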

Additionally, ZK observers need to have:

    peerType=observer

Start the service

Once your configuration store configuration is in place, you can start the service using pulsar-daemon:

    $ bin/pulsar-daemon start configuration-store

ZooKeeper configuration

In Pulsar, ZooKeeper configuration is handled by two separate configuration files in the conf directory of your Pulsar installation: conf/zookeeper.conf for local ZooKeeper and conf/global_zookeeper.conf for configuration store.

Local ZooKeeper

The conf/zookeeper.conf file handles the configuration for local ZooKeeper. The table below shows the available parameters:

| Name | Description | Default |
|---|---|---|
| tickTime | The tick is the basic unit of time in ZooKeeper, measured in milliseconds and used to regulate things like heartbeats and timeouts. tickTime is the length of a single tick. | 2000 |
| initLimit | The maximum time, in ticks, that the leader ZooKeeper server allows follower ZooKeeper servers to successfully connect and sync. The tick time is set in milliseconds using the tickTime parameter. | 10 |
| syncLimit | The maximum time, in ticks, that a follower ZooKeeper server is allowed to sync with other ZooKeeper servers. The tick time is set in milliseconds using the tickTime parameter. | 5 |
| dataDir | The location where ZooKeeper stores in-memory database snapshots as well as the transaction log of updates to the database. | data/zookeeper |
| clientPort | The port on which the ZooKeeper server listens for connections. | 2181 |
| autopurge.snapRetainCount | In ZooKeeper, auto purge determines how many recent snapshots of the database stored in dataDir to retain within the time interval specified by autopurge.purgeInterval (while deleting the rest). | 3 |
| autopurge.purgeInterval | The time interval, in hours, which triggers the ZooKeeper database purge task. Setting to a non-zero number enables auto purge; setting to 0 disables. Read this guide before enabling auto purge. | 1 |
| maxClientCnxns | The maximum number of client connections. Increase this if you need to handle more ZooKeeper clients. | 60 |
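
Because initLimit and syncLimit are expressed in ticks, their effective durations depend on tickTime. The following Python sketch works out the durations for the default values listed in the table; the helper name ticks_to_ms is hypothetical.

```python
TICK_TIME_MS = 2000  # tickTime default, in milliseconds
INIT_LIMIT = 10      # initLimit default, in ticks
SYNC_LIMIT = 5       # syncLimit default, in ticks


def ticks_to_ms(ticks, tick_time_ms=TICK_TIME_MS):
    """Convert a limit expressed in ticks to milliseconds."""
    return ticks * tick_time_ms


init_timeout_ms = ticks_to_ms(INIT_LIMIT)
sync_timeout_ms = ticks_to_ms(SYNC_LIMIT)
print(init_timeout_ms, sync_timeout_ms)  # 20000 10000
```

With the defaults, followers therefore have 20 seconds to connect and sync with the leader, and 10 seconds to stay in sync afterwards. Changing tickTime scales both limits.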

Configuration Store

The conf/global_zookeeper.conf file handles the configuration for configuration store. The table below shows the available parameters:

BookKeeper

BookKeeper is responsible for all durable message storage in Pulsar. BookKeeper is a distributed write-ahead log (WAL) system that guarantees read consistency of independent message logs called ledgers. Individual BookKeeper servers are also called bookies.

For a guide to managing message persistence, retention, and expiry in Pulsar, see this cookbook.

Hardware requirements

Bookie hosts are responsible for storing message data on disk. For bookies to provide optimal performance, a suitable hardware configuration is essential. Two key dimensions determine bookie hardware capacity:

  • Disk I/O capacity (read/write)
  • Storage capacity

Message entries written to bookies are always synced to disk before returning an acknowledgement to the Pulsar broker. To ensure low write latency, BookKeeper is designed to use multiple devices:

  • A journal to ensure durability. For sequential writes, fast fsync operations on bookie hosts are critical. Typically, small and fast solid-state drives (SSDs) suffice, as do hard disk drives (HDDs) with a RAID controller and a battery-backed write cache. Both solutions can reach fsync latency of ~0.4 ms.
  • A ledger storage device, where data is stored until all consumers have acknowledged the message. Writes happen in the background, so write I/O is not a big concern. Reads happen sequentially most of the time, and the backlog is drained only in case of consumer drain. To store large amounts of data, a typical configuration involves multiple HDDs with a RAID controller.

Configure BookKeeper

You can configure BookKeeper bookies using the conf/bookkeeper.conf configuration file. The most important aspect of configuring each bookie is ensuring that the zkServers parameter is set to the connection string for local ZooKeeper of the Pulsar cluster.

Minimum configuration changes required in conf/bookkeeper.conf are:

    # Change to point to journal disk mount point
    journalDirectory=data/bookkeeper/journal
    # Point to ledger storage disk mount point
    ledgerDirectories=data/bookkeeper/ledgers
    # Point to local ZK quorum
    zkServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
    # Change the ledger manager type
    ledgerManagerType=hierarchical

To change the ZooKeeper root path that BookKeeper uses, set zkLedgersRootPath=/MY-PREFIX/ledgers instead of appending the path to the connection string, as in zkServers=localhost:2181/MY-PREFIX.
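
A small pre-flight check can catch a missing setting or a path suffix in zkServers before a bookie is started. The following Python sketch is one way to do that; parse_properties and check_bookie_conf are hypothetical helpers, not BookKeeper tooling, and the set of keys treated as required is an assumption based on the minimum changes listed above.

```python
REQUIRED_KEYS = ("journalDirectory", "ledgerDirectories", "zkServers")


def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_bookie_conf(props):
    """Return a list of human-readable problems; empty means none found."""
    problems = [f"{key} is not set" for key in REQUIRED_KEYS if not props.get(key)]
    for server in filter(None, props.get("zkServers", "").split(",")):
        host, _, port = server.partition(":")
        # A path suffix such as :2181/MY-PREFIX makes the port non-numeric.
        if not host or not port.isdigit():
            problems.append(f"zkServers entry {server!r} is not host:port")
    return problems


good = parse_properties("""\
# Point to local ZK quorum
journalDirectory=data/bookkeeper/journal
ledgerDirectories=data/bookkeeper/ledgers
zkServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
""")
bad = dict(good, zkServers="localhost:2181/MY-PREFIX")

problems = check_bookie_conf(good)
bad_problems = check_bookie_conf(bad)
print(problems, bad_problems)
```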

Consult the official BookKeeper docs for more information about BookKeeper.

Deploy BookKeeper

BookKeeper provides persistent message storage for Pulsar.

Each Pulsar broker needs to have its own cluster of bookies. The BookKeeper cluster shares a local ZooKeeper quorum with the Pulsar cluster.

Starting bookies manually

You can start up a bookie in two ways: in the foreground or as a background daemon.

To start up a bookie in the foreground, use the bookkeeper CLI tool:

    $ bin/bookkeeper bookie

To start a bookie in the background, use the pulsar-daemon CLI tool:

    $ bin/pulsar-daemon start bookie

You can verify that the bookie works properly using the bookiesanity command for the BookKeeper shell:

    $ bin/bookkeeper shell bookiesanity

This command creates a new ledger on the local bookie, writes a few entries, reads them back and finally deletes the ledger.

Decommissioning bookies cleanly

To decommission a bookie, follow the process below, which also lets you verify that the decommissioning completed safely.

Before we decommission

  1. Ensure that the state of your cluster can support decommissioning the target bookie: check that EnsembleSize >= Write Quorum >= Ack Quorum still holds with one less bookie.

  2. Ensure that the target bookie shows up in the output of the listbookies command.

  3. Ensure that no other process (an upgrade, for example) is ongoing.
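
The first check can be written down as a single comparison. The following Python sketch interprets "with one less bookie" as requiring that the remaining bookies can still form a full ensemble; that reading, and the function name can_decommission, are assumptions rather than something BookKeeper provides.

```python
def can_decommission(bookies, ensemble, write_quorum, ack_quorum):
    """True if the policy still fits the cluster after removing one bookie."""
    remaining = bookies - 1
    return remaining >= ensemble >= write_quorum >= ack_quorum >= 1


# Four bookies with ensemble 3: three remain, which is still enough.
enough = can_decommission(bookies=4, ensemble=3, write_quorum=2, ack_quorum=2)

# Three bookies with ensemble 3: only two remain, so new ledgers cannot be placed.
too_few = can_decommission(bookies=3, ensemble=3, write_quorum=2, ack_quorum=2)

print(enough, too_few)  # True False
```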

Process of Decommissioning

  1. Log on to the bookie node and check whether there are under-replicated ledgers. If there are, the decommission command forces them to be replicated.

     $ bin/bookkeeper shell listunderreplicated

  2. Stop the bookie by killing the bookie process. If you deploy in a Kubernetes environment, make sure that no liveness or readiness probes are set up that would spin the bookie back up.

  3. Run the decommission command. If you are logged on to the node that you want to decommission, you do not need to provide -bookieid. If you run the command for the target bookie from another bookie node, pass the target bookie ID with -bookieid.

     $ bin/bookkeeper shell decommissionbookie

     or

     $ bin/bookkeeper shell decommissionbookie -bookieid <target bookieid>

  4. Validate that no ledgers remain on the decommissioned bookie.

     $ bin/bookkeeper shell listledgers -bookieid <target bookieid>

As a final check, verify that the decommissioned bookie no longer appears in the list of bookies:

    ./bookkeeper shell listbookies -rw -h
    ./bookkeeper shell listbookies -ro -h
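
For runbooks or automation, the commands from the steps above can be assembled in order. The following Python sketch only builds the command lines and does not execute anything; decommission_plan is a hypothetical helper, the bookie ID bookie1.example.com:3181 is an invented example, and the sketch uses bin/bookkeeper for every command on the assumption that it runs from the Pulsar installation directory.

```python
import shlex


def decommission_plan(bookie_id, on_target_node=True, tool="bin/bookkeeper"):
    """Return the decommissioning commands, in order, as argument lists."""
    decommission = [tool, "shell", "decommissionbookie"]
    if not on_target_node:
        # From another bookie node, the target must be named explicitly.
        decommission += ["-bookieid", bookie_id]
    return [
        [tool, "shell", "listunderreplicated"],
        # The bookie process must be stopped manually before the next command.
        decommission,
        [tool, "shell", "listledgers", "-bookieid", bookie_id],
        [tool, "shell", "listbookies", "-rw", "-h"],
        [tool, "shell", "listbookies", "-ro", "-h"],
    ]


local_plan = decommission_plan("bookie1.example.com:3181")
remote_plan = decommission_plan("bookie1.example.com:3181", on_target_node=False)

for command in remote_plan:
    print(shlex.join(command))
```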

BookKeeper persistence policies

In Pulsar, you can set persistence policies, at the namespace level, that determine how BookKeeper handles persistent storage of messages. Policies determine four things:

  • The number of acks (guaranteed copies) to wait for each ledger entry.
  • The number of bookies to use for a topic.
  • The number of writes to make for each ledger entry.
  • The throttling rate for mark-delete operations.

Set persistence policies

You can set persistence policies for BookKeeper at the namespace level.

Pulsar-admin

Use the set-persistence subcommand and specify a namespace as well as any policies that you want to apply. The available flags are:

| Flag | Description | Default |
|---|---|---|
| -a, --bookkeeper-ack-quorum | The number of acks (guaranteed copies) to wait on for each entry | 0 |
| -e, --bookkeeper-ensemble | The number of bookies to use for topics in the namespace | 0 |
| -w, --bookkeeper-write-quorum | The number of writes to make for each entry | 0 |
| -r, --ml-mark-delete-max-rate | Throttling rate for mark-delete operations (0 means no throttle) | 0 |

The following is an example:

    $ pulsar-admin namespaces set-persistence my-tenant/my-ns \
      --bookkeeper-ensemble 3 \
      --bookkeeper-write-quorum 2 \
      --bookkeeper-ack-quorum 2 \
      --ml-mark-delete-max-rate 0

REST API

POST /admin/v2/namespaces/:tenant/:namespace/persistence
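
The following Python sketch builds such a request with the standard library without sending it. The base URL http://localhost:8080 is an assumption, and the JSON field names are assumed to match those shown in the get-persistence output later in this section; set_persistence_request is a hypothetical helper name.

```python
import json
import urllib.request


def set_persistence_request(base_url, tenant, namespace, ensemble,
                            write_quorum, ack_quorum, mark_delete_rate=0.0):
    """Build (but do not send) the POST request for a namespace's policies."""
    body = {
        "bookkeeperEnsemble": ensemble,
        "bookkeeperWriteQuorum": write_quorum,
        "bookkeeperAckQuorum": ack_quorum,
        "managedLedgerMaxMarkDeleteRate": mark_delete_rate,
    }
    return urllib.request.Request(
        url=f"{base_url}/admin/v2/namespaces/{tenant}/{namespace}/persistence",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = set_persistence_request("http://localhost:8080", "my-tenant", "my-ns", 3, 2, 2)
print(req.get_method(), req.full_url)
# Passing req to urllib.request.urlopen would send it to a running broker.
```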

Java

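    import org.apache.pulsar.common.policies.data.PersistencePolicies;

    // Assumes an existing PulsarAdmin instance (admin) and a namespace name (namespace).
    int bkEnsemble = 3;
    int bkQuorum = 2;
    int bkAckQuorum = 2;
    double markDeleteRate = 0.7;
    // Arguments: ensemble size, write quorum, ack quorum, mark-delete rate
    PersistencePolicies policies =
        new PersistencePolicies(bkEnsemble, bkQuorum, bkAckQuorum, markDeleteRate);
    admin.namespaces().setPersistence(namespace, policies);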

List persistence policies

You can see which persistence policy currently applies to a namespace.

Pulsar-admin

Use the get-persistence subcommand and specify the namespace.

The following is an example:

    $ pulsar-admin namespaces get-persistence my-tenant/my-ns
    {
      "bookkeeperEnsemble": 1,
      "bookkeeperWriteQuorum": 1,
      "bookkeeperAckQuorum": 1,
      "managedLedgerMaxMarkDeleteRate": 0
    }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/persistence

Java

    PersistencePolicies policies = admin.namespaces().getPersistence(namespace);

How Pulsar uses ZooKeeper and BookKeeper

This diagram illustrates the role of ZooKeeper and BookKeeper in a Pulsar cluster:

[Figure: ZooKeeper and BookKeeper]

Each Pulsar cluster consists of one or more message brokers. Each broker relies on an ensemble of bookies.