消息数据高度可扩展的持久存储,是Pulsar构建的主要目标。 Pulsar的topic让你可以持久存储所有你所需要的未被确认消息,同时保留了消息的顺序。 主题上生产的所有未被确认/未被处理的消息,Pulsar会默认存储。 在很多Pulsar的使用案例中,在topic累积大量的未被确认的消息是有必要的。但对于Pulsar的consumer来说,在完整的消息log中回退,将变得非常耗时。

更多topic压缩实践的指南,请参考 Topic compaction cookbook

某些情况下,consumer并不需要完整的topic日志。 他们可能只需要几个值来构造一个更 “浅” 的日志图像, 也许仅仅只是最新的值。 对于这种应用场景,Pulsar提供了 topic压缩. 当你在topic上执行压缩,Pulsar会遍历topic的backlog然后把遥远模糊已经有了更新的消息移除。例如,它遍历一个以key为基础的topic,只留下关联到key上最新的消息。

Pulsar的topic压缩特性:

  • 允许通过主题日志更快地 “后退”
  • 仅适用于 持久topic
  • 当backlog达到一定大小时,可以被自动出发,或者通过命令行手动出发。请参见Topic compaction cookbook
  • 在概念上和操作上与 retention和expiry 是不同的。 但是,在topic压缩中,还是尊重retention。 如果retention已经从topic的backlog中移除了消息,那么此条消息在压缩的topic账簿上也是无法被读取的。

Topic压缩示例:股票报价机

Pulsar topic压缩的使用例子,可以看一下股票报价机topic。 在股票报价机topic中,每条消息都有带有时间戳的股票买入价格(包含代表股票符号的消息key,例如AAPL或者GOOG)。 可能你感兴趣的只是股票报价机中最新的股票价格,而对历史数据并不感兴趣(即你不需要构建topic每个key下消息序列的完整图像)。 压缩在这种场景下非常的方便,因为它使得用户不需回退到模糊的消息上。

Topic压缩的工作原理

通过命令行触发topic压缩,Pulsar将会从头到尾迭代整个topic。 对于它碰到的每个key,压缩程序将会只保留这个key最近的事件。

之后,broker将会创建一个新的BookKeeper ledger 然后开始对topic的每条消息进行第二次迭代。 对于每条消息,如果key匹配到它的最新事件,key的数据内容,消息ID,元数据将会被写入最新创建的ledger。 如果key并没有匹配到最新的消息,消息将被跳过。 如果给定的消息,负载是空的,它将被跳过并且视为删除(类似key-value数据库中的 tombstones)概念); 在本topic第二次迭代结束时,新创建的BookKeeper ledger将被关闭,并将两个内容写入元数据 :BookKeeper ledger的ID及最新被压缩的消息的ID(这被称为topic的压缩层位)。 写入元数据后,压缩就完成了。

初始化压缩操作完成后,将来任何对压缩层位及压缩backlog的修改,都会通知给拥有该topic的Pulsar broker。 当下列更改发生时:

  • 启用读取压缩功能的客户端(consumer和reader),将会尝试从topic中读取消息,或者:
    • 像从正常的主题那样读取(如果消息的ID大于等于压缩层位),或
    • 从压缩层位的开始读取(如果消息ID小于压缩层位)