Pulsar 的 主题压缩 特性让你能够通过从主题中删减那些老的、“模糊”的条目,以此来达到 压缩的效果,这样在遍历主题历史记录时能有更快的表现(哪些消息被视为模糊/过期/无关,将取决于你的使用场景)。

如要使用压缩:

  • 你需要给定消息的 key ,因为 Pulsar 的主题压缩是 基于 key 的 (也就是说,消息是根据它们的 key 进行压缩的)。 For a stock ticker use case, the stock symbol—-e.g. AAPL or GOOG-—could serve as the key (more on this below). 没有 key 的消息会被压缩过程单独留出。
  • Compaction can be configured to run automatically, or you can manually trigger compaction using the Pulsar administrative API.
  • Your consumers must be configured to read from compacted topics (Java consumers, for example, have a readCompacted setting that must be set to true). 如果没这么配置,消费者将仍能从非压缩主题中进行读取。

压缩只能在带 key 的消息上生效(如股票报价器示例中的股票代码就是每条消息的 key )。 因此,有无 key 可以看作能否采用压缩的标志。 那些没有 key 的消息会直接被压缩过程无视掉。

When should I use compacted topics? {#when}

如果要说主题能从压缩中得到什么好处,一个很好的示例就是股票报价器主题,因为消费者们能从中获取到指定股票的最新消息。 Imagine a scneario in which messages carrying stock value data use the stock symbol as the key (GOOG, AAPL, TWTR, etc.). 压缩这个主题将为消费者在这个主题上提供两个选项:

  • 如果他们需要访问“历史”值(主题的所有信息),他们可以从“原始”非压缩主题读取。
  • 如果他们只想看到最新消息,他们就可以从压缩主题中读取。

因此,如果你在使用一个叫做 stock-values(股票值) 的 Pulsar 主题,即便一些消费者能访问主题中的所有消息(也许因为他们需要对过去几个小时的变化做计算),他们也会习惯性地使用实时股票报价器来仅仅查看压缩主题(从而不会强制处理那些过期的消息)。 Which variant of the topic any given consumer pulls messages from is determined by the consumer’s configuration.

在 Pulsar 中,压缩的好处之一是你无需强制在压缩与非压缩主题之间作出选择, 因为压缩过程会保留原来的主题,只是在此基础上增加了另一个选项。 换句话说,你可以在主题上使用压缩,而那些需要访问非压缩主题的消费者并不会受到负面影响。

Configuring compaction to run automatically {#automatic}

租户管理员可以配置命名空间级别的压缩策略。 该策略会具体规定主题积压到什么程度才会触发压缩。

比如,当积压达到 100MB 时触发压缩:

  1. $ bin/pulsar-admin namespaces set-compaction-threshold \
  2. --threshold 100M my-tenant/my-namespace

为命名空间配置的压缩阈值将适用于命名空间中的所有主题。

Triggering compaction manually {#trigger}

为了在主题上执行压缩,你需要在 pulsar-admin 命令行工具使用 topics compact 命令。 Here’s an example:

  1. $ bin/pulsar-admin topics compact \
  2. persistent://my-tenant/my-namespace/my-topic

pulsar-admin 工具通过 Pulsar REST API 来执行压缩。 如果要在专有的进程中执行压缩,比如 通过 REST API,你可以使用 pulsar compact-topic 命令。 Here’s an example:

  1. $ bin/pulsar compact-topic \
  2. --topic persistent://my-tenant-namespace/my-topic

当您想要避免干扰 broker 的性能时,建议在单独的过程中运行压缩。 Broker 的性能应该只会在一个场景下被影响:在拥有庞大 keyspace 的主题(比如主题上有许多的 key)上执行压缩。 压缩过程的第一阶段会为主题中的每个 key 保留一份拷贝,所以当 key 增加时会加大内存压力。 使用 pulsar-admin topics compact 命令来通过 REST API 执行压缩在绝大多数情况下都不会出现问题,使用 pulsar compact-topic 应该相应地被视为一种边缘情况。

pulsar compact-topic 命令会直接与 ZooKeeper 进行通信。 不过,为了与 ZooKeeper 建立通信, pulsar 命令行工具需要有一个可用的 broker 配置。 你可以在 conf/broker.conf 提供一个正确的配置,或在配置中指定一个非默认路径:

  1. $ bin/pulsar compact-topic \
  2. --broker-conf /path/to/broker.conf \
  3. --topic persistent://my-tenant/my-namespace/my-topic
  4. # If the configuration is in conf/broker.conf
  5. $ bin/pulsar compact-topic \
  6. --topic persistent://my-tenant/my-namespace/my-topic

我该在什么时候触发压缩?

How often you trigger compaction will vary widely based on the use case. 如果你想要一个压缩主题有极快的读取速度,那么你应该相当频繁地执行压缩。

Consumer configuration {#config}

Pulsar 消费者和读者需要被配置为从压缩主题中读取消息。下一节会展示如何为 Pulsar 客户端启用压缩主题读取。

Java only

Currently, only Java clients can consume messages from compacted topics.

Java

要使用 Java 客户端从压缩主题中读取,readCompacted 参数需要被设置为 true。这有一个压缩主题的消费者示例:

  1. Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
  2. .topic("some-compacted-topic")
  3. .readCompacted(true)
  4. .subscribe();

如上所述,Pulsar 中的消息压缩是 基于每个 key 运作的。 这意味着您在压缩主题上生成的消息需要有 key(key 的内容将取决于你的用例)。 没有 key 的消息会被压缩过程单独留出。 下面是一个带 key 的 Pulsar 消息示例:

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageBuilder;
  3. Message<byte[]> msg = MessageBuilder.create()
  4. .setContent(someByteArray)
  5. .setKey("some-key")
  6. .build();

下面的示例展示了一个带 key 的消息在 Pulsar 压缩主题上被生产的过程。

  1. import org.apache.pulsar.client.api.Message;
  2. import org.apache.pulsar.client.api.MessageBuilder;
  3. import org.apache.pulsar.client.api.Producer;
  4. import org.apache.pulsar.client.api.PulsarClient;
  5. PulsarClient client = PulsarClient.builder()
  6. .serviceUrl("pulsar://localhost:6650")
  7. .build();
  8. Producer<byte[]> compactedTopicProducer = client.newProducer()
  9. .topic("some-compacted-topic")
  10. .create();
  11. Message<byte[]> msg = MessageBuilder.create()
  12. .setContent(someByteArray)
  13. .setKey("some-key")
  14. .build();
  15. compactedTopicProducer.send(msg);