Message deduplication
When Message deduplication is enabled, it ensures that each message produced on Pulsar topics is persisted to disk only once, even if the message is produced more than once. 消息去重是在服务端自动执行的。
Pulsar中启用消息去重,你必须同时配置 Pulsar broker 和客户端。
运作模式
You can enable or disable message deduplication at the namespace level or the topic level. By default, it is disabled on all namespaces or topics. You can enable it in the following ways:
- 在 broker 级别启用所有命名空间/主题的去重。
- 通过
pulsar-admin namespaces
命令设置主题命名空间启用去重。 - 通过
pulsar-admin topics
命令设置命名空间启用去重。
配置消息去重
你可以通过broker.conf
配置 Pulsar 启用消息去重。 下面是和去重有关的配置项。
参数名 | Description | 默认值 |
---|---|---|
brokerDeduplicationEnabled | 设置 Pulsar broker 级别消息去重的默认值。 如果设置为true , 所有的命名空间/主题都支持消息去重。 如果设置为false ,你必须在命名空间/主题级别启用/禁用去重。 | false |
brokerDeduplicationMaxNumberOfProducers | 为了去重目的而存储信息的最大生产者数量。 | 10000 |
brokerDeduplicationEntriesInterval | 去重快照生成后,允许存储的去重信息数量。 更大的时间间隔会减少快照的数量,尽管会延长主题的恢复时长(重播快照内容所需要的时间)。 | 1000 |
brokerDeduplicationProducerInactivityTimeoutMinutes | 停止活动的时间(分钟级别),当生产者断开连接超过这个时间,就丢弃和该生产者有关的去重信息。 | 360 (6小时) |
设置 broker 级别的默认值
By default, message deduplication is disabled on all Pulsar namespaces/topics. 启用所有命名空间/主题的去重功能,需设置配置项brokerDeduplicationEnabled
为true
并重启 broker。
即使设置了brokerDeduplicationEnabled
配置项,通过 Pulsar admin CLI 设置的值也会覆盖 broker 的配置。
启用消息去重
尽管默认情况下 broker 级别的消息去重是禁止的,你能够使用pulsar-admin namespaces set-deduplication
或者pulsar-admin topics set-deduplication
命令去启用指定命名空间或主题的去重功能。 你能通过使用--enable
/-e
标记并指定命名空间/主题启用去重。
下面示例显示如何在命名空间级别启用去重。
$ bin/pulsar-admin namespaces set-deduplication \
public/default \
--enable # or just -e
禁用消息去重
如果broker 级别启用了消息去重,你能够使用pulsar-admin namespace set-deduplication
或者pulsar-admin topics set-deduplication
命令禁用去重功能。 你能通过使用--disable
/-d
标记并指定命名空间/主题禁用去重。
下面示例显示如何在命名空间级别禁用去重。
$ bin/pulsar-admin namespaces set-deduplication \
public/default \
--disable # or just -d
Pulsar 客户端
If you enable message deduplication in Pulsar brokers, you need complete the following tasks for your client producers:
- 指定生产者的名称。
- 设置消息超时为
0
(即无超时)。
The instructions for Java, Python, and C++ clients are different.
Java clients
Python clients
C++ clients
To enable message deduplication on a Java producer, set the producer name using the producerName
setter, and set the timeout to 0
using the sendTimeout
setter.
import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import java.util.concurrent.TimeUnit;PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();Producer producer = pulsarClient.newProducer() .producerName("producer-1") .topic("persistent://public/default/topic-1") .sendTimeout(0, TimeUnit.SECONDS) .create();
To enable message deduplication on a Python producer, set the producer name using producer_name
, and set the timeout to 0
using send_timeout_millis
.
import pulsarclient = pulsar.Client("pulsar://localhost:6650")producer = client.create_producer( "persistent://public/default/topic-1", producer_name="producer-1", send_timeout_millis=0)
To enable message deduplication on a C++ producer, set the producer name using producer_name
, and set the timeout to 0
using send_timeout_millis
.
#include <pulsar/Client.h>std::string serviceUrl = "pulsar://localhost:6650";std::string topic = "persistent://some-tenant/ns1/topic-1";std::string producerName = "producer-1";Client client(serviceUrl);ProducerConfiguration producerConfig;producerConfig.setSendTimeout(0);producerConfig.setProducerName(producerName);Producer producer;Result result = client.createProducer(topic, producerConfig, producer);