Message retention and expiry
Pulsar brokers 负责处理通过 Pulsar 的消息,包括消息的持久存储。 默认情况下,对于每个主题,broker 会至少保留一个 backlog 消息。 backlog 是特定订阅的未确认的消息的集合。 每个主题可以有多个订阅者,所以每个主题可以有多个 backlog。
因此,在一个没有创建任何订阅的主题上不会保留任何消息(默认情况下)。
(注意,不再被存储的消息不需要立即删除,事实上可能仍然可以被访问,直到下一个 ledger 滚动创建。 因为客户端无法预测 ledger 什么时候发生滚动,所以依赖滚动发生在合适的时间是不明智的。)
在 Pulsar 中,你有两种方式在命名空间的级别去修改这种行为:
- 你可以通过设置消息保留策略持久化存储不在 backlog 内的消息(因为他们已经在每个现有的订阅上被确认,或者并没有被订阅)。
- 可以通过设置 time to live(TTL),设置消息在指定的时间内不被确认的话,自动确认。
你可以通过 Pulsar admin 接口 在命名空间(租户、集群或整个
集群) 级别管理消息保留策略和TTL。
消息保留和 TTL 有以下两点不同
- 消息保留: 保存数据至少X小时(即使消息已被确认消费)
- 生存时间(TTL):一段时间后丢弃数据(通过自动确认)
大多数应用程序最多只想使用其中的一个。
保留策略
默认情况下,当一条消息到达 broker 后,它会被一直保存到所有的订阅者都成功消费它,此时它才会被删除。 You can override this behavior and retain even messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention policies are either a size limit or a time limit.
如果你只打算使用 Reader 接口,消息保留策略会特别有用。 因为 Reader 接口不使用消息确认机制,消息将永远不会存在 backlog 中。 所以大多数实际的 Reader-only 案例都需要配置消息保留。
如果设置根据消息大小保留,比如保留10GB,那么命名空间内每个主题的消息,包括已经确认的消息,都会被保留到达到主题的大小限制为止。如果设置根据时间保留,比如保留一天,那么命名空间内每个主题的消息,包括已经确认的消息,都会被保留24小时。 消息保留策略会对没有订阅的主题的所有消息生效,或者对有订阅的主题的已经被确认的消息生效。 消息保留策略不影响订阅主题上的未确认消息 — 这些消息由 backlog 配额控制(见下文)。
当超过保留限额时,最老的消息会被标记删除,直到保留的消息在指定的限制范围之内。
It is also possible to set unlimited retention time or size by setting -1
for either time or size retention.
默认情况
下面的两个配置参数可用于设置实例范围的消息保留默认值:defaultRetentionTimeInMinutes=0
和defaultRetentionSizeInMB=0
。
这两个参数都在 broker.conf
配置文件中。
设置保留策略
You can set a retention policy for a namespace by specifying the namespace as well as both a size limit and a time limit.
pulsar-admin
使用set-retention
子命令并指定命名空间,使用-s
/--size
参数指定大小限制,使用-t
/--time
参数指定时间限制。
示例
为 my-tenant/my-ns
命名空间设置10的大小限制和3小时的时间限制:
$ pulsar-admin namespaces set-retention my-tenant/my-ns \
--size 10G \
--time 3h
设置保留大小限制,但没有时间限制:
$ pulsar-admin namespaces set-retention my-tenant/my-ns \
--size 1T \
--time -1
设置保留无限大小和时间:
$ pulsar-admin namespaces set-retention my-tenant/my-ns \
--size -1 \
--time -1
REST API
POST /admin/v2/namespaces/:tenant/:namespace/retention
Java
int retentionTime = 10; // 10 minutes
int retentionSize = 500; // 500 megabytes
RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);
admin.namespaces().setRetention(namespace, policies);
获取保留策略
You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes
and retentionSizeInMB
.
pulsar-admin
使用 get-retention
子命令并指定命名空间。
示例
$ pulsar-admin namespaces get-retention my-tenant/my-ns
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 0
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/retention
Java
admin.namespaces().getRetention(namespace);
Backlog quotas
Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.
You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:
待办:如何对每个主题或每个 backlog 进行扩展?
- an allowable size threshold for each topic in the namespace
- a retention policy that determines which action the broker takes if the threshold is exceeded.
可以使用以下保留策略:
策略 | 触发的操作 |
---|---|
producer_request_hold | Broker 会持有生产者投递的消息,但并不会把投递的消息进行持久化存储 |
producer_exception | Broker 会与客户端断开连接并抛出异常 |
consumer_backlog_eviction | Broker 将开始丢弃backlog的消息 |
注意保留策略类型之间的区别
你可能已经注意到,Pulsar 中关于“保留策略”有两种定义,一种适用于已确认消息的持久存储,一种适用于 backlog。
Backlog quotas are handled at the namespace level. They can be managed via:
设置大小阈值和 backlog 保留策略
通过指定命名空间的大小限制和策略,为命名空间中的所有主题设置大小阈值和 backlog 保留策略。
pulsar-admin
使用set-backlog-quota
子命令,并使用 -l
/--limit
参数指定命名空间大小限制 ,以及使用 <code>-p
/--policy
参数指定保留策略。
示例
$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
--limit 2G \
--policy producer_request_hold
REST API
POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota
Java
long sizeLimit = 2147483648L;
BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
admin.namespaces().setBacklogQuota(namespace, quota);
获取 backlog 大小阈值和 backlog 保留策略
可以查看已对命名空间应用的大小阈值和 backlog 保留策略。
pulsar-admin
Use the get-backlog-quotas
subcommand and specify a namespace. 下面是一个示例:
$ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
{
"destination_storage": {
"limit" : 2147483648,
"policy" : "producer_request_hold"
}
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap
Java
Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
admin.namespaces().getBacklogQuotas(namespace);
移除 backlog quotas
pulsar-admin
Use the remove-backlog-quota
subcommand and specify a namespace. 下面是一个示例:
$ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota
Java
admin.namespaces().removeBacklogQuota(namespace);
清除 backlog
pulsar-admin
使用 clear-backlog
子命令。
示例
$ pulsar-admin namespaces clear-backlog my-tenant/my-ns
By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f
/--force
flag.
生存时间 (TTL)
默认情况下,Pulsar 会永久存储所有未确认的消息。 在大量消息未得到确认的情况下,可能会导致大量磁盘空间的使用。 如果需要考虑磁盘空间,可以设置生存时间(TTL),以确定未确认的消息将保留多长时间。
为名称空间设置生存时间(TTL)
pulsar-admin
使用 set-message-ttl
子命令并指定命名空间和TTL(以秒为单位,使用-ttl
/--messageTTL
参数指定)。
示例
$ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
--messageTTL 120 # TTL of 2 minutes
REST API
POST /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);
获取命名空间的生存时间(TTL) 配置
pulsar-admin
使用 get-message-ttl
子命令并指定命名空间。
示例
$ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
60
REST API
GET /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().getNamespaceMessageTTL(namespace)
Remove the TTL configuration for a namespace
pulsar-admin
Use the remove-message-ttl
subcommand and specify a namespace.
示例
$ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().removeNamespaceMessageTTL(namespace)
Delete messages from namespaces
If you do not have any retention period and that you never have much of a backlog, the upper limit for retaining messages, which are acknowledged, equals to the Pulsar segment rollover period + entry log rollover period + (garbage collection interval * garbage collection ratios).
Segment rollover period: basically, the segment rollover period is how often a new segment is created. Once a new segment is created, the old segment will be deleted. By default, this happens either when you have written 50,000 entries (messages) or have waited 240 minutes. You can tune this in your broker.
Entry log rollover period: multiple ledgers in BookKeeper are interleaved into an entry log. In order for a ledger that has been deleted, the entry log must all be rolled over. The entry log rollover period is configurable, but is purely based on the entry log size. For details, see here. Once the entry log is rolled over, the entry log can be garbage collected.
Garbage collection interval: because entry logs have interleaved ledgers, to free up space, the entry logs need to be rewritten. The garbage collection interval is how often BookKeeper performs garbage collection. which is related to minor compaction and major compaction of entry logs. For details, see here.