Client Configuration

In the RocketMQ Broker cluster, both producers and consumers are clients. This section mainly describes the common behavior configurations for producers and consumers.

Client addressing method

RocketMQ allows clients to find the Name Server, and then find the Broker through the Name Server. There are multiple configuration methods, with priority from high to low, and higher priority will override lower priority.

  • Specifying the Name Server address in the code, with multiple namesrv addresses separated by semicolons

    1. producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
    2. consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
  • Specifying the Name Server address in the Java startup parameters

    1. -Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
  • Specifying the Name Server address in the environment variable

    1. export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
  • HTTP static server addressing (default)

    After the client starts, it will periodically access a static HTTP server with the following address: :http://jmenv.tbsite.net:8080/rocketmq/nsaddr,and the return content of this URL is as follows:

    1. 192.168.0.1:9876;192.168.0.2:9876

The client defaults to accessing this HTTP server every 2 minutes and updating the local Name Server address. The URL is hard-coded in the code, and can be changed by modifying the /etc/hosts file. For example, adding the following configuration in /etc/hosts:

  1. 10.232.22.67 jmenv.taobao.net

It is recommended to use the HTTP static server addressing method, as it is simple to deploy the client and the Name Server cluster can be hot upgraded.

Client configuration

DefaultMQProducer, TransactionMQProducer, DefaultMQPushConsumer, and DefaultMQPullConsumer all extends from the ClientConfig class, which is a common configuration class for clients. The client’s configuration is in the form of get and set methods, and each parameter can be configured with Spring or in the code. For example, the namesrvAddr parameter can be configured like this: producer.setNamesrvAddr(“192.168.0.1:9876”), and other parameters are similar.

ClientConfig configuration

NameDescriptionParameter typeDefault valueEffective valueImportance
namesrvAddrNameServer addressStringFrom -D system parameter rocketmq.namesrv.addr or environment variable.NAMESRV_ADDR
instanceNameClient instance nameStringFrom -D system parameter rocketmq.client.name, otherwise it is DEFAULT
clientIPClient IPStringRemotingUtil.getLocalAddress()
namespaceClient namespaceString
accessChannelSetting up access channelsAccessChannelLOCAL
clientCallbackExecutorThreadsThe number of processor cores when the client communication layer receives a network requestintRuntime.getRuntime().availableProcessors()
pollNameServerIntervalTime interval for polling route information from NameServerint30000, in milliseconds
heartbeatBrokerIntervalInterval for regularly sending registration heartbeats to brokerint30000, in milliseconds
persistConsumerOffsetIntervalApplies to Consumer, the interval for persisting consumption progressint5000, in milliseconds
pullTimeDelayMillsWhenExceptionDelay time setting when pulling messages encounters an exceptionlong1000, in milliseconds
unitNameUnit nameString
unitModeUnit modebooleanfalse
vipChannelEnabledWhether to enable vip netty channel for sending messagesbooleanFrom -D com.rocketmq.sendMessageWithVIPChannel parameter value, if not it is true
useTLSWhether to use TLS transport.booleanFrom -D system parameter tls.enable, otherwise it is false.
mqClientApiTimeoutMq client api timeout settingint3000, in milliseconds
languageClient implementation languageLanguageCodeLanguageCode.JAVA

DefaultMQProducer configuration

NameDescriptionParameter typeDefault ValueEffective valueImportance
producerGroupThe name of the production group, the identifier of a class of ProducersStringDEFAULT_PRODUCER
createTopicKeyWhen sending a message, if the topic is not found, if you want to automatically create the topic, you need a key topic, and this value is the value of the key topic.StringTopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC
defaultTopicQueueNumsThe default number of queues when creating a topic automaticallyint4
sendMsgTimeoutThe default send timeout timeint3000, in milliseconds
compressMsgBodyOverHowmucThe threshold for message body compressionint1024 * 4,4K
retryTimesWhenSendFailedThe number of internal retries for rocketmq if synchronous sending failsint2
retryTimesWhenSendAsyncFailedThe number of internal retries for rocketmq if asynchronous sending failsint2
retryAnotherBrokerWhenNotStoreOKIf the sending result is not SEND_OK status, whether it should be treated as a failure and retriedbooleanfalse
maxMessageSizeClient verification, the maximum message body size allowed to be sentint1024 1024 4,4M
traceDispatchersynchronous data transfer interfaceTraceDispatchernull

DefaultMQPushConsumer configuration

