Transactional Message Sending

Introduction

In scenarios that require strong data consistency, Apache RocketMQ transactional messages can be used to keep upstream and downstream data consistent.

[Figure: Transactional message 1]

Transactional messages are sent in two phases. First, a half message is delivered. A half message is a message that has been successfully sent to the MQ server but for which the server has not yet received the second acknowledgement from the Producer. Until that acknowledgement arrives, the message is marked as “temporarily undeliverable”.

If the half message is sent successfully, the Producer executes the local transaction and then reports the status of the half message (commit or rollback) to the Broker, depending on the result of the local transaction.

If the second acknowledgement of a transactional message is lost because of a brief network interruption, a Producer restart, or a similar failure, the Broker finds that the message has stayed in the “half message” state for a long time and takes the initiative to ask the Producer for the transaction status of the message (Commit or Rollback). As a result, downstream consumers receive the message if the local transaction succeeded and never receive it otherwise. This ultimately ensures the consistency of upstream and downstream data.

The detailed execution flow of a transactional message is shown in the following diagram:

[Figure: Transactional message 2]

Transactional Message Sending Procedure

  1. The Producer sends the half message to the RocketMQ Broker.

  2. After the RocketMQ Broker persists the message successfully, it returns an Ack to the Producer confirming that the message was sent successfully and it is a half message.

  3. The Producer starts executing the local transaction.

  4. The Producer submits a second acknowledgement (Commit or Rollback) to the server based on the result of the local transaction. The server receives the acknowledgement and processes it as follows.

    • If the second acknowledgement result is Commit: the server marks the half message as deliverable and delivers it to the Consumer.
    • If the second acknowledgement result is Rollback: the server rolls back the transaction and does not deliver the half message to the Consumer.
  5. In special cases such as a network disconnection or a Producer restart, if the server does not receive the second acknowledgement from the Producer, or the second acknowledgement it receives is Unknown, the server initiates a transaction status check to a Producer in the Producer group after a fixed time.

The procedure for the transaction status check is as follows.

  1. After receiving the transaction status check request, the Producer needs to verify the final result of the local transaction of the corresponding message.
  2. The Producer submits the second acknowledgement again based on the final result of the local transaction, and the server processes the half message as described in step 4.
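The two procedures above can be sketched as a small in-memory state model. This is a toy illustration only, not RocketMQ's Broker implementation: the class `HalfMessageFlowSketch`, the `Ack` enum, and all method names are hypothetical, and the "fixed time" before a check is replaced by an explicit call to `checkPending`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy in-memory model of the half-message flow; not RocketMQ's actual Broker code. */
final class HalfMessageFlowSketch {

    /** Second-acknowledgement results named in the procedure above. */
    enum Ack { COMMIT, ROLLBACK, UNKNOWN }

    /** Half messages that are stored but not yet visible to consumers (steps 1-2). */
    private final Map<String, String> halfMessages = new LinkedHashMap<>();

    /** Messages that have been made deliverable to consumers. */
    private final List<String> delivered = new ArrayList<>();

    /** Steps 1-2: persist the half message and return the Ack to the Producer. */
    boolean storeHalf(String txId, String body) {
        halfMessages.put(txId, body);
        return true;
    }

    /** Step 4: apply the second acknowledgement to a stored half message. */
    void onSecondAck(String txId, Ack ack) {
        if (!halfMessages.containsKey(txId)) {
            return; // already resolved, nothing to do
        }
        switch (ack) {
            case COMMIT:
                delivered.add(halfMessages.remove(txId)); // becomes deliverable
                break;
            case ROLLBACK:
                halfMessages.remove(txId); // discarded, never delivered
                break;
            case UNKNOWN:
                break; // stays a half message until a later check resolves it
        }
    }

    /** Step 5: ask the Producer about every message still in the half state. */
    void checkPending(Function<String, Ack> producerCheck) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            onSecondAck(txId, producerCheck.apply(txId));
        }
    }

    List<String> delivered() {
        return delivered;
    }

    int pendingCount() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        HalfMessageFlowSketch broker = new HalfMessageFlowSketch();
        broker.storeHalf("tx-1", "order created");
        broker.storeHalf("tx-2", "order cancelled");
        broker.onSecondAck("tx-1", Ack.COMMIT);    // local transaction succeeded
        broker.onSecondAck("tx-2", Ack.UNKNOWN);   // outcome not known yet
        broker.checkPending(txId -> Ack.ROLLBACK); // the check reports a failed transaction
        System.out.println(broker.delivered() + " pending=" + broker.pendingCount());
    }
}
```

In this model a message reaches `delivered` only through a Commit, whether that Commit arrives as the original second acknowledgement or as the answer to a later check, which is the property the procedure relies on.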

Example

    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            // Thread pool used to answer transaction status checks sent by the Broker.
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Sends the half message, then invokes executeLocalTransaction on the listener.
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }

            // Keep the process alive so that status checks from the Broker can be answered.
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }

        static class TransactionListenerImpl implements TransactionListener {
            private AtomicInteger transactionIndex = new AtomicInteger(0);
            // Simulated local transaction results, keyed by transaction id.
            private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                int value = transactionIndex.getAndIncrement();
                int status = value % 3;
                localTrans.put(msg.getTransactionId(), status);
                // Always return UNKNOW so that the Broker performs a status check.
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                Integer status = localTrans.get(msg.getTransactionId());
                if (null != status) {
                    switch (status) {
                        case 0:
                            return LocalTransactionState.UNKNOW;
                        case 1:
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case 2:
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        default:
                            return LocalTransactionState.COMMIT_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
    }

Transactional messages are sent with TransactionMQProducer rather than DefaultMQProducer. The sample above sets a thread pool for handling transaction status checks; if none is set, a default one is created. The most important step is to implement the TransactionListener interface and pass the implementation to the TransactionMQProducer.

Note

The TransactionListener interface is defined as follows:

    public interface TransactionListener {
        /**
         * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
         *
         * @param msg Half(prepare) message
         * @param arg Custom business parameter
         * @return Transaction state
         */
        LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

        /**
         * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
         * method will be invoked to get local transaction status.
         *
         * @param msg Check message
         * @return Transaction state
         */
        LocalTransactionState checkLocalTransaction(final MessageExt msg);
    }

executeLocalTransaction executes the local transaction after the half message has been sent successfully. When the local transaction finishes, the method returns one of the following three states:

  • LocalTransactionState.COMMIT_MESSAGE: the transaction is committed, allowing the consumer to consume the message.
  • LocalTransactionState.ROLLBACK_MESSAGE: the transaction is rolled back, and the message will be discarded without being allowed to be consumed.
  • LocalTransactionState.UNKNOW: the state cannot be determined yet. After waiting for a fixed time, the Broker sends a transaction status check message back to the Producer.
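One way to decide between the three states is to map the outcome of the local transaction directly onto them. The sketch below is a minimal illustration under stated assumptions: `State` is a local stand-in that mirrors the three `LocalTransactionState` values so the example runs without the RocketMQ client, and `BusinessRejectedException` and `execute` are hypothetical names, not part of any RocketMQ API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

final class LocalTransactionOutcomeSketch {

    /** Local stand-in mirroring the three LocalTransactionState values listed above. */
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Hypothetical exception meaning "the business rule definitively rejected the operation". */
    static final class BusinessRejectedException extends Exception {
        BusinessRejectedException(String message) {
            super(message);
        }
    }

    /** Runs the local transaction and maps its outcome to one of the three states. */
    static State execute(Callable<Void> localTransaction) {
        try {
            localTransaction.call();
            return State.COMMIT_MESSAGE;   // local transaction succeeded
        } catch (BusinessRejectedException e) {
            return State.ROLLBACK_MESSAGE; // definite failure: discard the half message
        } catch (Exception e) {
            // Outcome uncertain (for example a timeout): defer to a later status check.
            return State.UNKNOW;
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> null));
        System.out.println(execute(() -> {
            throw new BusinessRejectedException("insufficient balance");
        }));
        System.out.println(execute(() -> {
            throw new TimeoutException("database did not answer in time");
        }));
    }
}
```

The design choice here is to return ROLLBACK_MESSAGE only when the failure is certain. Any ambiguous error is reported as UNKNOW, so the final decision is left to the status check rather than guessed.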

checkLocalTransaction is the method the Producer uses to answer a transaction status check, which the Broker initiates because it has not received the second acknowledgement. Transaction status check rule: if executeLocalTransaction returns LocalTransactionState.UNKNOW to the Broker, or the Producer exits so that no status is returned at all, the Broker sends a transaction status check message to the Producer. If the transaction status still cannot be obtained, the Broker checks again at regular intervals.
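A status check is easiest to answer when the local transaction records its own outcome under the transaction id. The following sketch shows that idea with stand-in types so that it runs without the RocketMQ client: `State` mirrors the `LocalTransactionState` values, while `LocalStatus`, `record`, and `check` are hypothetical names. The in-memory map stands in for a durable table, and treating a missing record as UNKNOW is an assumption of this sketch (the sample above commits in that case).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TransactionCheckSketch {

    /** Local stand-in mirroring the LocalTransactionState values. */
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Outcome of the local transaction as recorded by the business code (hypothetical). */
    enum LocalStatus { IN_PROGRESS, SUCCEEDED, FAILED }

    /** Stand-in for a durable table keyed by transaction id. */
    private final Map<String, LocalStatus> records = new ConcurrentHashMap<>();

    /** Called by the business code as the local transaction progresses. */
    void record(String txId, LocalStatus status) {
        records.put(txId, status);
    }

    /** Answers one status check for the given transaction id. */
    State check(String txId) {
        LocalStatus status = records.get(txId);
        if (status == null || status == LocalStatus.IN_PROGRESS) {
            // Assumption: without a final result, stay undecided and wait for the next check.
            return State.UNKNOW;
        }
        return status == LocalStatus.SUCCEEDED
            ? State.COMMIT_MESSAGE
            : State.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        TransactionCheckSketch producer = new TransactionCheckSketch();
        producer.record("tx-1", LocalStatus.IN_PROGRESS);
        System.out.println(producer.check("tx-1")); // first check: still undecided
        producer.record("tx-1", LocalStatus.SUCCEEDED);
        System.out.println(producer.check("tx-1")); // later check: final result available
    }
}
```

Because the answer comes from recorded state rather than from the thread that ran the transaction, the same lookup keeps working across repeated checks and after a Producer restart, provided the record is stored durably.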

Caution

Note that the ProducerGroupName of a transactional message cannot be set arbitrarily, because transactional messages rely on the transaction status check mechanism. If the original Producer is found to have crashed, the Broker contacts other Producer instances in the same Producer group to check the result of the local transaction and to commit or roll back the half message.