客户端配置

相对于RocketMQ的Broker集群,生产者和消费者都是客户端。本小节主要描述生产者和消费者公共的行为配置。

客户端寻址方式

RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。

  • 代码中指定Name Server地址,多个namesrv地址之间用分号分割

    1. producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
    2. consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
  • Java启动参数中指定Name Server地址

    1. -Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
  • 环境变量指定Name Server地址

    1. export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
  • HTTP静态服务器寻址(默认)

    客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:

    1. 192.168.0.1:9876;192.168.0.2:9876

客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

  1. 10.232.22.67 jmenv.taobao.net

推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群可以热升级。

客户端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr(“192.168.0.1:9876”),其他参数同理。

ClientConfig配置

名称描述参数类型默认值有效值重要性
namesrvAddrNameServer的地址列表String从-D系统参数rocketmq.namesrv.addr或环境变量。NAMESRV_ADDR
instanceName客户端实例名称String从-D系统参数rocketmq.client.name获取,否则就是DEFAULT
clientIP客户端IPStringRemotingUtil.getLocalAddress()
namespace客户端命名空间String
accessChannel设置访问通道AccessChannelLOCAL
clientCallbackExecutorThreads客户端通信层接收到网络请求的时候,处理器的核数intRuntime.getRuntime().availableProcessors()
pollNameServerInterval轮询从NameServer获取路由信息的时间间隔int30000,单位毫秒
heartbeatBrokerInterval定期发送注册心跳到broker的间隔int30000,单位毫秒
persistConsumerOffsetInterval作用于Consumer,持久化消费进度的间隔int默认值5000,单位毫秒
pullTimeDelayMillsWhenException拉取消息出现异常的延迟时间设置long1000,单位毫秒
unitName单位名称String
unitMode单位模式booleanfalse
vipChannelEnabled是否启用vip netty通道以发送消息boolean从-D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true
useTLS是否使用安全传输。boolean从-D系统参数tls.enable获取,否则就是false
mqClientApiTimeoutmq客户端api超时设置int3000,单位毫秒
language客户端实现语言LanguageCodeLanguageCode.JAVA

DefaultMQProducer配置

名称描述参数类型默认值有效值重要性
producerGroup生产组的名称,一类Producer的标识StringDEFAULT_PRODUCER
createTopicKey发送消息的时候,如果没有找到topic,若想自动创建该topic,需要一个key topic,这个值即是key topic的值StringTopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC
defaultTopicQueueNums自动创建topic的话,默认queue数量是多少int4
sendMsgTimeout默认的发送超时时间int3000,单位毫秒
compressMsgBodyOverHowmuc消息body需要压缩的阈值int1024 * 4,4K
retryTimesWhenSendFailed同步发送失败的话,rocketmq内部重试多少次int2
retryTimesWhenSendAsyncFailed异步发送失败的话,rocketmq内部重试多少次int2
retryAnotherBrokerWhenNotStoreOK发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发booleanfalse
maxMessageSize客户端验证,允许发送的最大消息体大小int1024 1024 4,4M
traceDispatcher异步传输数据接口TraceDispatchernull

DefaultMQPushConsumer配置

