How to use transactions?

Transaction API

The transaction feature is primarily a server-side and protocol-level feature. You can use the transaction feature via the transaction API, which is available in Pulsar 2.8.0 or later.

Transactions are disabled by default. Beyond enabling them (see the quick start below), the transaction API needs no additional settings in the Pulsar client.

Currently, the transaction API is available only for Java clients. Support for clients in other languages will be added in future releases.

Quick start

This section provides an example of how to use the transaction API to send and receive messages in a Java client.

  1. Start Pulsar 2.8.0 or later.

  2. Enable transactions.

    Change the configuration in the `broker.conf` file.

        transactionCoordinatorEnabled=true

    If you want to enable batch messages in transactions, also set `acknowledgmentAtBatchIndexLevelEnabled` to `true` in the `broker.conf` or `standalone.conf` file. The matching consumer setting is shown in note [1] at the end of this page.

    ```
    acknowledgmentAtBatchIndexLevelEnabled=true
    ```

  3. Initialize transaction coordinator metadata.

    The transaction coordinator can leverage the advantages of partitioned topics (such as load balancing).

    **Input**

        bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone

    **Output**

    ```
    Transaction coordinator metadata setup success
    ```

  4. Initialize a Pulsar client.

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true) // transactions are off by default on the client
                .build();
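
As an optional sanity check on step 2, the sketch below parses `broker.conf`-style text with `java.util.Properties` and reports whether the two settings mentioned above are switched on. The class name `TransactionConfigCheck` and the method `isEnabled` are hypothetical and not part of Pulsar. The sketch assumes the file consists of plain `key=value` lines.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class TransactionConfigCheck {

    /** Returns true if {@code key} is set to "true" in the given properties-style text. */
    public static boolean isEnabled(String confText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        // A missing key is treated as disabled.
        return Boolean.parseBoolean(props.getProperty(key, "false").trim());
    }

    public static void main(String[] args) {
        // Sample content; in practice, pass in the text of your broker.conf file.
        String conf = String.join("\n",
                "# transaction settings",
                "transactionCoordinatorEnabled=true",
                "acknowledgmentAtBatchIndexLevelEnabled=true");
        System.out.println("transactions: "
                + isEnabled(conf, "transactionCoordinatorEnabled"));
        System.out.println("batch index ack: "
                + isEnabled(conf, "acknowledgmentAtBatchIndexLevelEnabled"));
    }
}
```

This only inspects the configuration text. It does not confirm that a running broker has been restarted with these settings.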

Now you can start using the transaction API to send and receive messages. Below is an example of a consume-process-produce application written in Java.

[Figure 1: consume-process-produce example]

Let’s walk through this example step by step.

| Step | Description |
| --- | --- |
| 1. Start a transaction. | The application opens a new transaction by calling `PulsarClient.newTransaction`. It specifies a transaction timeout of 1 minute. If the transaction is not committed within 1 minute, it is automatically aborted. |
| 2. Receive messages from topics. | The application creates two normal consumers to receive messages from the topics `input-topic-1` and `input-topic-2`, respectively. |
| 3. Publish messages to topics with the transaction. | The application creates two producers to publish the resulting messages to the output topics `output-topic-1` and `output-topic-2`, respectively. The application applies the processing logic and generates two output messages. It sends those two output messages as part of the transaction opened in the first step via `Producer.newMessage(Transaction)`. |
| 4. Acknowledge the messages with the transaction. | In the same transaction, the application acknowledges the two input messages. |
| 5. Commit the transaction. | The application commits the transaction by calling `Transaction.commit()` on the open transaction. The commit operation ensures that the two input messages are marked as acknowledged and that the two output messages are written successfully to the output topics. |
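
The five steps above can be sketched in Java roughly as follows. This is a minimal sketch and not the original example. The class name, the subscription name `txn-example-sub`, the `String` schema, and the `process` helper are assumptions made for illustration. The topic names and the 1-minute timeout come from the walkthrough.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;

public class ConsumeProcessProduce {

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true)
                .build();

        // Consumers for the two input topics (subscription name is an assumption).
        Consumer<String> inputOne = client.newConsumer(Schema.STRING)
                .topic("input-topic-1")
                .subscriptionName("txn-example-sub")
                .subscribe();
        Consumer<String> inputTwo = client.newConsumer(Schema.STRING)
                .topic("input-topic-2")
                .subscriptionName("txn-example-sub")
                .subscribe();

        // Producers for the two output topics. The send timeout is disabled (0)
        // because transactional sends require it.
        Producer<String> outputOne = client.newProducer(Schema.STRING)
                .topic("output-topic-1")
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
        Producer<String> outputTwo = client.newProducer(Schema.STRING)
                .topic("output-topic-2")
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        // Step 1: start a transaction with a 1-minute timeout.
        Transaction txn = client.newTransaction()
                .withTransactionTimeout(1, TimeUnit.MINUTES)
                .build()
                .get();
        try {
            // Step 2: receive one message from each input topic.
            // receive() blocks until a message arrives.
            Message<String> first = inputOne.receive();
            Message<String> second = inputTwo.receive();

            // Step 3: publish the processed results as part of the transaction.
            outputOne.newMessage(txn).value(process(first.getValue())).send();
            outputTwo.newMessage(txn).value(process(second.getValue())).send();

            // Step 4: acknowledge the input messages in the same transaction.
            inputOne.acknowledgeAsync(first.getMessageId(), txn).get();
            inputTwo.acknowledgeAsync(second.getMessageId(), txn).get();

            // Step 5: commit; acknowledgments and output messages take effect together.
            txn.commit().get();
        } catch (Exception e) {
            // On any failure, abort so that neither acks nor outputs become visible.
            txn.abort().get();
            throw e;
        } finally {
            client.close();
        }
    }

    // Hypothetical processing logic; replace with the real transformation.
    static String process(String input) {
        return input.toUpperCase();
    }
}
```

Running the sketch requires the Pulsar Java client on the classpath and a broker configured as in the quick start. Because the transaction is opened before `receive()` is called, the 1-minute timeout also covers the time spent waiting for input messages.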

[1] Example of enabling acknowledgment of batch messages in transactions via the consumer builder:

    Consumer<byte[]> sinkConsumer = pulsarClient
            .newConsumer()
            .topic(transferTopic)
            .subscriptionName("sink-topic")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Shared)
            .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgement
            .subscribe();