Run RocketMQ in Docker

This section introduces how to quickly deploy a single-node, single-replica RocketMQ service using Docker and complete simple message sending and receiving.

Run RocketMQ in Docker - 图1System Requirements

  1. 64-bit operating system
  2. 64-bit JDK 1.8+

1.Pull RocketMQ Image

Here, we take the RocketMQ 5.2.0 version image from dockerhub as an example to introduce the deployment process.

  1. docker pull apache/rocketmq:5.2.0

2.Create a Shared Network for Containers

RocketMQ involves multiple services and requires multiple containers. Creating a Docker network facilitates communication between containers.

  1. docker network create rocketmq

3.Start NameServer

  1. # Start NameServer
  2. docker run -d --name rmqnamesrv -p 9876:9876 --network rocketmq apache/rocketmq:5.2.0 sh mqnamesrv
  3. # Verify if NameServer started successfully
  4. docker logs -f rmqnamesrv

Run RocketMQ in Docker - 图2info

Once we see ‘The Name Server boot success..’ from namesrv.log, it means the NameServer has been started successfully.

4.Start Broker and Proxy

After nameserver startup, we proceed to start the Broker and Proxy.

  • Linux
  • Windows
  1. # Configure the broker's IP address
  2. echo "brokerIP1=127.0.0.1" > broker.conf
  3. # Start the Broker and Proxy
  4. docker run -d \
  5. --name rmqbroker \
  6. --network rocketmq \
  7. -p 10912:10912 -p 10911:10911 -p 10909:10909 \
  8. -p 8080:8080 -p 8081:8081 \
  9. -e "NAMESRV_ADDR=rmqnamesrv:9876" \
  10. -v ./broker.conf:/home/rocketmq/rocketmq-5.2.0/conf/broker.conf \
  11. apache/rocketmq:5.2.0 sh mqbroker --enable-proxy \
  12. -c /home/rocketmq/rocketmq-5.2.0/conf/broker.conf
  13. # Verify if Broker started successfully
  14. docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"
  1. # Configure the broker's IP address
  2. echo "brokerIP1=127.0.0.1" > broker.conf
  3. # Start the Broker and Proxy
  4. docker run -d ^
  5. --name rmqbroker ^
  6. --net rocketmq ^
  7. -p 10912:10912 -p 10911:10911 -p 10909:10909 ^
  8. -p 8080:8080 -p 8081:8081 \
  9. -e "NAMESRV_ADDR=rmqnamesrv:9876" ^
  10. -v %cd%\broker.conf:/home/rocketmq/rocketmq-5.2.0/conf/broker.conf ^
  11. apache/rocketmq:5.2.0 sh mqbroker --enable-proxy \
  12. -c /home/rocketmq/rocketmq-5.2.0/conf/broker.conf
  13. # Verify if Broker started successfully
  14. docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"

Run RocketMQ in Docker - 图3info

Once we see ‘The broker[brokerName,ip:port] boot success..’ from proxy.log, it means the Broker has been started successfully.

Run RocketMQ in Docker - 图4note

Thus far, a single-Master RocketMQ cluster has been deployed, and we are able to send and receive simple messages.

5.Send and Receive Messages with SDK

