Run RocketMQ with Docker Compose

This section introduces how to quickly deploy a single-node, single-replica RocketMQ service using Docker-compose and complete simple message sending and receiving.

Run RocketMQ with Docker Compose - 图1System Requirements

  1. 64-bit operating system
  2. 64-bit JDK 1.8+

1. Write docker-compose

To quickly start and run the RocketMQ cluster, you can use the following template to create a docker-compose.yml file by modifying or adding configurations in the environment section.

  1. version: '3.8'
  2. services:
  3. namesrv:
  4. image: apache/rocketmq:5.2.0
  5. container_name: rmqnamesrv
  6. ports:
  7. - 9876:9876
  8. networks:
  9. - rocketmq
  10. command: sh mqnamesrv
  11. broker:
  12. image: apache/rocketmq:5.2.0
  13. container_name: rmqbroker
  14. ports:
  15. - 10909:10909
  16. - 10911:10911
  17. - 10912:10912
  18. environment:
  19. - NAMESRV_ADDR=rmqnamesrv:9876
  20. depends_on:
  21. - namesrv
  22. networks:
  23. - rocketmq
  24. command: sh mqbroker
  25. proxy:
  26. image: apache/rocketmq:5.2.0
  27. container_name: rmqproxy
  28. networks:
  29. - rocketmq
  30. depends_on:
  31. - broker
  32. - namesrv
  33. ports:
  34. - 8080:8080
  35. - 8081:8081
  36. restart: on-failure
  37. environment:
  38. - NAMESRV_ADDR=rmqnamesrv:9876
  39. command: sh mqproxy
  40. networks:
  41. rocketmq:
  42. driver: bridge

2.Start RocketMQ cluster

tart all defined services according to the docker-compose.yml file.

  • Linux
  • Windows
  1. docker-compose up -d
  1. docker-compose -p rockermq_project up -d

3.Send and Receive Messages with SDK

  1. After testing with tools, we can try to send and receive messages using the SDK. Here is an example of using the Java SDK for message sending and receiving. More details can be found at rocketmq-clients.

  2. Add the following dependency to the pom.xml file to introduce the Java dependency library, replacing rocketmq-client-java-version with the latest version.

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-client-java</artifactId>
    4. <version>${rocketmq-client-java-version}</version>
    5. </dependency>
  3. Enter the broker container and create a Topic using mqadmin.

    1. $ docker exec -it rmqbroker bash
    2. $ sh mqadmin updatetopic -t TestTopic -c DefaultCluster
  4. In the created Java project, create and run a program to send a normal message. The sample code is as follows:

    1. import org.apache.rocketmq.client.apis.ClientConfiguration;
    2. import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    3. import org.apache.rocketmq.client.apis.ClientException;
    4. import org.apache.rocketmq.client.apis.ClientServiceProvider;
    5. import org.apache.rocketmq.client.apis.message.Message;
    6. import org.apache.rocketmq.client.apis.producer.Producer;
    7. import org.apache.rocketmq.client.apis.producer.SendReceipt;
    8. import org.slf4j.Logger;
    9. import org.slf4j.LoggerFactory;
    10. public class ProducerExample {
    11. private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
    12. public static void main(String[] args) throws ClientException {
    13. // Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
    14. String endpoint = "localhost:8081";
    15. // The target Topic name for message sending, which needs to be created in advance.
    16. String topic = "TestTopic";
    17. ClientServiceProvider provider = ClientServiceProvider.loadService();
    18. ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
    19. ClientConfiguration configuration = builder.build();
    20. // When initializing Producer, communication configuration and pre-bound Topic need to be set.
    21. Producer producer = provider.newProducerBuilder()
    22. .setTopics(topic)
    23. .setClientConfiguration(configuration)
    24. .build();
    25. // Sending a normal message.
    26. Message message = provider.newMessageBuilder()
    27. .setTopic(topic)
    28. // Set the message index key, which can be used to accurately find a specific message.
    29. .setKeys("messageKey")
    30. // Set the message Tag, used by the consumer to filter messages by specified Tag.
    31. .setTag("messageTag")
    32. // Message body.
    33. .setBody("messageBody".getBytes())
    34. .build();
    35. try {
    36. // Send the message, paying attention to the sending result and catching exceptions.
    37. SendReceipt sendReceipt = producer.send(message);
    38. logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    39. } catch (ClientException e) {
    40. logger.error("Failed to send message", e);
    41. }
    42. // producer.close();
    43. }
    44. }
  5. In the created Java project, create and run a program to subscribe to normal messages. Apache RocketMQ supports both SimpleConsumer and PushConsumer types of consumers. You can choose either method to subscribe to messages.

  1. import java.io.IOException;
  2. import java.util.Collections;
  3. import org.apache.rocketmq.client.apis.ClientConfiguration;
  4. import org.apache.rocketmq.client.apis.ClientException;
  5. import org.apache.rocketmq.client.apis.ClientServiceProvider;
  6. import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
  7. import org.apache.rocketmq.client.apis.consumer.FilterExpression;
  8. import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
  9. import org.apache.rocketmq.client.apis.consumer.PushConsumer;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. public class PushConsumerExample {
  13. private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
  14. private PushConsumerExample() {
  15. }
  16. public static void main(String[] args) throws ClientException, IOException, InterruptedException {
  17. final ClientServiceProvider provider = ClientServiceProvider.loadService();
  18. // Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
  19. String endpoints = "localhost:8081";
  20. ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
  21. .setEndpoints(endpoints)
  22. .build();
  23. // Subscription message filtering rule, indicating subscription to all Tag messages.
  24. String tag = "*";
  25. FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
  26. // Specify the consumer group the consumer belongs to, Group needs to be created in advance.
  27. String consumerGroup = "YourConsumerGroup";
  28. // Specify which target Topic to subscribe to, Topic needs to be created in advance.
  29. String topic = "TestTopic";
  30. // Initialize PushConsumer
  31. PushConsumer pushConsumer = provider.newPushConsumerBuilder()
  32. .setClientConfiguration(clientConfiguration)
  33. // Set the consumer group.
  34. .setConsumerGroup(consumerGroup)
  35. // Set pre-bound subscription relationship.
  36. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
  37. // Set the message listener.
  38. .setMessageListener(messageView -> {
  39. // Handle messages and return the consumption result.
  40. logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
  41. return ConsumeResult.SUCCESS;
  42. })
  43. .build();
  44. Thread.sleep(Long.MAX_VALUE);
  45. // If PushConsumer is no longer needed, this instance can be closed.
  46. // pushConsumer.close();
  47. }
  48. }

4.Stop all services

  1. docker-compose down