How to use transactions?

事务API

事务功能主要是服务器端和协议级功能。 您可以通过 事务API 使用事务功能,该 API 在 ** Pulsar 2.8.0 或更高版本**中可用。

要使用事务 API,您不需要在 Pulsar 客户端中进行任何额外设置。 默认情况下,事务是被禁用的

目前,事务API仅适用于 Java客户端。 未来版本中将添加对其他语言客户端的支持。

快速开始

本节提供了如何在Java客户端中使用事务API发送和接收消息的示例

  1. 启动Pulsar 2.8.0或更高版本。

  2. 开启事务。

    更改 broker.conf 文件中的配置。

    1. transactionCoordinatorEnabled=true

    如果要在事务中启用批处理消息,请按照以下步骤操作。

    broker.confstandalone.conf 文件中将 acknowledgmentAtBatchIndexLevelEnabled 设置为 true

    1. acknowledgmentAtBatchIndexLevelEnabled=true
  3. 初始化事务协调器元数据。

    事务协调器可以利用分区主题的优势(例如负载平衡)。

    输入

    1. bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone

    输出

    1. Transaction coordinator metadata setup success
  4. 初始化一个Pulsar客户端。

    1. PulsarClient client = PulsarClient.builder()
    2. .serviceUrl(“pulsar://localhost:6650”)
    3. .enableTransaction(true)
    4. .build();

现在您可以开始使用事务API来发送和接收消息。 下面是一个用 Java 编写的 consume-process-produce 应用程序示例。

How to use transactions? - 图1

让我们逐步来看看这个例子。

步骤说明
1. 启动一个事务应用程序通过调用PulsarClient.newTransaction打开一个新事务。 它将事务超时指定为1分钟。 如果事务没有在1分钟内提交,事务将自动中止。
2. 接收来自主题的消息应用程序创建两个普通消费者,分别接收来自主题 input-topic-1 和 input-topic-2 的消息。

如果要在事务中启用批量消息确认,请在消费者构建器(Consumer Builder)中调用enableBatchIndexAcknowledgment(true) 方法。 有关示例,请参阅此表下方的 [1]。
3. 将消息发布到带有事务的主题。应用程序创建两个生产者,分别向输出主题output-topic-1和output-topic-2生成结果消息。 应用程序应用处理逻辑并生成两个输出消息。 应用程序通过 Producer.newMessage(Transaction) 将这两个输出消息作为第一步中打开的事务的一部分发送。
4. 确认带有事务的消息。在同一事务中,应用程序确认两个输入消息。
5. 提交该事务。应用程序通过对打开的事务调用Transaction.commit()来提交事务。 提交操作确保两个输入消息被标记为已确认,并且两个输出消息成功写入输出主题。

提示:您也可以调用 Transaction.abort() 来中止打开的事务。

[1] 在消费者构建器中的事务中启用批处理消息确认的示例。

  1. Consumer<byte[]> sinkConsumer = pulsarClient
  2. .newConsumer()
  3. .topic(transferTopic)
  4. .subscriptionName("sink-topic")
  5. .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  6. .subscriptionType(SubscriptionType.Shared)
  7. .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgement
  8. .subscribe();