Simple Message Sending

1. Creating a Topic in the Cluster

By default, a RocketMQ cluster has the autoCreateTopicEnable configuration turned on, so a Topic is created automatically the first time a message is sent to it. If autoCreateTopicEnable is disabled, you can create the target Topic with the RocketMQ Admin tool:

    $ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -n 127.0.0.1:9876
    create topic to 127.0.0.1:10911 success.
    TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes=null]

After the command above runs, the Topic named TopicTest exists on the Broker with 8 queues.

2. Adding the Client-Side Dependency

First, add the RocketMQ client dependency to your Java application.

  • Maven

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>

  • Gradle

    compile 'org.apache.rocketmq:rocketmq-client:4.9.4'

3. Message Sending

Apache RocketMQ can send messages in three ways: synchronous, asynchronous, and one-way. The first two are reliable, because the server returns a response whether or not the message was sent successfully.
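
To make the difference between the three modes concrete, here is a toy sketch that uses only JDK classes. It is not RocketMQ code: fakeBroker and the three method names are hypothetical stand-ins chosen for illustration, and the "broker" simply echoes an acknowledgement.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Toy model of the three send modes; no RocketMQ classes are used. */
final class SendModesSketch {

    // Hypothetical stand-in for a broker round trip: echoes an acknowledgement.
    static CompletableFuture<String> fakeBroker(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body);
    }

    // Synchronous: block until the acknowledgement arrives (or the wait times out).
    static String sendSync(String body) throws Exception {
        return fakeBroker(body).get(3, TimeUnit.SECONDS);
    }

    // Asynchronous: return immediately; the callback later receives the
    // acknowledgement, or the error if the round trip failed.
    static CompletableFuture<String> sendAsync(String body,
                                               BiConsumer<String, Throwable> callback) {
        return fakeBroker(body).whenComplete(callback);
    }

    // One-way: hand the message off and never look at the outcome.
    static void sendOneway(String body) {
        fakeBroker(body);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("m1"));
        sendAsync("m2", (ack, err) ->
                System.out.println(err == null ? ack : "failed: " + err)).join();
        sendOneway("m3");
    }
}
```

In this model only the synchronous and asynchronous paths ever observe the acknowledgement, which is why those two modes can react to a failure and the one-way mode cannot.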

3.1 Synchronous Sending

Synchronous sending is a communication method in which the sender sends a message and sends the next one only after receiving a synchronous response from the server. Reliable synchronous transmission is widely used, for example for important notification messages and SMS notifications.

Figure: Synchronous sending

Synchronous sending takes the following steps. The markers (1) to (4) in the sample code correspond to them:

  1. Create a Producer. Create a DefaultMQProducer in advance and give it a Producer group name. A Producer group is a collection of Producers that send the same type of messages with identical logic.
  2. Set the NameServer address. Apache RocketMQ supports several ways of setting the NameServer address (described in the client configuration). This example calls the producer’s setNamesrvAddr() method in code. If there is more than one NameServer, separate the addresses with semicolons, such as “127.0.0.2:9876;127.0.0.3:9876”.
  3. Build the message. Set the topic, tag, body, and so on. A tag is a label that categorizes the message, and RocketMQ can filter messages by tag on the Consumer side.
  4. Call the send() method to send the message. send() returns a SendResult that carries the actual send status: SEND_OK (send success), FLUSH_DISK_TIMEOUT (disk flush timeout), FLUSH_SLAVE_TIMEOUT (sync to slave timeout), or SLAVE_NOT_AVAILABLE (slave not available). If sending fails, an exception is thrown.

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            // Initialize a producer and set the Producer group name
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
            // Set the address of the NameServer
            producer.setNamesrvAddr("localhost:9876"); //(2)
            // Start the producer
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Create a message and set the topic, tag, and body. The tag is a label
                // that categorizes the message; RocketMQ can filter by tag on the consumer side.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                ); //(3)
                // Send the message and wait synchronously for the result
                SendResult sendResult = producer.send(msg); //(4)
                System.out.printf("%s%n", sendResult);
            }
            // Close the producer once it is no longer in use
            producer.shutdown();
        }
    }
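
Because send() can return a status other than SEND_OK without throwing, a caller may want to inspect the status rather than only print the result. The following is a minimal sketch of such a check. To stay runnable without rocketmq-client on the classpath, it uses a local Status enum that mirrors the four status names listed in step 4; the enum, the describe() and needsAttention() helpers, and the policy they encode are assumptions made for illustration.

```java
/** Sketch of reacting to the send status; Status is a local stand-in, not the client library's type. */
final class SendStatusSketch {

    // Local mirror of the four status names from step 4 (assumption: kept here
    // so the sketch compiles without the RocketMQ client library).
    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // Maps each status to the short description given in the text above.
    static String describe(Status status) {
        switch (status) {
            case SEND_OK:
                return "send success";
            case FLUSH_DISK_TIMEOUT:
                return "disk flush timeout";
            case FLUSH_SLAVE_TIMEOUT:
                return "sync to slave timeout";
            case SLAVE_NOT_AVAILABLE:
                return "slave not available";
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    // Hypothetical policy: anything other than SEND_OK is worth logging or alerting on.
    static boolean needsAttention(Status status) {
        return status != Status.SEND_OK;
    }

    public static void main(String[] args) {
        for (Status status : Status.values()) {
            System.out.printf("%-20s %-22s attention=%b%n",
                    status, describe(status), needsAttention(status));
        }
    }
}
```

In a real producer the status would presumably come from the SendResult returned by send() (for example through its getSendStatus() accessor) rather than from a local enum. What to do for each non-OK status depends on the application's durability requirements.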

3.2 Asynchronous Sending

Figure: Asynchronous sending

Asynchronous sending is a method in which the sender keeps sending messages without waiting for the server to respond to the previous one. It requires an implementation of the asynchronous send callback interface (SendCallback).

After sending a message, the sender does not need to wait for the server's response before sending the next one. The sender receives the server's response through the callback interface and handles the result there. Asynchronous sending is generally used when the downstream work takes a long time and the caller is sensitive to response time. For example, after a video is uploaded, the sender notifies the transcoding service to start, and the transcoding result is pushed once transcoding completes.

The following is the sample code.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            // Initialize a producer and set the Producer group name
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // Set the address of the NameServer
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer
            producer.start();
            // Do not retry internally when an asynchronous send fails
            producer.setRetryTimesWhenSendAsyncFailed(0);
            int messageCount = 100;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    // Create a message and set the topic, tag, and body. The tag is a label
                    // that categorizes the message; RocketMQ can filter by tag on the consumer side.
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Send the message asynchronously; the result is delivered through the callback
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                            countDownLatch.countDown();
                        }
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                            countDownLatch.countDown();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                    countDownLatch.countDown();
                }
            }
            // If asynchronous sending must be reliable, do not terminate until the callback
            // has returned a definite result for every message. Closing the producer
            // immediately may leave some messages unsent.
            countDownLatch.await(5, TimeUnit.SECONDS);
            // Close the producer once it is no longer in use
            producer.shutdown();
        }
    }

The only difference between calling the asynchronous and the synchronous sending methods is the arguments passed to send(). An asynchronous send() call does not wait for the server's response. Instead, it takes a SendCallback implementation. The SendCallback interface has two methods, onSuccess and onException, which are invoked when the message is sent successfully and when sending fails, respectively.
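
The asynchronous sample waits on a CountDownLatch with a timeout before shutting the producer down, but it ignores what the wait reports. CountDownLatch.await(timeout, unit) returns false when the timeout elapses before the count reaches zero, so the return value can be used to detect callbacks that never arrived. A minimal JDK-only sketch follows; the class and method names are hypothetical, and a thread pool stands in for the client's callback threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: detect callbacks that did not arrive before the wait timed out. */
final class AwaitCallbacksSketch {

    // Expects `total` callbacks but only lets `completing` of them fire, then
    // reports how many were still outstanding when the wait ended.
    static long outstandingAfterWait(int total, int completing, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(total);
        ExecutorService callbacks = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < completing; i++) {
                // Each task plays the role of onSuccess/onException counting down.
                callbacks.execute(latch::countDown);
            }
            // await returns true only if the count reached zero within the timeout.
            boolean allDone = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return allDone ? 0 : latch.getCount();
        } finally {
            callbacks.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("outstanding: " + outstandingAfterWait(5, 5, 1000));
        System.out.println("outstanding: " + outstandingAfterWait(5, 3, 200));
    }
}
```

Applied to the producer sample, the same check would let the program log or otherwise handle the case where some messages had no definite result before shutdown() was called.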

3.3 One-Way Sending

Figure: One-way sending

In one-way mode, the sender is only responsible for sending the message. It does not wait for the server to return a response, and no callback function is triggered. In other words, it sends the request and does not wait for an answer. Sending a message this way takes very little time, usually on the order of microseconds. It suits scenarios that need very low sending latency but do not have high reliability requirements, such as log collection.

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class OnewayProducer {
        public static void main(String[] args) throws Exception {
            // Initialize a producer and set the Producer group name
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // Set the address of the NameServer
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Create a message and set the topic, tag, and body. The tag is a label
                // that categorizes the message; RocketMQ can filter by tag on the consumer side.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // One-way sending has no request-response handling and no retry, so a failed
                // send loses the message. If data must not be lost, use reliable synchronous
                // or reliable asynchronous sending instead.
                producer.sendOneway(msg);
            }
            // Close the producer once it is no longer in use
            producer.shutdown();
        }
    }

One-way mode calls the sendOneway() method, which neither waits for nor processes a returned result.