ZooKeeper and BookKeeper administration

Pulsar 依靠两个外部系统完成重要工作:

ZooKeeper 和 BookKeeper 都是 Apache 开源项目。

Skip to the How Pulsar uses ZooKeeper and BookKeeper section below for a more schematic explanation of the role of these two systems in Pulsar.

ZooKeeper

每个 Pulsar 实例需要两个独立的 ZooKeeper 集群。

  • 本地集群负责集群级别的操作,提供集群的配置管理和协调。 每个 Pulsar 集群都需要一个专用的 ZooKeeper 集群。
  • Configuration Store 在实例层面运行,为整个系统提供配置管理(从而实现跨集群)。 配置集群可以部署在独立的机器集群,也可以和本地 Zookeeper 集群部署在同一批机器。

部署本地Zookeeper

ZooKeeper 负责管理与 Pulsar 协调和配置相关的各种基本任务。

To deploy a Pulsar instance, you need to stand up one local ZooKeeper cluster per Pulsar cluster.

首先,将所有 ZooKeeper 服务器添加到 conf/zookeeper.conf 指定的 quorum 配置中。 在配置文件中为每个节点添加一个 server.N 行,其中 N 是 ZooKeeper 节点的编号。 以下是一个三节点集群的示例:

  1. server.1=zk1.us-west.example.com:2888:3888
  2. server.2=zk2.us-west.example.com:2888:3888
  3. server.3=zk3.us-west.example.com:2888:3888

在每台主机上,您需要为每个节点指定节点 ID 到 myid 文件中, 默认位置在服务器的 data/zookeeper 目录 (您可以通过 dataDir 参数更改文件位置)。

参考多集群安装指南的Zookeeper 文档了解更多关于myid或者其他部分的详细信息。

例如,在 Zookeeper 服务器zk1.us-west.example.com上,你可以通过如下方式设置myid

  1. $ mkdir -p data/zookeeper
  2. $ echo 1 > data/zookeeper/myid

在服务器zk2.us-west.example.com上,设置通过命令echo 2 > data/zookeeper/myid设置myid。

一旦你在每台机器增加了zookeeper.conf配置文件,并且设置了myid,你能够在所有机器上使用pulsar-daemon命令去启动Zookeeper 服务(前台运行或者后台运行)。

  1. $ bin/pulsar-daemon start zookeeper

部署 Configuration Store

The ZooKeeper cluster configured and started up in the section above is a local ZooKeeper cluster that you can use to manage a single Pulsar cluster. 但是,除了本地集群之外,一个完整的 Pulsar 实例还需要 configuration store来处理一些实例级配置和协调任务。

如果是部署一个单集群实例,你不需要另外部署存储配置的 Zookeeper 集群。 但是,如果部署了 多集群 实例,则需要为配置任务设置一个单独的 ZooKeeper 集群。

单集群 Pulsar 实例

如果你的 Pulsar 实例只包含一个集群,你可以在同一批机器上部署本地 Zookeeper 集群和配置存储 Zookeeper 集群,此时这两个集群的 TCP 端口信息需要不一样。

要在单个集群实例中部署配置存储的 Zookeeper 集群。步骤跟部署本地 Zookeeper一样,将同一批机器(跟本地集群一样的机器列表)信息添加到配置文件conf/global_zookeeper.conf中,但是必须确保使用不同的端口(Zookeeper 默认使用2181端口)。 以下是一个使用 2184 端口的三节点ZooKeeper 集群的示例:

  1. clientPort=2184
  2. server.1=zk1.us-west.example.com:2185:2186
  3. server.2=zk2.us-west.example.com:2185:2186
  4. server.3=zk3.us-west.example.com:2185:2186

跟之前一样,在 data/global-zoocheper/myid 文件为每个服务器创建 myid

多集群实例

当你部署一个全球性的Pulsar实例时,集群分布在不同的地理区域。配置存储必须是一个高可用和强一致的元数据存储服务,必须能够容忍整个区域的故障和分区。

这里的关键是确保 ZK 成员投票节点必须分布在3个区域,其他区域以观察员身份运行。

强调一下,配置存储集群的服务器的负载预计会非常低,所以你能够和本地的 zookeeper 服务器的共用同一批机器。

例如,假设 Pulsar 实例由如下集群组成:us-west, us-east, us-central, eu-central, ap-south。 并且假设每个集群自己的本地 ZK 服务器的名字格式如下:

  1. zk[1-3].${CLUSTER}.example.com

在这种情况下,可以从几个集群中选择投票节点,让其他所有节点成为 ZK 观察者节点。 例如,要形成7个服务器组成的投票节点,您可以从 us-west中挑选3个服务器,从 us-centralus-east分别挑选两个。

这个方法保证即使其中一个区域无法连接,数据也可以写入配置存储服务。

