Message retention and expiry
Pulsar brokers 负责处理通过 Pulsar 的消息,包括消息的持久存储。 默认情况下,对于每个主题,broker 会至少保留一个 backlog 消息。 backlog 是特定订阅的未确认的消息的集合。 每个主题可以有多个订阅者,所以每个主题可以有多个 backlog。
因此,在一个没有创建任何订阅的主题上不会保留任何消息(默认情况下)。
(注意,不再被存储的消息不需要立即删除,事实上可能仍然可以被访问,直到下一个 ledger 滚动创建。 因为客户端无法预测 ledger 什么时候发生滚动,所以依赖滚动发生在合适的时间是不明智的。)
在 Pulsar 中,你有两种方式在命名空间的级别去修改这种行为:
- 你可以通过设置消息保留策略持久化存储不在 backlog 内的消息(因为他们已经在每个现有的订阅上被确认,或者并没有被订阅)。
- 可以通过设置 time to live(TTL),设置消息在指定的时间内不被确认的话,自动确认。
你可以通过 Pulsar admin 接口 在命名空间(租户、集群或整个
集群) 级别管理消息保留策略和TTL。
消息保留和 TTL 有以下两点不同
- 消息保留: 保存数据至少X小时(即使消息已被确认消费)
- 生存时间(TTL):一段时间后丢弃数据(通过自动确认)
大多数应用程序最多只想使用其中的一个。
保留策略
By default, when a Pulsar message arrives at a broker, the message is stored until it has been acknowledged on all subscriptions, at which point it is marked for deletion. You can override this behavior and retain messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention is based on both a size limit and a time limit.
Retention policies are useful when you use the Reader interface. The Reader interface does not use acknowledgements, and messages do not exist within backlogs. It is required to configure retention for Reader-only use cases.
When you set a retention policy on topics in a namespace, you must set both a size limit and a time limit. You can refer to the following table to set retention policies in pulsar-admin
and Java.
Time limit | Size limit | Message retention |
---|---|---|
-1 | -1 | Infinite retention |
-1 | >0 | Based on the size limit |
>0 | -1 | Based on the time limit |
0 | 0 | Disable message retention (by default) |
0 | >0 | Invalid |
>0 | 0 | Invalid |
>0 | >0 | Acknowledged messages or messages with no active subscription will not be retained when either time or size reaches the limit. |
The retention settings apply to all messages on topics that do not have any subscriptions, or to messages that have been acknowledged by all subscriptions. The retention policy settings do not affect unacknowledged messages on topics with subscriptions. The unacknowledged messages are controlled by the backlog quota.
When a retention limit on a topic is exceeded, the oldest message is marked for deletion until the set of retained messages falls within the specified limits again.
默认情况
You can set message retention at instance level with the following two parameters: defaultRetentionTimeInMinutes
and defaultRetentionSizeInMB
. Both parameters are set to 0
by default.
For more information of the two parameters, refer to the broker.conf
configuration file.
设置保留策略
You can set a retention policy for a namespace by specifying the namespace, a size limit and a time limit in pulsar-admin
, REST API and Java.
pulsar-admin
REST API
Java
You can use the set-retention
subcommand and specify a namespace, a size limit using the -s
/--size
flag, and a time limit using the -t
/--time
flag.
In the following example, the size limit is set to 10 GB and the time limit is set to 3 hours for each topic within the my-tenant/my-ns
namespace.
- When the size of messages reaches 10 GB on a topic within 3 hours, the acknowledged messages will not be retained.
- After 3 hours, even if the message size is less than 10 GB, the acknowledged messages will not be retained.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 10G \ --time 3h
In the following example, the time is not limited and the size limit is set to 1 TB. The size limit determines the retention.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 1T \ --time -1
In the following example, the size is not limited and the time limit is set to 3 hours. The time limit determines the retention.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time 3h
To achieve infinite retention, set both values to -1
.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time -1
To disable the retention policy, set both values to 0
.
$ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 0 \ --time 0
POST /admin/v2/namespaces/:tenant/:namespace/retention
Note
To disable the retention policy, you need to set both the size and time limit to0
. Set either size or time limit to0
is invalid.
int retentionTime = 10; // 10 minutesint retentionSize = 500; // 500 megabytesRetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);admin.namespaces().setRetention(namespace, policies);
获取保留策略
You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes
and retentionSizeInMB
.
pulsar-admin
使用 get-retention
子命令并指定命名空间。
示例
$ pulsar-admin namespaces get-retention my-tenant/my-ns
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 500
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/retention
Java
admin.namespaces().getRetention(namespace);
Backlog quotas
Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.
You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:
待办:如何对每个主题或每个 backlog 进行扩展?
- an allowable size threshold for each topic in the namespace
- a retention policy that determines which action the broker takes if the threshold is exceeded.
可以使用以下保留策略:
策略 | 触发的操作 |
---|---|
producer_request_hold | Broker 会持有生产者投递的消息,但并不会把投递的消息进行持久化存储 |
producer_exception | Broker 会与客户端断开连接并抛出异常 |
consumer_backlog_eviction | Broker 将开始丢弃backlog的消息 |
注意保留策略类型之间的区别
你可能已经注意到,Pulsar 中关于“保留策略”有两种定义,一种适用于已确认消息的持久存储,一种适用于 backlog。
Backlog quotas are handled at the namespace level. They can be managed via:
设置大小阈值和 backlog 保留策略
通过指定命名空间的大小限制和策略,为命名空间中的所有主题设置大小阈值和 backlog 保留策略。
pulsar-admin
使用set-backlog-quota
子命令,并使用 -l
/--limit
参数指定命名空间大小限制 ,以及使用 <code>-p
/--policy
参数指定保留策略。
示例
$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
--limit 2G \
--policy producer_request_hold
REST API
POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota
Java
long sizeLimit = 2147483648L;
BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
admin.namespaces().setBacklogQuota(namespace, quota);
获取 backlog 大小阈值和 backlog 保留策略
可以查看已对命名空间应用的大小阈值和 backlog 保留策略。
pulsar-admin
Use the get-backlog-quotas
subcommand and specify a namespace. 下面是一个示例:
$ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
{
"destination_storage": {
"limit" : 2147483648,
"policy" : "producer_request_hold"
}
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap
Java
Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
admin.namespaces().getBacklogQuotas(namespace);
移除 backlog quotas
pulsar-admin
Use the remove-backlog-quota
subcommand and specify a namespace. 下面是一个示例:
$ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota
Java
admin.namespaces().removeBacklogQuota(namespace);
清除 backlog
pulsar-admin
使用 clear-backlog
子命令。
示例
$ pulsar-admin namespaces clear-backlog my-tenant/my-ns
By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f
/--force
flag.
生存时间 (TTL)
默认情况下,Pulsar 会永久存储所有未确认的消息。 在大量消息未得到确认的情况下,可能会导致大量磁盘空间的使用。 如果需要考虑磁盘空间,可以设置生存时间(TTL),以确定未确认的消息将保留多长时间。
为名称空间设置生存时间(TTL)
pulsar-admin
使用 set-message-ttl
子命令并指定命名空间和TTL(以秒为单位,使用-ttl
/--messageTTL
参数指定)。
示例
$ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
--messageTTL 120 # TTL of 2 minutes
REST API
POST /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);
获取命名空间的生存时间(TTL) 配置
pulsar-admin
使用 get-message-ttl
子命令并指定命名空间。
示例
$ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
60
REST API
GET /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().getNamespaceMessageTTL(namespace)
Remove the TTL configuration for a namespace
pulsar-admin
Use the remove-message-ttl
subcommand and specify a namespace.
示例
$ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().removeNamespaceMessageTTL(namespace)
Delete messages from namespaces
If you do not have any retention period and that you never have much of a backlog, the upper limit for retaining messages, which are acknowledged, equals to the Pulsar segment rollover period + entry log rollover period + (garbage collection interval * garbage collection ratios).
Segment rollover period: basically, the segment rollover period is how often a new segment is created. Once a new segment is created, the old segment will be deleted. By default, this happens either when you have written 50,000 entries (messages) or have waited 240 minutes. You can tune this in your broker.
Entry log rollover period: multiple ledgers in BookKeeper are interleaved into an entry log. In order for a ledger that has been deleted, the entry log must all be rolled over. The entry log rollover period is configurable, but is purely based on the entry log size. For details, see here. Once the entry log is rolled over, the entry log can be garbage collected.
Garbage collection interval: because entry logs have interleaved ledgers, to free up space, the entry logs need to be rewritten. The garbage collection interval is how often BookKeeper performs garbage collection. which is related to minor compaction and major compaction of entry logs. For details, see here.