Pulsar 的 主题压缩 特性让你能够通过从主题中删减那些老的、“模糊”的条目,以此来达到 压缩的效果,这样在遍历主题历史记录时能有更快的表现(哪些消息被视为模糊/过期/无关,将取决于你的使用场景)。
如要使用压缩:
- 你需要给定消息的 key ,因为 Pulsar 的主题压缩是 基于 key 的 (也就是说,消息是根据它们的 key 进行压缩的)。 For a stock ticker use case, the stock symbol—-e.g.
AAPL
orGOOG
-—could serve as the key (more on this below). 没有 key 的消息会被压缩过程单独留出。 - Compaction can be configured to run automatically, or you can manually trigger compaction using the Pulsar administrative API.
- Your consumers must be configured to read from compacted topics (Java consumers, for example, have a
readCompacted
setting that must be set totrue
). 如果没这么配置,消费者将仍能从非压缩主题中进行读取。
压缩只能在带 key 的消息上生效(如股票报价器示例中的股票代码就是每条消息的 key )。 因此,有无 key 可以看作能否采用压缩的标志。 那些没有 key 的消息会直接被压缩过程无视掉。
When should I use compacted topics? {#when}
如果要说主题能从压缩中得到什么好处,一个很好的示例就是股票报价器主题,因为消费者们能从中获取到指定股票的最新消息。 Imagine a scneario in which messages carrying stock value data use the stock symbol as the key (GOOG
, AAPL
, TWTR
, etc.). 压缩这个主题将为消费者在这个主题上提供两个选项:
- 如果他们需要访问“历史”值(主题的所有信息),他们可以从“原始”非压缩主题读取。
- 如果他们只想看到最新消息,他们就可以从压缩主题中读取。
因此,如果你在使用一个叫做 stock-values(股票值)
的 Pulsar 主题,即便一些消费者能访问主题中的所有消息(也许因为他们需要对过去几个小时的变化做计算),他们也会习惯性地使用实时股票报价器来仅仅查看压缩主题(从而不会强制处理那些过期的消息)。 Which variant of the topic any given consumer pulls messages from is determined by the consumer’s configuration.
在 Pulsar 中,压缩的好处之一是你无需强制在压缩与非压缩主题之间作出选择, 因为压缩过程会保留原来的主题,只是在此基础上增加了另一个选项。 换句话说,你可以在主题上使用压缩,而那些需要访问非压缩主题的消费者并不会受到负面影响。
Configuring compaction to run automatically {#automatic}
租户管理员可以配置命名空间级别的压缩策略。 该策略会具体规定主题积压到什么程度才会触发压缩。
比如,当积压达到 100MB 时触发压缩:
$ bin/pulsar-admin namespaces set-compaction-threshold \
--threshold 100M my-tenant/my-namespace
为命名空间配置的压缩阈值将适用于命名空间中的所有主题。
Triggering compaction manually {#trigger}
为了在主题上执行压缩,你需要在 pulsar-admin
命令行工具使用 topics compact
命令。 Here’s an example:
$ bin/pulsar-admin topics compact \
persistent://my-tenant/my-namespace/my-topic
pulsar-admin
工具通过 Pulsar REST API 来执行压缩。 如果要在专有的进程中执行压缩,比如 不 通过 REST API,你可以使用 pulsar compact-topic
命令。 Here’s an example:
$ bin/pulsar compact-topic \
--topic persistent://my-tenant-namespace/my-topic
当您想要避免干扰 broker 的性能时,建议在单独的过程中运行压缩。 Broker 的性能应该只会在一个场景下被影响:在拥有庞大 keyspace 的主题(比如主题上有许多的 key)上执行压缩。 压缩过程的第一阶段会为主题中的每个 key 保留一份拷贝,所以当 key 增加时会加大内存压力。 使用
pulsar-admin topics compact
命令来通过 REST API 执行压缩在绝大多数情况下都不会出现问题,使用pulsar compact-topic
应该相应地被视为一种边缘情况。
pulsar compact-topic
命令会直接与 ZooKeeper 进行通信。 不过,为了与 ZooKeeper 建立通信, pulsar
命令行工具需要有一个可用的 broker 配置。 你可以在 conf/broker.conf
提供一个正确的配置,或在配置中指定一个非默认路径:
$ bin/pulsar compact-topic \
--broker-conf /path/to/broker.conf \
--topic persistent://my-tenant/my-namespace/my-topic
# If the configuration is in conf/broker.conf
$ bin/pulsar compact-topic \
--topic persistent://my-tenant/my-namespace/my-topic
我该在什么时候触发压缩?
How often you trigger compaction will vary widely based on the use case. 如果你想要一个压缩主题有极快的读取速度,那么你应该相当频繁地执行压缩。
Consumer configuration {#config}
Pulsar 消费者和读者需要被配置为从压缩主题中读取消息。下一节会展示如何为 Pulsar 客户端启用压缩主题读取。
Java only
Currently, only Java clients can consume messages from compacted topics.
Java
要使用 Java 客户端从压缩主题中读取,readCompacted
参数需要被设置为 true
。这有一个压缩主题的消费者示例:
Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
.topic("some-compacted-topic")
.readCompacted(true)
.subscribe();
如上所述,Pulsar 中的消息压缩是 基于每个 key 运作的。 这意味着您在压缩主题上生成的消息需要有 key(key 的内容将取决于你的用例)。 没有 key 的消息会被压缩过程单独留出。 下面是一个带 key 的 Pulsar 消息示例:
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
Message<byte[]> msg = MessageBuilder.create()
.setContent(someByteArray)
.setKey("some-key")
.build();
下面的示例展示了一个带 key 的消息在 Pulsar 压缩主题上被生产的过程。
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]> compactedTopicProducer = client.newProducer()
.topic("some-compacted-topic")
.create();
Message<byte[]> msg = MessageBuilder.create()
.setContent(someByteArray)
.setKey("some-key")
.build();
compactedTopicProducer.send(msg);