Use filesystem offloader with Pulsar
本章将一步一步的指导你完成在 Pulsar 中安装和配置filesystem offloader。
安装
本节介绍如何安装 filesystem offloader。
前提条件
- Pulsar: 2.4.2 或更高版本
步骤
下列示例将使用 Pulsar 2.5.1。
使用下列方式下载 Pulsar 压缩包:
下载并解压 Pulsar offloaders 程序包。
wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz
tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz
Note
如果在一个裸机集群中运行 Pulsar,请确保
offloaders
安装包已经在 broker 的所有 Pulsar 目录下解压缩。如果在 Docker 中运行 Pulsar 或使用 Docker 镜像(如 K8S 和 DCOS)部署 Pulsar,则可以使用
apachepulsar/pulsar-all
镜像。apachepulsar/pulsar-all
镜像已经绑定了分层存储 offloader。
将 Pulsar offloaders 作为
offloaders
复制到Pulsar目录中。mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders
ls offloaders
输出
tiered-storage-file-system-2.5.1.nar
tiered-storage-jcloud-2.5.1.nar
Note
如果在一个裸机集群中运行 Pulsar,请确保
offloaders
安装包已经在 broker 的所有 Pulsar 目录下解压缩。如果在 Docker 中运行 Pulsar 或使用 Docker 镜像(如 K8S 和 DCOS)部署 Pulsar,则可以使用
apachepulsar/pulsar-all
镜像。apachepulsar/pulsar-all
镜像已经绑定了分层存储 offloader。
Configuration
Note
在将数据从 BookKeeper 卸载到 filesystem 之前,你需要配置 filesystem offloader 驱动程序的一些属性。
此外,你还可以配置 filesystem offloader 的自动触发和手动触发机制。
配置 filesystem offloader driver
您可以在 broker.conf
或 standalone.conf
配置文件中配置 filesystem offloader 驱动程序。
HDFS
NFS
以下为必选 配置。
参数| 描述| 示例值 |—-|—-|—-
managedLedgerOffloadDriver
| Offloader 驱动程序名称,不区分大小写。 | filesystemfileSystemURI
| 连接地址,访问默认 Hadoop 分布式文件系统的 URI。 | hdfs://127.0.0.1:9000offloadersDirectory
| Offloader directory | offloadersfileSystemProfilePath
| Hadoop profile path. The configuration file is stored in the Hadoop profile path. 它包含了一些 Hadoop 性能调整的配置。 | ../conf/filesystem_offload_core_site.xml以下为可选 配置。
参数| 描述| 示例值 |—-|—-|—-
managedLedgerMinLedgerRolloverTimeMinutes
| topic 的 ledger 切换的最短时间。注意: 不建议在生产环境中设置此参数。|2
managedLedgerMaxEntriesPerLedger
|在触发滚动之前添加到 ledger 的最大条目数。注意: 不建议在生产环境中设置此参数。|5000
以下为必选 配置。
参数| 描述| 示例值 |—-|—-|—-
managedLedgerOffloadDriver
| Offloader 驱动程序名称,不区分大小写。 | filesystemoffloadersDirectory
| Offloader directory | offloadersfileSystemProfilePath
| NFS profile path. The configuration file is stored in the NFS profile path. 它包含用于性能调整的各种设置。 | ../conf/filesystem_offload_core_site.xml以下为可选 配置。
参数| 描述| 示例值 |—-|—-|—-
managedLedgerMinLedgerRolloverTimeMinutes
| topic 的 ledger 切换的最短时间。注意: 不建议在生产环境中设置此参数。|2
managedLedgerMaxEntriesPerLedger
|在触发滚动之前添加到 ledger 的最大条目数。注意: 不建议在生产环境中设置此参数。|5000
自动运行 filesystem offloader
您可以将命名空间策略配置为在达到阈值后自动卸载数据。 该阈值基于一个 topic 在 Pulsar 集群的数据存储大小而定。 一旦 topic 到达阈值,就自动触发卸载操作。
阈值|动作 |—-|—- | > 0 | 如果 topic 存储达到其阈值,它就会触发卸载操作。 = 0| 它使 broker 竭尽全力地 offload 数据。 < 0 | 它将禁用自动卸载操作。
Automatic offload runs when a new segment is added to a topic log. 如果在命名空间上设置了阈值,但向该 topic 生产的消息很少,filesystem offloader 将会停止工作,直至当前 segment 达到饱和。
通过命令行工具(CLI)可以设置阈值大小,比如 pulsar-admin。
示例
该示例使用 pulsar-admin 将 filesystem offloader 的阈值大小设置为 10 MB。
pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace
提示
关于
pulsar-admin namespaces set-offload-threshold options
命令的更多信息,包括标志、描述、默认值和速记方法,见这里。
手动运行 filesystem offloader
对于每一个 topic,使用下列方法之一可以手动触发 filesystem offloader:
使用 REST 端点。
使用命令行工具(例如 pulsar-admin)。
要想用命令行工具触发 filesystem offloader,需要指定一个 topic 应该保留在 Pulsar 集群中的最大数据量(阈值)。 如果 Pulsar 集群上 topic 的数据大小超过了这个阈值,那么该 topic 下的 segment 就会被陆续卸载到 filesystem 上,直到不再超过这个阈值。 优先卸载旧的 segment。
示例
此示例使用 pulsar-admin 手动运行 filesystem offloader。
pulsar-admin topics offload --size-threshold 10M persistent://my-tenant/my-namespace/topic1
输出
Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
提示
关于
pulsar-admin topics offload-status options
命令的更多信息,包括标志、描述、默认值和速记方法,见这里。该示例使用 pulsar-admin 检查 filesystem offloader 的状态。
pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
输出
Offload is currently running
要等待 filesystem 完成任务,请添加
-w
参数。pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1
输出
Offload was a success
如果在卸载操作中出现错误,该错误会将会在
pulsar-admin topics offload-status
命令中抛出。pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
输出
Error in offload
null
Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=
提示
关于
pulsar-admin topics offload-status options
命令的更多信息,包括标志、描述、默认值和速记方法,见这里。
教程
本节提供有关如何使用 filesystem offloader 将数据从 Pulsar 移动到 Hadoop 分布式文件系统(HDFS)或网络文件系统(NFS)的分步说明。
HDFS
NFS
要将数据从 Pulsar 移动到 HDFS,请按照以下步骤进行。
第 1 步:准备 HDFS 环境
本教程使用 Hadoop 3.2.1设置一个 Hadoop 单个节点集群。
提示
有关如何设置 Hadoop 单节点集群的详细信息,请参见此处。
下载并解压缩 Hadoop 3.2.1。
wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz tar -zxvf hadoop-3.2.1.tar.gz -C $HADOOP_HOME
配置 Hadoop。
# $HADOOP_HOME/etc/hadoop/core-site.xml<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property></configuration># $HADOOP_HOME/etc/hadoop/hdfs-site.xml<configuration> <property> <name>dfs.replication</name> <value>1</value> </property></configuration>
设置无密码 ssh。
# 现在检查无需密码是否可以使用 ssh 连接到 localhost:$ ssh localhost# 如果在无密码的情况下无法 ssh 连接到 localhost,请执行以下命令$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys$ chmod 0600 ~/.ssh/authorized_keys
启动 HDFS。
# 不要重复执行该命令,重复执行会导致数据节点的 clusterId 与 namenode 不一致$HADOOP_HOME/bin/hadoop namenode -format$HADOOP_HOME/sbin/start-dfs.sh
导航到 HDFS 网站。
您可以查看 概述 页面。
在顶部导航栏,单击 数据节点 查看数据节点信息。
单击 HTTP 地址 获取更多关于 localhost:9866 的详细信息。
如下文所示, 所用容量 的大小是 4 KB,它是初始值。
第 2 步: 安装 filesystem offloader
更多详细信息,请参阅 安装。
第 3 步: 配置 filesystem offloader
正如 配置 部分所指明的那样,需要先配置 filesystem offloader 驱动程序的一些属性,然后才能使用。 本教程假设您已按以下方式配置 filesystem offloader 驱动程序,并在单机(Standalone)模式下运行 Pulsar。
在 conf/standalone.conf
文件中设置以下配置。
managedLedgerOffloadDriver=filesystemfileSystemURI=hdfs://127.0.0.1:9000fileSystemProfilePath=../conf/filesystem_offload_core_site.xml
Note
为测试目的,您可以设置以下两个配置来加速 ledger 切换, 但不建议在生产环境中设置它们。
managedLedgerMinLedgerRolloverTimeMinutes=1managedLedgerMaxEntriesPerLedger=100
Note
在本节中,假定您已启用 NFS 服务并设置 NFS 服务的共享路径。 在本节中,
/Users/test
被用作 NFS 服务的共享路径。
要将数据卸载到 NFS,请执行以下步骤。
第 1 步: 安装 filesystem offloader
更多详细信息,请参阅 安装。
第 2 步:将 NFS 置于本地文件系统
此示例将挂载 /Users/pulsar_nfs 到 /Users/test。
mount -e 192.168.0.103:/Users/test/Users/pulsar_nfs
第 3 步: 配置 filesystem offloader 驱动程序
正如 配置 部分所指明的那样,需要先配置 filesystem offloader 驱动程序的一些属性,然后才能使用。 本教程假设您已按以下方式配置 filesystem offloader 驱动程序,并在单机(Standalone)模式下运行 Pulsar。
在
conf/standalone.conf
文件中设置以下配置。managedLedgerOffloadDriver=filesystemfileSystemProfilePath=../conf/filesystem_offload_core_site.xml
修改 filesystem_offload_core_site.xml,如下所示。
<property> <name>fs.defaultFS</name> <value>file:///</value></property><property> <name>hadoop.tmp.dir</name> <value>file:///Users/pulsar_nfs</value></property><property> <name>io.file.buffer.size</name> <value>4096</value></property><property> <name>io.seqfile.compress.blocksize</name> <value>1000000</value></property><property> <name>io.seqfile.compression.type</name> <value>BLOCK</value></property><property> <name>io.map.index.interval</name> <value>128</value></property>
第 4 步:将数据从 BookKeeper 卸载到文件系统
在下载 Pulsar tar 包的仓库中执行以下命令。 例如, ~/path/to/apache-pulsar-2.5.1
。
启动 Pulsar 单机模式。
bin/pulsar standalone -a 127.0.0.1
为了确保生成的数据不会立即被删除,建议设置 保留策略, 可以是 大小 限制或 时间 限制。 为保留策略设定的值越大,数据保留的时间越长。
bin/pulsar-admin namespaces set-retention public/default --size 100M --time 2d
提示
关于
pulsarctl namespaces set-retention options
命令的更多信息,包括参数、描述、默认值和速记方法,参见这里。使用 pulsar-client 生成数据。
bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
卸载操作在触发 ledger 切换后开始。 为了确保卸载数据成功,建议您等待下,直到触发多个 ledger 切换。 在这种情况下,可能需要等待一会。 您可以使用 pulsarctl 检查 ledger 状态。
bin/pulsar-admin topics stats-internal public/default/fs-test
输出
The data of the ledger 696 is not offloaded.
{
"version": 1,
"creationDate": "2020-06-16T21:46:25.807+08:00",
"modificationDate": "2020-06-16T21:46:25.821+08:00",
"ledgers": [
{
"ledgerId": 696,
"isOffloaded": false
}
],
"cursors": {}
}
请稍等,然后向 topic 发送更多消息。
bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
使用 pulsarctl 检查 ledger 状态。
bin/pulsar-admin topics stats-internal public/default/fs-test
输出
Ledger 696 的数据未卸载。
{
"version": 2,
"creationDate": "2020-06-16T21:46:25.807+08:00",
"modificationDate": "2020-06-16T21:48:52.288+08:00",
"ledgers": [
{
"ledgerId": 696,
"entries": 1001,
"size": 81695,
"isOffloaded": false
},
{
"ledgerId": 697,
"isOffloaded": false
}
],
"cursors": {}
}
使用 pulsarctl 手动触发卸载操作。
bin/pulsar-admin topics offload -s 0 public/default/fs-test
输出
ledger 中的数据先于 ledge 697 已卸载。
# offload info, the ledgers before 697 will be offloaded
Offload triggered for persistent://public/default/fs-test3 for messages before 697:0:-1
使用 pulsarctl 检查 ledger 状态。
bin/pulsar-admin topics stats-internal public/default/fs-test
输出
ledger 696 的数据已卸载。
{
"version": 4,
"creationDate": "2020-06-16T21:46:25.807+08:00",
"modificationDate": "2020-06-16T21:52:13.25+08:00",
"ledgers": [
{
"ledgerId": 696,
"entries": 1001,
"size": 81695,
"isOffloaded": true
},
{
"ledgerId": 697,
"isOffloaded": false
}
],
"cursors": {}
}
并且 所用容量 已经从 4 KB 变成 116.46 KB。