NameDescriptionParameter typeDefault valueEffective valueImportance
consumerGroupThe name of the consumer group, used to identify a class of consumersString
messageModelConsumption modeMessageModelMessageModel.CLUSTERINGallocateMessageQueueStrategy
consumeFromWhereStarting consumption point strategyConsumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
consumeTimestampWhen using CONSUME_FROM_LAST_OFFSET, start consuming from which time pointStringHalf an hour ago
allocateMessageQueueStrategyLoad balancing strategy algorithmAllocateMessageQueueStrategyAllocateMessageQueueAveragely(Modulo average distribution.)
subscriptionSubscription relationshipMap<String, String>{}
messageListenerMessage processing listener (callback)MessageListenernull
offsetStoreMessage consumption progress storageOffsetStorenull
consumeThreadMinCore size of the consumer thread poolint20
consumeThreadMaxMaximum size of the consumer thread poolint64
adjustThreadPoolNumsThresholdDynamic thread core number consumer accumulation thresholdlong100000
consumeConcurrentlyMaxSpanIn concurrent consumption, the maximum offset span allowed for a single consume queue, which will trigger flow controlint2000pullInterval
pullThresholdForQueueConsume queue flow control thresholdint100
pullIntervalPulling intervallong0, in milliseconds
pullThresholdForTopicTopic-level flow control thresholdint-1
pullThresholdSizeForTopicLimit the topic-level cache message sizeint-1
pullBatchSizeMaximum batch size for one pullint32
consumeMessageBatchMaxSizeMaximum number of messages for batch consumptionint1
postSubscriptionWhenPullWhether to update the subscription relationship each time a pull is madebooleanfalse
unitModeSubscription group unitbooleanfalse
maxReconsumeTimesThe maximum number of times a message will be consumed before being delivered to the dead-letter queue if it failsint-1
suspendCurrentQueueTimeMillisThe time interval for consuming again if the serial consumption returns ROLLBACK or SUSPEND_CURRENT_QUEUE_A_MOMENTlong1000
consumeTimeoutThe longest timeout time for consumptionlong15, in minutes
awaitTerminationMillisWhenShutdownThe longest wait time for messages when closing the consumer, 0 means no wait.long0
traceDispatcherAsynchronous data transfer interfaceTraceDispatchernull

DefaultLitePullConsumer configuration

NameDescriptionParameter typeDefault valueEffective valueImportance
consumerGroupThe name of the consumer group, used to identify a type of consumerString
brokerSuspendMaxTimeMillisThe maximum time that a connection will be suspended for in long polling by the brokerlong20000, in milliseconds
consumerTimeoutMillisWhenSuspendThe maximum wait time for a response from the broker in long polling by the clientlong30000, in milliseconds
consumerPullTimeoutMillisThe socket timeout for pulling messageslong10000, in milliseconds
messageModelThe consumption modeMessageModelMessageModel.CLUSTERING
messageQueueListenerA listener for changes in the allocation of consume queues in load balancingMessageQueueListener
offsetStoreThe message consumption progress storageOffsetStore
allocateMessageQueueStrategyThe load balancing strategy algorithmAllocateMessageQueueStrategyAllocateMessageQueueAveragely(Modulo average distribution.)
unitModeThe unit of subscription group settingsbooleanfalse
autoCommitThe setting for automatic commit of offsetbooleantrue
pullThreadNumsThe number of pull threads setint20
MIN_AUTOCOMMIT_INTERVAL_MILLISThe minimum interval time for committing offsetlong1000, in milliseconds
autoCommitIntervalMillisThe maximum interval time for committing offsetlong5000, in milliseconds
pullBatchSizeThe maximum number of messages pulled each timelong10
pullThresholdForAllThe threshold for flow control of consumed requestsint10000
consumeMaxSpanThe maximum offset span for consumptionint2000
pullThresholdForQueueThe queue level flow control thresholdint1000
pullThresholdSizeForQueueThe queue level limit on cached message sizeint100MiB
pollTimeoutMillisThe polling timeout settinglong5000, in milliseconds
topicMetadataCheckIntervalMillisThe interval for checking changes in topic metadatalong30000, in milliseconds
consumeFromWhereThe consumption mode settingConsumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
consumeTimestampThe time for backtracking consumptionStringThe default consumption rollback time is half an hour ago.
traceDispatcherThe interface for asynchronous data transmissionTraceDispatchernull
enableMsgTraceThe flag for message tracingbooleanfalse
customizedTraceTopicThe name of the topic for message tracingString