Get started

Pulsar transaction is primarily a server-side and protocol-level feature. This tutorial guides you through every step of how to use the Pulsar transaction API to send and receive messages in a Java client.

Note

Currently, Pulsar transaction API is available in Pulsar 2.8.0 or later versions. It is only available for Java, Go and .NET clients.

Prerequisites

Steps

To use Pulsar transaction API, complete the following steps.

  1. Enable transactions.

    You can set the following configurations in the broker.conf or standalone.conf file.

    1. # Mandatory configuration, used to enable the transaction coordinator.
    2. transactionCoordinatorEnabled=true
    3. # Mandatory configuration, used to create the system topic that stores transaction buffer snapshots.
    4. systemTopicEnabled=true

    Note

    By default, Pulsar transactions are disabled.

  2. Initialize transaction coordinator metadata.

    The transaction coordinator can leverage the advantages of partitioned topics (such as load balancing).

    Input

    1. bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone

    Output

    1. Transaction coordinator metadata setup success
  3. Create a Pulsar client and enable transactions. Because the client needs to discover the transaction coordinator from a system topic, make sure that your client role has produce and consume permissions on the system namespace pulsar/system.

  4. Create producers and consumers.

  5. Produce and receive messages.

  6. Create transactions.

  7. Produce and ack messages with transactions.

    Note

    Currently, messages can be acked individually rather than cumulatively.

  8. End transactions.

    Tip

    The code snippets below are examples for steps 3 through 8.

  • Java
  • Go
  1. PulsarClient client = PulsarClient.builder()
  2. // Step 3: create a Pulsar client with transactions enabled.
  3. .enableTransaction(true)
  4. .serviceUrl(jct.serviceUrl)
  5. .build();
  6. // Step 4: create three producers: one for the input topic and one for each of the two output topics.
  7. ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
  8. Producer<String> inputProducer = producerBuilder.topic(inputTopic)
  9. .sendTimeout(0, TimeUnit.SECONDS).create();
  10. Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
  11. .sendTimeout(0, TimeUnit.SECONDS).create();
  12. Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
  13. .sendTimeout(0, TimeUnit.SECONDS).create();
  14. // Step 4: create three consumers: one for the input topic and one for each of the two output topics.
  15. Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
  16. .subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
  17. Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
  18. .subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
  19. Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
  20. .subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();
  21. int count = 2;
  22. // Step 5: produce `count` messages to the input topic (sent without a transaction).
  23. for (int i = 0; i < count; i++) {
  24. inputProducer.send("Hello Pulsar! count : " + i);
  25. }
  26. // Step 5: consume each input message and, within one transaction per message, produce to both output topics.
  27. for (int i = 0; i < count; i++) {
  28. // Step 5: the input consumer receives one message.
  29. Message<String> message = inputConsumer.receive();
  30. // Step 6: create a transaction for this message.
  31. // The transaction timeout is specified as 10 seconds.
  32. // If the transaction is not committed within 10 seconds, the transaction is automatically aborted.
  33. Transaction txn = null;
  34. try {
  35. txn = client.newTransaction()
  36. .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
  37. // Step 6: you can process the received message with your use case and business logic.
  38. // Step 7: the producers send one message to each output topic as part of the transaction.
  39. outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne count : " + i).send();
  40. outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo count : " + i).send();
  41. // Step 7: the input consumer acknowledges the input message *individually* as part of the transaction.
  42. inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
  43. // Step 8: commit the transaction.
  44. txn.commit().get();
  45. } catch (ExecutionException e) {
  46. if (!(e.getCause() instanceof PulsarClientException.TransactionConflictException)) {
  47. // The failure was not caused by a TransactionConflictException,
  48. // so negatively acknowledge (or redeliver) this message;
  49. // otherwise this message will not be received again.
  50. inputConsumer.negativeAcknowledge(message);
  51. }
  52. // Abort the failed transaction, if one was created,
  53. // before the loop moves on to the next message.
  54. if (txn != null) {
  55. txn.abort();
  56. }
  57. }
  58. }
  59. // Final result: consume `count` messages from each output topic and print them.
  60. for (int i = 0; i < count; i++) {
  61. Message<String> message = outputConsumerOne.receive();
  62. System.out.println("Receive transaction message: " + message.getValue());
  63. }
  64. for (int i = 0; i < count; i++) {
  65. Message<String> message = outputConsumerTwo.receive();
  66. System.out.println("Receive transaction message: " + message.getValue());
  67. }
  1. // Step 3: create a Pulsar client and enable transactions.
  2. client, err := pulsar.NewClient(pulsar.ClientOptions{
  3. URL: "<serviceUrl>",
  4. EnableTransaction: true,
  5. })
  6. if err != nil {
  7. log.Fatalf("create client fail, err = %v", err)
  8. }
  9. defer client.Close()
  10. // Step 4: create three producers to produce messages to input and output topics.
  11. // Creation errors are checked rather than discarded, because the deferred Close
  12. // calls and the later sends must not run against a producer that was not created.
  13. inputTopic := "inputTopic"
  14. outputTopicOne := "outputTopicOne"
  15. outputTopicTwo := "outputTopicTwo"
  16. subscriptionName := "your-subscription-name"
  17. inputProducer, err := client.CreateProducer(pulsar.ProducerOptions{
  18. Topic: inputTopic,
  19. SendTimeout: 0,
  20. })
  21. if err != nil {
  22. log.Fatalf("create producer for %s fail, err = %v", inputTopic, err)
  23. }
  24. defer inputProducer.Close()
  25. outputProducerOne, err := client.CreateProducer(pulsar.ProducerOptions{
  26. Topic: outputTopicOne,
  27. SendTimeout: 0,
  28. })
  29. if err != nil {
  30. log.Fatalf("create producer for %s fail, err = %v", outputTopicOne, err)
  31. }
  32. defer outputProducerOne.Close()
  33. outputProducerTwo, err := client.CreateProducer(pulsar.ProducerOptions{
  34. Topic: outputTopicTwo,
  35. SendTimeout: 0,
  36. })
  37. if err != nil {
  38. log.Fatalf("create producer for %s fail, err = %v", outputTopicTwo, err)
  39. }
  40. defer outputProducerTwo.Close()
  41. // Step 4: create three consumers to consume messages from input and output topics.
  42. inputConsumer, err := client.Subscribe(pulsar.ConsumerOptions{
  43. Topic: inputTopic,
  44. SubscriptionName: subscriptionName,
  45. })
  46. if err != nil {
  47. log.Fatalf("subscribe to %s fail, err = %v", inputTopic, err)
  48. }
  49. defer inputConsumer.Close()
  50. outputConsumerOne, err := client.Subscribe(pulsar.ConsumerOptions{
  51. Topic: outputTopicOne,
  52. SubscriptionName: subscriptionName,
  53. })
  54. if err != nil {
  55. log.Fatalf("subscribe to %s fail, err = %v", outputTopicOne, err)
  56. }
  57. defer outputConsumerOne.Close()
  58. outputConsumerTwo, err := client.Subscribe(pulsar.ConsumerOptions{
  59. Topic: outputTopicTwo,
  60. SubscriptionName: subscriptionName,
  61. })
  62. if err != nil {
  63. log.Fatalf("subscribe to %s fail, err = %v", outputTopicTwo, err)
  64. }
  65. defer outputConsumerTwo.Close()
  66. // Step 5: produce messages to the input topic.
  67. // A failed send is fatal here: the loop below waits for exactly `count` input messages.
  68. ctx := context.Background()
  69. count := 2
  70. for i := 0; i < count; i++ {
  71. if _, err := inputProducer.Send(ctx, &pulsar.ProducerMessage{
  72. Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
  73. }); err != nil {
  74. log.Fatalf("send to %s fail, err = %v", inputTopic, err)
  75. }
  76. }
  77. // Step 5: consume messages and produce them to output topics with transactions.
  78. // committed counts the transactions that were committed; only those are expected
  79. // to deliver a message to each output topic.
  80. committed := 0
  81. for i := 0; i < count; i++ {
  82. // Step 5: the consumer receives one message.
  83. message, err := inputConsumer.Receive(ctx)
  84. if err != nil {
  85. log.Printf("receive message from %s fail, err = %v", inputTopic, err)
  86. continue
  87. }
  88. // Step 6: create transactions.
  89. // The transaction timeout is specified as 10 seconds.
  90. // If the transaction is not committed within 10 seconds, the transaction is automatically aborted.
  91. txn, err := client.NewTransaction(10 * time.Second)
  92. if err != nil {
  93. log.Printf("create txn fail, err = %v", err)
  94. continue
  95. }
  96. // Step 6: you can process the received message with your use case and business logic.
  97. // processMessage(message)
  98. // Step 7: the producers produce messages to output topics with transactions.
  99. // On any failure the transaction is aborted and this iteration stops, so that an
  100. // aborted transaction is never used again or committed.
  101. _, err = outputProducerOne.Send(ctx, &pulsar.ProducerMessage{
  102. Transaction: txn,
  103. Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)),
  104. })
  105. if err != nil {
  106. log.Printf("send to producerOne fail %v", err)
  107. txn.Abort(ctx)
  108. continue
  109. }
  110. _, err = outputProducerTwo.Send(ctx, &pulsar.ProducerMessage{
  111. Transaction: txn,
  112. Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)),
  113. })
  114. if err != nil {
  115. log.Printf("send to producerTwo fail %v", err)
  116. txn.Abort(ctx)
  117. continue
  118. }
  119. // Step 7: the consumer acknowledges the input message with the transaction *individually*.
  120. err = inputConsumer.AckWithTxn(message, txn)
  121. if err != nil {
  122. log.Printf("ack message fail %v", err)
  123. txn.Abort(ctx)
  124. continue
  125. }
  126. // Step 8: commit transactions.
  127. err = txn.Commit(ctx)
  128. if err != nil {
  129. log.Printf("commit txn fail %v", err)
  130. continue
  131. }
  132. committed++
  133. }
  134. // Final result: consume the committed messages from output topics and print them.
  135. // Only `committed` messages are awaited, so the loops do not wait for messages
  136. // of transactions that were not committed.
  137. for i := 0; i < committed; i++ {
  138. message, err := outputConsumerOne.Receive(ctx)
  139. if err != nil {
  140. log.Printf("receive message from %s fail, err = %v", outputTopicOne, err)
  141. continue
  142. }
  143. log.Printf("Receive transaction message: %s", string(message.Payload()))
  144. }
  145. for i := 0; i < committed; i++ {
  146. message, err := outputConsumerTwo.Receive(ctx)
  147. if err != nil {
  148. log.Printf("receive message from %s fail, err = %v", outputTopicTwo, err)
  149. continue
  150. }
  151. log.Printf("Receive transaction message: %s", string(message.Payload()))
  152. }

Output

  1. Receive transaction message: Hello Pulsar! outputTopicOne count : 0
  2. Receive transaction message: Hello Pulsar! outputTopicOne count : 1
  3. Receive transaction message: Hello Pulsar! outputTopicTwo count : 0
  4. Receive transaction message: Hello Pulsar! outputTopicTwo count : 1