名称描述参数类型默认值有效值重要性
consumerGroup消费组的名称,用于标识一类消费者String
messageModel消费模式MessageModelMessageModel.CLUSTERINGallocateMessageQueueStrategyCLUSTERING(集群消費模式) / ROADCASTING (广播消费模式)
consumeFromWhere启动消费点策略ConsumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
consumeTimestampCONSUME_FROM_LAST_OFFSET的时候使用,从哪个时间点开始消费String半小时前
allocateMessageQueueStrategy负载均衡策略算法AllocateMessageQueueStrategyAllocateMessageQueueAveragely(取模平均分配)
subscription订阅关系Map<String, String>{}
messageListener消息处理监听器(回调)MessageListenernull
offsetStore消息消费进度存储器OffsetStorenull不建议设置,offsetStore 有两个策略:LocalFileOffsetStore 和RemoteBrokerOffsetStore.若沒有显示设置的情況下,广播模式將使用LocalFileOffsetStore,集群模式將使用RemoteBrokerOffsetStore,不建议修改.
consumeThreadMin消费线程池的core sizeint20
consumeThreadMax消费线程池的max sizeint64
adjustThreadPoolNumsThreshold动态扩线程核数的消费堆积阈值long100000
consumeConcurrentlyMaxSpan并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控int2000pullInterval
pullThresholdForQueueconsume queue流控的阈值int100
pullInterval拉取的间隔long0,单位毫秒
pullThresholdForTopic主题级别的流控制阈值int-1
pullThresholdSizeForTopic限制主题级别的缓存消息大小int-1
pullBatchSize一次最大拉取的批量大小int32
consumeMessageBatchMaxSize批量消费的最大消息条数int1
postSubscriptionWhenPull每次拉取的时候是否更新订阅关系booleanfalse
unitMode订阅组的单位booleanfalse
maxReconsumeTimes一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列int-1由于PullConsumer没有管理消费的线程池和管理器,需要用户自己处理各种消费结果和拉取结果,故需要投递到重试队列或死信队列的时候需要显示调用sendMessageBack.回传消息的时候会带上maxReconsumeTimes的值,broker发现此消息已经消费超过此值,则投递到死信队列,否则投递到重试队列。此逻辑和DefaultPushConsumer是一致的,只是PushConsumer无需用户显示调用.
suspendCurrentQueueTimeMillis串行消费使用,如果返回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消费的时间间隔long1000
consumeTimeout消费的最长超时时间long15,单位分钟
awaitTerminationMillisWhenShutdown关闭使用者时等待消息的最长时间,0表示无等待。long0
traceDispatcher异步传输数据接口TraceDispatchernull
registerTopics消費者需要監聽的topicCollection默認值:空集合

DefaultLitePullConsumer配置

名称描述参数类型默认值有效值重要性
consumerGroup消费组的名称,用于标识一类消费者String
brokerSuspendMaxTimeMillisbroker在长轮询下,连接最长挂起的时间long20000,单位毫秒
consumerTimeoutMillisWhenSuspendbroker在长轮询下,客户端等待broker响应的最长等待超时时间long30000,单位毫秒
consumerPullTimeoutMillispull的socket 超时时间long10000,单位毫秒
messageModel消费模式MessageModelMessageModel.CLUSTERING
messageQueueListener负载均衡consume queue分配变化的通知监听器MessageQueueListener
offsetStore消息消费进度存储器OffsetStore
allocateMessageQueueStrategy负载均衡策略算法AllocateMessageQueueStrategyAllocateMessageQueueAveragely(取模平均分配)
unitMode订阅组的单位设置booleanfalse
autoCommit自动提交偏移的标志设置booleantrue
pullThreadNums拉取线程数设置int20
MIN_AUTOCOMMIT_INTERVAL_MILLIS最小提交偏移间隔时间long1000,单位为毫秒
autoCommitIntervalMillis最大提交偏移间隔时间long5000,单位为毫秒
pullBatchSize每次拉出的信息的最大数量long10
pullThresholdForAll消耗请求的流量控制阈值int10000
consumeMaxSpan消耗最大跨度偏移量int2000
pullThresholdForQueue队列级别的流量控制阈值int1000
pullThresholdSizeForQueue队列级别上限制缓存的消息大小int100MiB
pollTimeoutMillis轮询超时设置long5000,以毫秒为单位
topicMetadataCheckIntervalMillis检查主题元数据变化的间隔时间long30000,单位为毫秒
consumeFromWhere消费方式设置ConsumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
consumeTimestamp回溯消费时间String默认回溯消耗时间为半小时前
traceDispatcher异步传输数据的接口TraceDispatchernull
enableMsgTrace信息跟踪的标志booleanfalse
customizedTraceTopic消息跟踪主题的名称String