We can also try to use the client sdk to send and receive messages, you can see more details from rocketmq-clients.

  1. Create a java project.

  2. Add sdk dependency to pom.xml, remember to replace the rocketmq-client-java-version with the latest release.

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-client-java</artifactId>
    4. <version>${rocketmq-client-java-version}</version>
    5. </dependency>
  3. Enter the broker container and create a Topic using mqadmin.

    1. $ docker exec -it rmqbroker bash
    2. $ sh mqadmin updatetopic -t TestTopic -c DefaultCluster
  4. In the created Java project, create and run a program to send a normal message. The sample code is as follows:

    1. import org.apache.rocketmq.client.apis.ClientConfiguration;
    2. import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    3. import org.apache.rocketmq.client.apis.ClientException;
    4. import org.apache.rocketmq.client.apis.ClientServiceProvider;
    5. import org.apache.rocketmq.client.apis.message.Message;
    6. import org.apache.rocketmq.client.apis.producer.Producer;
    7. import org.apache.rocketmq.client.apis.producer.SendReceipt;
    8. import org.slf4j.Logger;
    9. import org.slf4j.LoggerFactory;
    10. public class ProducerExample {
    11. private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
    12. public static void main(String[] args) throws ClientException {
    13. // Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
    14. String endpoint = "localhost:8081";
    15. // The target topic name for message sending, which needs to be created in advance.
    16. String topic = "TestTopic";
    17. ClientServiceProvider provider = ClientServiceProvider.loadService();
    18. ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
    19. ClientConfiguration configuration = builder.build();
    20. // When initializing Producer, communication configuration and pre-bound Topic need to be set.
    21. Producer producer = provider.newProducerBuilder()
    22. .setTopics(topic)
    23. .setClientConfiguration(configuration)
    24. .build();
    25. // Sending a normal message.
    26. Message message = provider.newMessageBuilder()
    27. .setTopic(topic)
    28. // Set the message index key, which can be used to accurately find a specific message.
    29. .setKeys("messageKey")
    30. // Set the message Tag, used by the consumer to filter messages by specified Tag.
    31. .setTag("messageTag")
    32. // Message body
    33. .setBody("messageBody".getBytes())
    34. .build();
    35. try {
    36. // Send the message, paying attention to the sending result and catching exceptions.
    37. SendReceipt sendReceipt = producer.send(message);
    38. logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    39. } catch (ClientException e) {
    40. logger.error("Failed to send message", e);
    41. }
    42. // producer.close();
    43. }
    44. }
  5. In the created Java project, create and run a program to subscribe to normal messages. Apache RocketMQ supports both SimpleConsumer and PushConsumer types of consumers. You can choose either method to subscribe to messages.

  1. import java.io.IOException;
  2. import java.util.Collections;
  3. import org.apache.rocketmq.client.apis.ClientConfiguration;
  4. import org.apache.rocketmq.client.apis.ClientException;
  5. import org.apache.rocketmq.client.apis.ClientServiceProvider;
  6. import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
  7. import org.apache.rocketmq.client.apis.consumer.FilterExpression;
  8. import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
  9. import org.apache.rocketmq.client.apis.consumer.PushConsumer;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. public class PushConsumerExample {
  13. private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
  14. private PushConsumerExample() {
  15. }
  16. public static void main(String[] args) throws ClientException, IOException, InterruptedException {
  17. final ClientServiceProvider provider = ClientServiceProvider.loadService();
  18. // Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
  19. String endpoints = "localhost:8081";
  20. ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
  21. .setEndpoints(endpoints)
  22. .build();
  23. // Subscription message filtering rule, indicating subscription to all Tag messages.
  24. String tag = "*";
  25. FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
  26. // Specify the consumer group the consumer belongs to, Group needs to be created in advance.
  27. String consumerGroup = "YourConsumerGroup";
  28. // Specify which target Topic to subscribe to, Topic needs to be created in advance.
  29. String topic = "TestTopic";
  30. // Initialize PushConsumer
  31. PushConsumer pushConsumer = provider.newPushConsumerBuilder()
  32. .setClientConfiguration(clientConfiguration)
  33. // Set the consumer group
  34. .setConsumerGroup(consumerGroup)
  35. // Set pre-bound subscription relationship
  36. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
  37. // Set the message listener
  38. .setMessageListener(messageView -> {
  39. // Handle messages and return the consumption result
  40. logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
  41. return ConsumeResult.SUCCESS;
  42. })
  43. .build();
  44. Thread.sleep(Long.MAX_VALUE);
  45. // If PushConsumer is no longer needed, this instance can be closed.
  46. // pushConsumer.close();
  47. }
  48. }

6. Stop the Containers

After completing the experiment, we can stop the containers as follows.

  1. # Stop the NameServer container
  2. docker stop rmqnamesrv
  3. # Stop the Broker container
  4. docker stop rmqbroker