How to use transactions?
事务API
事务功能主要是服务器端和协议级功能。 您可以通过 事务API 使用事务功能,该 API 在 ** Pulsar 2.8.0 或更高版本**中可用。
要使用事务 API,您不需要在 Pulsar 客户端中进行任何额外设置。 默认情况下,事务是被禁用的。
目前,事务API仅适用于 Java客户端。 未来版本中将添加对其他语言客户端的支持。
快速开始
本节提供了如何在Java客户端中使用事务API发送和接收消息的示例
启动Pulsar 2.8.0或更高版本。
开启事务。
更改
broker.conf
文件中的配置。transactionCoordinatorEnabled=true
如果要在事务中启用批处理消息,请按照以下步骤操作。
在
broker.conf
或standalone.conf
文件中将acknowledgmentAtBatchIndexLevelEnabled
设置为true
。acknowledgmentAtBatchIndexLevelEnabled=true
初始化事务协调器元数据。
事务协调器可以利用分区主题的优势(例如负载平衡)。
输入
bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone
输出
Transaction coordinator metadata setup success
初始化一个Pulsar客户端。
PulsarClient client = PulsarClient.builder()
.serviceUrl(“pulsar://localhost:6650”)
.enableTransaction(true)
.build();
现在您可以开始使用事务API来发送和接收消息。 下面是一个用 Java 编写的 consume-process-produce
应用程序示例。
让我们逐步来看看这个例子。
步骤 | 说明 |
1. 启动一个事务 | 应用程序通过调用PulsarClient.newTransaction打开一个新事务。 它将事务超时指定为1分钟。 如果事务没有在1分钟内提交,事务将自动中止。 |
2. 接收来自主题的消息 | 应用程序创建两个普通消费者,分别接收来自主题 input-topic-1 和 input-topic-2 的消息。 如果要在事务中启用批量消息确认,请在消费者构建器(Consumer Builder)中调用enableBatchIndexAcknowledgment(true) 方法。 有关示例,请参阅此表下方的 [1]。 |
3. 将消息发布到带有事务的主题。 | 应用程序创建两个生产者,分别向输出主题output-topic-1和output-topic-2生成结果消息。 应用程序应用处理逻辑并生成两个输出消息。 应用程序通过 Producer.newMessage(Transaction) 将这两个输出消息作为第一步中打开的事务的一部分发送。 |
4. 确认带有事务的消息。 | 在同一事务中,应用程序确认两个输入消息。 |
5. 提交该事务。 | 应用程序通过对打开的事务调用Transaction.commit()来提交事务。 提交操作确保两个输入消息被标记为已确认,并且两个输出消息成功写入输出主题。 提示:您也可以调用 Transaction.abort() 来中止打开的事务。 |
[1] 在消费者构建器中的事务中启用批处理消息确认的示例。
Consumer<byte[]> sinkConsumer = pulsarClient
.newConsumer()
.topic(transferTopic)
.subscriptionName("sink-topic")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true) // enable batch index acknowledgement
.subscribe();