Use filesystem offloader with Pulsar

This chapter guides you through every step of installing and configuring the filesystem offloader and using it with Pulsar.

Installation

This section describes how to install the filesystem offloader.

Prerequisite

  • Pulsar 2.4.2 or later

Steps

This example uses Pulsar 2.5.1.

  1. Download the Pulsar tarball using one of the following ways:

    • Download the Pulsar tarball from the Apache mirror

    • Download the Pulsar tarball from the Pulsar download page

    • Use the wget command to download the Pulsar tarball.

        wget https://archive.apache.org/dist/pulsar/pulsar-2.5.1/apache-pulsar-2.5.1-bin.tar.gz
  2. Download and untar the Pulsar offloaders package.

        wget https://downloads.apache.org/pulsar/pulsar-2.5.1/apache-pulsar-offloaders-2.5.1-bin.tar.gz
        tar xvfz apache-pulsar-offloaders-2.5.1-bin.tar.gz

     ##### note
     - If you run Pulsar in a bare metal cluster, ensure that the `offloaders` tarball is unzipped in every broker's Pulsar directory.
     - If you run Pulsar in Docker or deploy Pulsar using a Docker image (for example, on Kubernetes or DC/OS), you can use the `apachepulsar/pulsar-all` image, which already bundles the tiered storage offloaders.
  3. Move the extracted offloaders directory into the Pulsar directory as offloaders, and then list its contents.

        mv apache-pulsar-offloaders-2.5.1/offloaders apache-pulsar-2.5.1/offloaders
        ls apache-pulsar-2.5.1/offloaders

     **Output**
     ```
     tiered-storage-file-system-2.5.1.nar
     tiered-storage-jcloud-2.5.1.nar
     ```

Configuration

note

Before offloading data from BookKeeper to the filesystem, you need to configure some properties of the filesystem offloader driver.

You can also configure the filesystem offloader to run automatically, or trigger it manually.

Configure filesystem offloader driver

You can configure the filesystem offloader driver in the broker.conf or standalone.conf configuration file.

  • HDFS

    Required configurations are as below.

    | Parameter | Description | Example value |
    |---|---|---|
    | managedLedgerOffloadDriver | Offloader driver name, which is case-insensitive. | filesystem |
    | fileSystemURI | Connection address, which is the URI to access the default Hadoop distributed file system. | hdfs://127.0.0.1:9000 |
    | offloadersDirectory | Offloader directory | offloaders |
    | fileSystemProfilePath | Hadoop profile path. The configuration file is stored in the Hadoop profile path. It contains various settings for Hadoop performance tuning. | ../conf/filesystem_offload_core_site.xml |

    Optional configurations are as below.

    | Parameter | Description | Example value |
    |---|---|---|
    | managedLedgerMinLedgerRolloverTimeMinutes | Minimum time between ledger rollover for a topic. Note: it is not recommended to set this parameter in the production environment. | 2 |
    | managedLedgerMaxEntriesPerLedger | Maximum number of entries to append to a ledger before triggering a rollover. Note: it is not recommended to set this parameter in the production environment. | 5000 |

  • NFS

    Required configurations are as below.

    | Parameter | Description | Example value |
    |---|---|---|
    | managedLedgerOffloadDriver | Offloader driver name, which is case-insensitive. | filesystem |
    | offloadersDirectory | Offloader directory | offloaders |
    | fileSystemProfilePath | NFS profile path. The configuration file is stored in the NFS profile path. It contains various settings for performance tuning. | ../conf/filesystem_offload_core_site.xml |

    Optional configurations are as below.

    | Parameter | Description | Example value |
    |---|---|---|
    | managedLedgerMinLedgerRolloverTimeMinutes | Minimum time between ledger rollover for a topic. Note: it is not recommended to set this parameter in the production environment. | 2 |
    | managedLedgerMaxEntriesPerLedger | Maximum number of entries to append to a ledger before triggering a rollover. Note: it is not recommended to set this parameter in the production environment. | 5000 |
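As an illustration of applying the required HDFS parameters listed above, the following sketch sets `key=value` entries in a properties-style configuration file, replacing an existing entry for the same key. The function names `set_conf_property` and `apply_hdfs_offload_settings` are hypothetical helpers and not part of Pulsar; the values are the example values from this section, so adjust them to your environment.

```shell
#!/bin/sh
# Sketch only: set_conf_property and apply_hdfs_offload_settings are
# hypothetical helper names, not Pulsar tooling.

# set_conf_property FILE KEY VALUE
# Replaces an existing "KEY=..." line, or appends one if the key is absent.
set_conf_property() {
    conf_file=$1
    key=$2
    value=$3
    tmp_file="${conf_file}.tmp.$$"

    # Copy every line except an existing assignment of this key.
    if [ -f "$conf_file" ]; then
        grep -v "^${key}=" "$conf_file" > "$tmp_file" || true
    else
        : > "$tmp_file"
    fi

    # Append the new assignment and move the result into place.
    printf '%s=%s\n' "$key" "$value" >> "$tmp_file"
    mv "$tmp_file" "$conf_file"
}

# apply_hdfs_offload_settings FILE
# Writes the required HDFS parameters using the example values from this section.
apply_hdfs_offload_settings() {
    set_conf_property "$1" managedLedgerOffloadDriver filesystem
    set_conf_property "$1" fileSystemURI hdfs://127.0.0.1:9000
    set_conf_property "$1" offloadersDirectory offloaders
    set_conf_property "$1" fileSystemProfilePath ../conf/filesystem_offload_core_site.xml
}

# Usage (run from the Pulsar directory):
#   apply_hdfs_offload_settings conf/standalone.conf
```

The helper rewrites the file through a temporary copy, so a commented-out default such as `#managedLedgerOffloadDriver=` is left untouched and only an active assignment is replaced.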

Run filesystem offloader automatically

You can configure the namespace policy to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic storage reaches the threshold, an offload operation is triggered automatically.

| Threshold value | Action |
|---|---|
| > 0 | It triggers the offloading operation if the topic storage reaches its threshold. |
| = 0 | It causes a broker to offload data as soon as possible. |
| < 0 | It disables automatic offloading operation. |

Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, the filesystem offloader does not work until the current segment is full.
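The threshold semantics described above can be summarized in a small sketch. The function `offload_action` is a hypothetical illustration and not the broker's implementation; it only maps a threshold and a topic storage size (both in bytes) to the action described above, and it does not model the fact that automatic offload runs only when a new segment is added.

```shell
#!/bin/sh
# Sketch only: offload_action is a hypothetical name that restates the
# threshold rules described above.

# offload_action THRESHOLD_BYTES TOPIC_STORAGE_BYTES
offload_action() {
    threshold=$1
    storage=$2
    if [ "$threshold" -lt 0 ]; then
        # Negative threshold: automatic offloading is disabled.
        echo "disabled"
    elif [ "$threshold" -eq 0 ]; then
        # Zero threshold: offload as soon as possible.
        echo "offload as soon as possible"
    elif [ "$storage" -ge "$threshold" ]; then
        # Positive threshold reached by the topic storage.
        echo "offload"
    else
        echo "wait"
    fi
}

# Usage:
#   offload_action 10485760 20971520
```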

You can configure the threshold using CLI tools, such as pulsar-admin.

Example

This example sets the filesystem offloader threshold to 10 MB using pulsar-admin.

    pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace

tip

For more information about the `pulsar-admin namespaces set-offload-threshold` command, including flags, descriptions, default values, and shorthands, see the pulsar-admin reference documentation.

Run filesystem offloader manually

For individual topics, you can trigger the filesystem offloader manually using one of the following methods:

  • Use the REST endpoint.

  • Use CLI tools (such as pulsar-admin).

To manually trigger the filesystem offloader via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are offloaded to the filesystem until the threshold is no longer exceeded. Older segments are offloaded first.
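To make the oldest-first behavior concrete, here is a simplified model of the rule described above. The function `plan_offload` is hypothetical and is not how the broker is implemented: it takes a threshold and a list of segment sizes (oldest first) and prints which segments would be offloaded until the retained size no longer exceeds the threshold. It ignores details such as the segment that is still being written.

```shell
#!/bin/sh
# Sketch only: plan_offload is a hypothetical model of the rule described
# above, not broker code.

# plan_offload THRESHOLD_BYTES SIZE...
# Sizes are segment sizes in bytes, listed oldest first.
plan_offload() {
    threshold=$1
    shift

    # Total size currently retained on the Pulsar cluster.
    total=0
    for size in "$@"; do
        total=$((total + size))
    done

    # Offload the oldest segments until the threshold is no longer exceeded.
    index=1
    for size in "$@"; do
        if [ "$total" -le "$threshold" ]; then
            break
        fi
        echo "offload segment $index ($size bytes)"
        total=$((total - size))
        index=$((index + 1))
    done

    echo "retained on cluster: $total bytes"
}

# Usage:
#   plan_offload 50 40 30 20 10
```

With a threshold of 50 and segments of 40, 30, 20, and 10 bytes, the model offloads the two oldest segments and retains 30 bytes.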

Example

  • This example manually runs the filesystem offloader using pulsar-admin.

        pulsar-admin topics offload --size-threshold 10M persistent://my-tenant/my-namespace/topic1

    **Output**
    ```
    Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
    ```
    ##### tip
    For more information about the `pulsar-admin topics offload` command, including flags, descriptions, default values, and shorthands, see [here](https://pulsar.apache.org/docs/pulsar-admin#offload).
  • This example checks the filesystem offloader status using pulsar-admin.

        pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1

    **Output**
    ```
    Offload is currently running
    ```
    To wait for the filesystem offloader to complete the job, add the `-w` flag.
    ```
    pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1
    ```
    **Output**
    ```
    Offload was a success
    ```
    If there is an error in the offloading operation, the error is propagated to the `pulsar-admin topics offload-status` command.
    ```
    pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
    ```
    **Output**
    ```
    Error in offload
    null
    Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=
    ```
    ##### tip
    For more information about the `pulsar-admin topics offload-status` command, including flags, descriptions, default values, and shorthands, see [here](https://pulsar.apache.org/docs/pulsar-admin#offload-status).
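If you script around the status check, one possible approach is to classify the text that `pulsar-admin topics offload-status` prints. The sketch below assumes the output begins with one of the three messages shown in the examples above; `classify_offload_status` is a hypothetical helper name, and other Pulsar versions may word these messages differently.

```shell
#!/bin/sh
# Sketch only: classify_offload_status is a hypothetical helper. It assumes
# the command output starts with one of the messages shown in the examples.

# classify_offload_status OUTPUT_TEXT
classify_offload_status() {
    case "$1" in
        "Offload is currently running"*) echo "running" ;;
        "Offload was a success"*)        echo "success" ;;
        "Error in offload"*)             echo "error" ;;
        *)                               echo "unknown" ;;
    esac
}

# Usage:
#   status=$(pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1)
#   classify_offload_status "$status"
```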

Tutorial

This section provides step-by-step instructions on how to use the filesystem offloader to move data from Pulsar to Hadoop Distributed File System (HDFS) or Network File system (NFS).

  • HDFS

To move data from Pulsar to HDFS, follow these steps.

Step 1: Prepare the HDFS environment

This tutorial sets up a Hadoop single node cluster and uses Hadoop 3.2.1.

tip

For details about how to set up a Hadoop single node cluster, see the Apache Hadoop documentation on setting up a single node cluster.

  1. Download and uncompress Hadoop 3.2.1.

        wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz
        tar -zxvf hadoop-3.2.1.tar.gz -C $HADOOP_HOME
  2. Configure Hadoop.

        # $HADOOP_HOME/etc/hadoop/core-site.xml
        <configuration>
            <property>
                <name>fs.defaultFS</name>
                <value>hdfs://localhost:9000</value>
            </property>
        </configuration>

        # $HADOOP_HOME/etc/hadoop/hdfs-site.xml
        <configuration>
            <property>
                <name>dfs.replication</name>
                <value>1</value>
            </property>
        </configuration>
  3. Set up passphraseless SSH.

        # Check that you can ssh to localhost without a passphrase:
        $ ssh localhost
        # If you cannot ssh to localhost without a passphrase, execute the following commands:
        $ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
        $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
        $ chmod 0600 ~/.ssh/authorized_keys
  4. Start HDFS.

        # Format the NameNode only once. Formatting it again makes the clusterId of the DataNode inconsistent with that of the NameNode.
        $HADOOP_HOME/bin/hadoop namenode -format
        $HADOOP_HOME/sbin/start-dfs.sh
  5. Navigate to the HDFS website (by default, http://localhost:9870 for Hadoop 3.x).

    You can see the Overview page.

    Filesystem offloader - Figure 1

    1. At the top navigation bar, click Datanodes to check DataNode information.

      Filesystem offloader - Figure 2

    2. Click HTTP Address to get more detailed information about localhost:9866.

      As shown below, the Capacity Used value is 4 KB, which is the initial value.

      Filesystem offloader - Figure 3
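The warning in the Start HDFS step, that formatting the NameNode repeatedly leaves the DataNode with a mismatched clusterId, can be guarded against in a script. The following is a minimal sketch: `format_namenode_once` is a hypothetical helper, and it assumes that a formatted NameNode storage directory contains a `current` subdirectory and that the default storage location `/tmp/hadoop-$USER/dfs/name` is in use (true only if `hadoop.tmp.dir` and `dfs.namenode.name.dir` are left at their defaults).

```shell
#!/bin/sh
# Sketch only: format_namenode_once is a hypothetical helper. It assumes a
# formatted NameNode storage directory contains a "current" subdirectory.

# format_namenode_once NAME_DIR FORMAT_COMMAND...
# Runs FORMAT_COMMAND only if NAME_DIR does not look formatted yet.
format_namenode_once() {
    name_dir=$1
    shift
    if [ -d "$name_dir/current" ]; then
        echo "NameNode metadata found in $name_dir; skipping format"
        return 0
    fi
    "$@"
}

# Usage (the path assumes default Hadoop storage settings):
#   format_namenode_once "/tmp/hadoop-$USER/dfs/name" \
#       "$HADOOP_HOME/bin/hadoop" namenode -format
```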

Step 2: Install the filesystem offloader

For details, see installation.

Step 3: Configure the filesystem offloader

As indicated in the configuration section, you need to configure some properties for the filesystem offloader driver before using it. This tutorial assumes that you have configured the filesystem offloader driver as below and run Pulsar in standalone mode.

Set the following configurations in the conf/standalone.conf file.

    managedLedgerOffloadDriver=filesystem
    fileSystemURI=hdfs://127.0.0.1:9000
    fileSystemProfilePath=../conf/filesystem_offload_core_site.xml

note

For testing purposes, you can set the following two configurations to speed up ledger rollover, but it is not recommended that you set them in the production environment.

    managedLedgerMinLedgerRolloverTimeMinutes=1
    managedLedgerMaxEntriesPerLedger=100
  • NFS

note

This section assumes that you have enabled the NFS service and set its shared path. In this section, /Users/test is used as the shared path of the NFS service.

To offload data to NFS, follow these steps.

Step 1: Install the filesystem offloader

For details, see installation.

Step 2: Mount your NFS to your local filesystem

This example mounts the NFS shared path /Users/test, exported by 192.168.0.103, on the local directory /Users/pulsar_nfs.

    mount -e 192.168.0.103:/Users/test /Users/pulsar_nfs

Step 3: Configure the filesystem offloader driver

As indicated in the configuration section, you need to configure some properties for the filesystem offloader driver before using it. This tutorial assumes that you have configured the filesystem offloader driver as below and run Pulsar in standalone mode.

  1. Set the following configurations in the conf/standalone.conf file.

        managedLedgerOffloadDriver=filesystem
        fileSystemProfilePath=../conf/filesystem_offload_core_site.xml

  2. Modify the filesystem_offload_core_site.xml as follows.

        <property>
            <name>fs.defaultFS</name>
            <value>file:///</value>
        </property>
        <property>
            <name>hadoop.tmp.dir</name>
            <value>file:///Users/pulsar_nfs</value>
        </property>
        <property>
            <name>io.file.buffer.size</name>
            <value>4096</value>
        </property>
        <property>
            <name>io.seqfile.compress.blocksize</name>
            <value>1000000</value>
        </property>
        <property>
            <name>io.seqfile.compression.type</name>
            <value>BLOCK</value>
        </property>
        <property>
            <name>io.map.index.interval</name>
            <value>128</value>
        </property>
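Before starting Pulsar with the NFS configuration above, it may help to confirm that the mounted directory exists and is writable by the user that runs the broker. The following is a minimal sketch; `check_offload_dir` is a hypothetical helper, and /Users/pulsar_nfs is the example mount point used in this section.

```shell
#!/bin/sh
# Sketch only: check_offload_dir is a hypothetical helper, not Pulsar tooling.

# check_offload_dir DIR
# Reports whether DIR exists and whether a file can be created in it.
check_offload_dir() {
    dir=$1
    if [ ! -d "$dir" ]; then
        echo "missing: $dir"
        return 1
    fi

    # Try to create and remove a small probe file in the directory.
    probe="$dir/.offload_write_probe.$$"
    if ( : > "$probe" ) 2>/dev/null; then
        rm -f "$probe"
        echo "writable: $dir"
    else
        echo "not writable: $dir"
        return 1
    fi
}

# Usage:
#   check_offload_dir /Users/pulsar_nfs
```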

Step 4: Offload data from BookKeeper to filesystem

Execute the following commands in the directory where you extracted the Pulsar tarball, for example, ~/path/to/apache-pulsar-2.5.1.

  1. Start Pulsar standalone.

        bin/pulsar standalone -a 127.0.0.1

  2. To ensure that the generated data is not deleted immediately, it is recommended to set the retention policy, which can be either a size limit or a time limit. The larger the value you set for the retention policy, the longer the data is retained.

        bin/pulsar-admin namespaces set-retention public/default --size 100M --time 2d

     ##### tip
     For more information about the `pulsarctl namespaces set-retention` command, including flags, descriptions, default values, and shorthands, see [here](https://docs.streamnative.io/pulsarctl/v2.7.0.6/#-em-set-retention-em-).

  3. Produce data using pulsar-client.

        bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test
  4. The offloading operation starts after a ledger rollover is triggered. To make sure that data is offloaded successfully, it is recommended that you wait until several ledger rollovers are triggered. In this case, you might need to wait for a second. You can check the ledger status using pulsar-admin.

        bin/pulsar-admin topics stats-internal public/default/fs-test

     **Output**

     The data of the ledger 696 is not offloaded.
     ```
     {
       "version": 1,
       "creationDate": "2020-06-16T21:46:25.807+08:00",
       "modificationDate": "2020-06-16T21:46:25.821+08:00",
       "ledgers": [
         {
           "ledgerId": 696,
           "isOffloaded": false
         }
       ],
       "cursors": {}
     }
     ```

  5. Wait a second and send more messages to the topic.

        bin/pulsar-client produce -m "Hello FileSystem Offloader" -n 1000 public/default/fs-test

  6. Check the ledger status using pulsar-admin.

        bin/pulsar-admin topics stats-internal public/default/fs-test

     **Output**

     The ledger 696 has rolled over.
     ```
     {
       "version": 2,
       "creationDate": "2020-06-16T21:46:25.807+08:00",
       "modificationDate": "2020-06-16T21:48:52.288+08:00",
       "ledgers": [
         {
           "ledgerId": 696,
           "entries": 1001,
           "size": 81695,
           "isOffloaded": false
         },
         {
           "ledgerId": 697,
           "isOffloaded": false
         }
       ],
       "cursors": {}
     }
     ```
  7. Trigger the offloading operation manually using pulsar-admin.

        bin/pulsar-admin topics offload -s 0 public/default/fs-test

     **Output**

     Data in ledgers before the ledger 697 is offloaded.
     ```
     # offload info, the ledgers before 697 will be offloaded
     Offload triggered for persistent://public/default/fs-test for messages before 697:0:-1
     ```

  8. Check the ledger status using pulsar-admin.

        bin/pulsar-admin topics stats-internal public/default/fs-test

     **Output**

     The data of the ledger 696 is offloaded.
     ```
     {
       "version": 4,
       "creationDate": "2020-06-16T21:46:25.807+08:00",
       "modificationDate": "2020-06-16T21:52:13.25+08:00",
       "ledgers": [
         {
           "ledgerId": 696,
           "entries": 1001,
           "size": 81695,
           "isOffloaded": true
         },
         {
           "ledgerId": 697,
           "isOffloaded": false
         }
       ],
       "cursors": {}
     }
     ```
     The **Capacity Used** value changes from 4 KB to 116.46 KB.
     ![](/projects/apache-pulsar-2.10.0-en/7d566d9ab4b73ea9f23e0398d8d1636d.png)
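To check the result without reading the JSON by eye, you could count the ledgers that report `"isOffloaded": true`. The following is a minimal sketch that assumes the output is pretty-printed with one field per line, as in the examples above; `count_offloaded_ledgers` is a hypothetical helper, and a JSON-aware tool would be more robust if the layout differs.

```shell
#!/bin/sh
# Sketch only: count_offloaded_ledgers is a hypothetical helper. It counts
# lines, so it assumes one "isOffloaded" field per line of input.

# Reads stats-internal output on stdin and prints the number of ledgers
# marked as offloaded.
count_offloaded_ledgers() {
    # grep -c exits non-zero when the count is 0, so ignore its exit status.
    grep -c '"isOffloaded"[[:space:]]*:[[:space:]]*true' || true
}

# Usage:
#   bin/pulsar-admin topics stats-internal public/default/fs-test | count_offloaded_ledgers
```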