所有服务器中的 ZK 配置如下:

  1. clientPort=2184
  2. server.1=zk1.us-west.example.com:2185:2186
  3. server.2=zk2.us-west.example.com:2185:2186
  4. server.3=zk3.us-west.example.com:2185:2186
  5. server.4=zk1.us-central.example.com:2185:2186
  6. server.5=zk2.us-central.example.com:2185:2186
  7. server.6=zk3.us-central.example.com:2185:2186:observer
  8. server.7=zk1.us-east.example.com:2185:2186
  9. server.8=zk2.us-east.example.com:2185:2186
  10. server.9=zk3.us-east.example.com:2185:2186:observer
  11. server.10=zk1.eu-central.example.com:2185:2186:observer
  12. server.11=zk2.eu-central.example.com:2185:2186:observer
  13. server.12=zk3.eu-central.example.com:2185:2186:observer
  14. server.13=zk1.ap-south.example.com:2185:2186:observer
  15. server.14=zk2.ap-south.example.com:2185:2186:observer
  16. server.15=zk3.ap-south.example.com:2185:2186:observer

此外,ZK 的观察者需要添加如下配置:

  1. peerType=observer
启动服务

配置完成后,你可以通过pulsar-daemon来启动集群。

  1. $ bin/pulsar-daemon start configuration-store

Zookeeper 配置项说明

在 Pulsar 的安装目录中有两个独立的 Zookeeper 配置文件:本地集群使用conf/zookeeper.conf配置文件,配置存储集群使用conf/global-zookeeper.conf配置文件。

本地 Zookeeper 集群

The conf/zookeeper.conf file handles the configuration for local ZooKeeper. 下表是可用的配置项列表:

配置项说明默认值
tickTimeThe tick is the basic unit of time in ZooKeeper, measured in milliseconds and used to regulate things like heartbeats and timeouts. tickTime is the length of a single tick.2000
initLimitThe maximum time, in ticks, that the leader ZooKeeper server allows follower ZooKeeper servers to successfully connect and sync. The tick time is set in milliseconds using the tickTime parameter.10
syncLimitThe maximum time, in ticks, that a follower ZooKeeper server is allowed to sync with other ZooKeeper servers. The tick time is set in milliseconds using the tickTime parameter.5
dataDirZookeeper 存储内存数据库快照和数据库更新的日志的目录。data/zookeeper
clientPortZookeeper 服务器监听连接的端口。2181
autopurge.snapRetainCountZooKeeper 中有快照自动清理机制。该参数指定在 autopurge.purgeInterval 配置的时间间隔内,可以保留的快照文件的数量。3
autopurge.purgeIntervalThe time interval, in hours, which triggers the ZooKeeper database purge task. Setting to a non-zero number enables auto purge; setting to 0 disables. Read this guide before enabling auto purge.1
maxClientCnxnsThe maximum number of client connections. Increase this if you need to handle more ZooKeeper clients.60

Configuration Store

配置存储服务集群使用的配置文件是conf/global-zookeeper.conf。 下表是可用的配置项列表:

BookKeeper

Pulsar 使用 Bookeeper 来存储所有的持久化消息。 Bookeeper 是一个分布式的 预写日志 系统。它保证了独立消息日志读取的一致性,称为 ledger。 Individual BookKeeper servers are also called bookies.

Pulsar 中管理消息的持久性,保留策略和过期时间,请参考该指南

硬件要求

Bookie 机器将数据存储在本机磁盘上。 为了确保最佳性能,需要确保 bookie 有合适的硬件配置。 以下是 Bookie 机器相关的两个关键的硬件指标:

  • 硬盘的读/写能力。
  • 存储容量

默认情况下,在向 Pulsar Broker 返回确认消息之前,消息条目总是同步的写入到磁盘的。 To ensure low write latency, BookKeeper is designed to use multiple devices:

  • A journal to ensure durability. 对于连续的写入来说,bookie 机器的fsync操作必须是快速的。 通常情况下,使用容量小并且读写快速的固态硬盘(SSDS)设备就足够了。也可以是带有RAID控制器和电池支持写入缓存的硬盘驱动(HDDs)。 这两种方案 fsync 延时大约为0.4ms。
  • A ledger storage device stores data. 写入操作是在后台运行的,所以写入的 I/O 性能不是一个大问题。 大部分的读取操作是连续的。backlog 只有在没有消费者的情况下,才会被删除。 要存储大量的数据,典型的配置是多块 HDD 硬盘和 一个RAID 控制器构建硬盘阵列。

Configure BookKeeper

你能够通过配置文件conf/bookkeeper.conf去配置 BookKeeper bookies。 配置bookie时,需要确保zkServers参数设置的是 Pulsar 集群的本地 Zookeeper 的地址的。

至少需要修改conf/bookkeeper.conf文件的如下四个配置:

  1. # Change to point to journal disk mount point
  2. journalDirectory=data/bookkeeper/journal
  3. # Point to ledger storage disk mount point
  4. ledgerDirectories=data/bookkeeper/ledgers
  5. # Point to local ZK quorum
  6. zkServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181

如果需要更改 Bookeeper 在 Zookeeper 中的目录路径,需要使用zkLedgersRootPath=/MY-PREFIX/ledgers,而不是zkServers=localhost:2181/MY-PREFIX

需要了解更多 Bookeeper 的信息,请参考BookKeeper 官方文档

部署 Bookkeeper

BookKeeper provides persistent message storage for Pulsar. 每个 Pulsar Broker 都有属于自己的 bookie。 The BookKeeper cluster shares a local ZooKeeper quorum with the Pulsar cluster.

手动启动 bookie

你可以在前台或者后台启动 bookie。

可以使用bookkeeper命令行工具在前台启动 bookie。

  1. $ bin/bookkeeper bookie

To start a bookie in the background, use the pulsar-daemon CLI tool:

  1. $ bin/pulsar-daemon start bookie

你可以使用 BookKeeper shell 命令的bookiesanity选项,来校验 bookie 是否正常工作。

  1. $ bin/bookkeeper shell bookiesanity

使用这个命令的时候,底层的机制是在本地的 bookie 创建新的 ledger ,往里面写一些新的条目,然后读取它,最后删除这个ledger。

下线 bookie

下线 bookie 之前,你必须检查你的环境并满足以下要求:

  1. Ensure the state of your cluster supports decommissioning the target bookie. Check if EnsembleSize >= Write Quorum >= Ack Quorum is true with one less bookie.

  2. 确保目标 bookie 在listbookies命令列出的 bookie 列表中,

  3. 确保没有其他的程序正在运行(比如升级)。

And then you can decommission bookies safely. To decommission bookies, complete the following steps.

  1. Log in to the bookie node, check if there are underreplicated ledgers. The decommission command force to replicate the underreplicated ledgers. $ bin/bookkeeper shell listunderreplicated

  2. Stop the bookie by killing the bookie process. Make sure that no liveness/readiness probes setup for the bookies to spin them back up if you deploy it in a Kubernetes environment.

  3. 运行下线命令。

    • 如果你已经登录到了目标节点,则不需要指定参数-bookieid
    • 如果你已经登录到需要下线的节点,你就不需要添加-bookieid参数。如果你希望从其他节点远程执行下线命令,可以加上参数-bookieid$ bin/bookkeeper shell decommissionbookie 或者 $ bin/bookkeeper shell decommissionbookie -bookieid <target bookieid>
  4. Validate that no ledgers are on the decommissioned bookie.
    $ bin/bookkeeper shell listledgers -bookieid <target bookieid>

你可以通过运行一下命令检查下线的 bookie 是否还在 bookie 列表中:

  1. ./bookkeeper shell listbookies -rw -h
  2. ./bookkeeper shell listbookies -ro -h

Bookeeper 持久化策略

In Pulsar, you can set persistence policies at the namespace level, which determines how BookKeeper handles persistent storage of messages. Policies determine four things:

  • 等待每个 ledger 条目的 ack (保证数据保存成功) 数。
  • 主题使用的的 bookie 数量。
  • 每个ledger entry 的写入次数。
  • 标记删除操作的限流率。

设置持久化策略

You can set persistence policies for BookKeeper at the namespace level.

Pulsar-admin

Use the set-persistence subcommand and specify a namespace as well as any policies that you want to apply. The available flags are:

标记说明默认值
-a, —bookkeeper-ack-quorom等待每个 ledger 条目 的 ack (保证数据保存成功) 数。0
-e, —bookkeeper-ensemble主题在命名空间中使用的 bookie 数量。0
-w, —bookkeeper-write-quorum为每个条目写入的次数(即每条消息保存多少份副本)。0
-r, —ml-mark-delete-max-rate标记删除操作的限流频率(0表示不限制)0

如下所示:

  1. $ pulsar-admin namespaces set-persistence my-tenant/my-ns \
  2. --bookkeeper-ack-quorom 3 \
  3. --bookeeper-ensemble 2

REST API

POST /admin/v2/namespaces/:tenant/:namespace/persistence

Java

  1. int bkEnsemble = 2;
  2. int bkQuorum = 3;
  3. int bkAckQuorum = 2;
  4. double markDeleteRate = 0.7;
  5. PersistencePolicies policies =
  6. new PersistencePolicies(ensemble, quorum, ackQuorum, markDeleteRate);
  7. admin.namespaces().setPersistence(namespace, policies);

列出持久化策略

You can see which persistence policy currently applies to a namespace.

Pulsar-admin

Use the get-persistence subcommand and specify the namespace.

如下所示:

  1. $ pulsar-admin namespaces get-persistence my-tenant/my-ns
  2. {
  3. "bookkeeperEnsemble": 1,
  4. "bookkeeperWriteQuorum": 1,
  5. "bookkeeperAckQuorum", 1,
  6. "managedLedgerMaxMarkDeleteRate": 0
  7. }

REST API

GET /admin/v2/namespaces/:tenant/:namespace/persistence

Java

  1. PersistencePolicies policies = admin.namespaces().getPersistence(namespace);

Pulsar 如何使用 ZooKeeper 和 BookKeeper

This diagram illustrates the role of ZooKeeper and BookKeeper in a Pulsar cluster:

ZooKeeper 和 BookKeeper

Each Pulsar cluster consists of one or more message brokers. Each broker relies on an ensemble of bookies.