33、消息传递

Spring Framework 为消息传递系统集成提供了广泛的支持,从使用 JmsTemplate 简化 JMS API 的使用到异步接收消息的完整基础设施。Spring AMQP 为高级消息队列协议(Advanced Message Queuing Protocol,AMQP)提供了类似的功能集合。Spring Boot 还为 RabbitTemplate 和 RabbitMQ 提供自动配置选项。Spring WebSocket 本身包含了对 STOMP 消息传递的支持,Spring Boot 通过 starter 和少量自动配置即可支持它。Spring Boot 同样支持 Apache Kafka。

33.1、JMS

javax.jms.ConnectionFactory 接口提供了一种创建 javax.jms.Connection 的标准方法,可与 JMS broker(代理)进行交互。虽然 Spring 需要一个 ConnectionFactory 来与 JMS 一同工作,但是您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。(有关详细信息,请参阅 Spring Framework 参考文档的相关部分。)Spring Boot 还会自动配置发送和接收消息所需的基础设施。

33.1.1、ActiveMQ 支持

ActiveMQ 在 classpath 上可用时,Spring Boot 也可以配置一个 ConnectionFactory。如果 broker 存在,则会自动启动并配置一个内嵌式 broker(前提是未通过配置指定 broder URL)。

注意

如果使用 spring-boot-starter-activemq,则提供了连接到 ActiveMQ 实例必须依赖或内嵌一个 ActiveMQ 实例,以及与 JMS 集成的 Spring 基础设施。

ActiveMQ 配置由 spring.activemq.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

  1. spring.activemq.broker-url=tcp://192.168.1.210:9876
  2. spring.activemq.user=admin
  3. spring.activemq.password=secret

默认情况下,CachingConnectionFactory 将原生的 ConnectionFactory 使用可由 spring.jms.* 中的外部配置属性控制的合理设置包装起来:

  1. spring.jms.cache.session-cache-size=5

如果您更愿意使用原生池,则可以通过向 org.messaginghub:pooled-jms 添加一个依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下所示:

  1. spring.activemq.pool.enabled=true
  2. spring.activemq.pool.max-connections=50

提示

有关更多支持的选项,请参阅 ActiveMQProperties。您还可以注册多个实现了 ActiveMQConnectionFactoryCustomizer 的的 bean,以进行更高级的自定义。

默认情况下,ActiveMQ 会创建一个 destination(目标)(如果它尚不存在),以便根据提供的名称解析 destination。

33.1.2、Artemis 支持

Spring Boot 可以在检测到 Artemis 在 classpath 上可用时自动配置一个 ConnectionFactory。如果存在 broker,则会自动启动并配置一个内嵌 broker(除非已明确设置 mode 属性)。支持的 mode 为 embedded(明确表示需要一个内嵌 broker,如果 broker 在 classpath 上不可用则发生错误)和 native(使用 netty 传输协议连接到 broker)。配置后者后,Spring Boot 会使用默认设置配置一个 ConnectionFactory,该 ConnectionFactory 连接到在本地计算机上运行的 broker。

注意

如果使用了 spring-boot-starter-artemis,则会提供连接到现有的 Artemis 实例的必须依赖,以及与 JMS 集成的Spring 基础设施。将 org.apache.activemq:artemis-jms-server 添加到您的应用程序可让您使用内嵌模式。

Artemis 配置由 spring.artemis.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

  1. spring.artemis.mode=native
  2. spring.artemis.host=192.168.1.210
  3. spring.artemis.port=9876
  4. spring.artemis.user=admin
  5. spring.artemis.password=secret

内嵌 broker 时,您可以选择是否要启用持久化并列出应该可用的 destination。可以将这些指定为以逗号分隔的列表,以使用默认选项创建它们,也可以定义类型为 org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration 的 bean,分别用于高级队列和 topic(主题)配置。

默认情况下,CachingConnectionFactory 将原生的 ConnectionFactory 使用可由 spring.jms.* 中的外部配置属性控制的合理设置包装起来:

  1. spring.jms.cache.session-cache-size=5

如果您更愿意使用原生池,则可以通过向 org.messaginghub:pooled-jms 添加一个依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下所示:

  1. spring.artemis.pool.enabled=true
  2. spring.artemis.pool.max-connections=50

有关更多支持的选项,请参阅 ArtemisProperties

不涉及 JNDI 查找,使用 Artemis 配置中的 name 属性或通过配置提供的名称来解析目标(destination)名称。

33.1.3、使用 JNDI ConnectionFactory

如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试使用 JNDI 找到 JMS ConnectionFactory。默认情况下,将检查 java:/JmsXAjava:/XAConnectionFactory 这两个位置。如果需要指定其他位置,可以使用 spring.jms.jndi-name 属性,如下所示:

  1. spring.jms.jndi-name=java:/MyConnectionFactory

33.1.4、发送消息

Spring 的 JmsTemplate 是自动配置的,你可以直接将它注入到你自己的 bean 中,如下所示:

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.jms.core.JmsTemplate;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class MyBean {
  6. private final JmsTemplate jmsTemplate;
  7. @Autowired
  8. public MyBean(JmsTemplate jmsTemplate) {
  9. this.jmsTemplate = jmsTemplate;
  10. }
  11. // ...
  12. }

注意

JmsMessagingTemplate 可以以类似的方式注入。如果定义了 DestinationResolverMessageConverter bean,它将自动关联到自动配置的 JmsTemplate

33.1.5、接收消息

当存在 JMS 基础设施时,可以使用 @JmsListener 对任何 bean 进行注解以创建监听器(listener)端点。如果未定义 JmsListenerContainerFactory,则会自动配置一个默认的(factory)。如果定义了 DestinationResolverMessageConverter bean,它将自动关联到默认的 factory。

默认情况下,默认 factory 是具有事务特性的。如果您在存在有 JtaTransactionManager 的基础设施中运行,则默认情况下它与监听器容器相关联。如果不是,则 sessionTransacted flag 将为启用(enabled)。在后一种情况下,您可以通过在监听器方法(或其委托)上添加 @Transactional,将本地数据存储事务与传入消息的处理相关联。这确保了在本地事务完成后传入消息能被告知。这还包括了发送已在同一 JMS 会话上执行的响应消息。

以下组件在 someQueue destination 上创建一个监听器端点:

  1. @Component
  2. public class MyBean {
  3. @JmsListener(destination = "someQueue")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

提示

有关更多详细信息,请参阅 @EnableJms 的 Javadoc

如果需要创建更多 JmsListenerContainerFactory 实例或覆盖缺省值,Spring Boot 会提供一个 DefaultJmsListenerContainerFactoryConfigurer,您可以使用它来初始化 DefaultJmsListenerContainerFactory,其设置与自动配置的 factory 设置相同。

例如,以下示例暴露了另一个使用特定 MessageConverter 的 factory:

  1. @Configuration
  2. static class JmsConfiguration {
  3. @Bean
  4. public DefaultJmsListenerContainerFactory myFactory(
  5. DefaultJmsListenerContainerFactoryConfigurer configurer) {
  6. DefaultJmsListenerContainerFactory factory =
  7. new DefaultJmsListenerContainerFactory();
  8. configurer.configure(factory, connectionFactory());
  9. factory.setMessageConverter(myMessageConverter());
  10. return factory;
  11. }
  12. }

然后,您可以在任何 @JmsListener 注解的方法中使用该 factory,如下所示:

  1. @Component
  2. public class MyBean {
  3. @JmsListener(destination = "someQueue", containerFactory="myFactory")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

33.2、AMQP

高级消息队列协议(Advanced Message Queuing Protocol,AMQP)是一个平台无关,面向消息中间件的连接级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了一些快捷方法,包括 spring-boot-starter-amqp starter。

33.2.1、RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。

RabbitMQ 配置由 spring.rabbitmq.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=secret

如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 ConnectionFactory 所创建的连接。有关更多支持的选项,请参阅 RabbitProperties

提示

有关详细信息,请参阅理解 AMQP、RabbitMQ 使用的协议

33.2.2、发送消息

Spring 的 AmqpTemplateAmqpAdmin 是自动配置的,您可以将它们直接注入自己的 bean 中,如下所示:

  1. import org.springframework.amqp.core.AmqpAdmin;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MyBean {
  7. private final AmqpAdmin amqpAdmin;
  8. private final AmqpTemplate amqpTemplate;
  9. @Autowired
  10. public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
  11. this.amqpAdmin = amqpAdmin;
  12. this.amqpTemplate = amqpTemplate;
  13. }
  14. // ...
  15. }

注意

RabbitMessagingTemplate 可以以类似的方式注入。如果定义了 MessageConverter bean,它将自动关联到自动配置的 AmqpTemplate

如有必要,所有定义为 bean 的 org.springframework.amqp.core.Queue 都会自动在 RabbitMQ 实例上声明相应的队列。

要重试操作,可以在 AmqpTemplate 上启用重试(例如,在 broker 连接丢失的情况下):

  1. spring.rabbitmq.template.retry.enabled=true
  2. spring.rabbitmq.template.retry.initial-interval=2s

默认情况下禁用重试。您还可以通过声明 RabbitRetryTemplateCustomizer bean 以编程方式自定义 RetryTemplate

33.2.3、接收消息

当 Rabbit 基础设施存在时,可以使用 @RabbitListener 注解任何 bean 以创建监听器端点。如果未定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,您可以使用 spring.rabbitmq.listener.type 属性切换到一个直接容器。如果定义了 MessageConverterMessageRecoverer bean,它将自动与默认 factory 关联。

以下示例组件在 someQueue 队列上创建一个侦听监听器端点:

  1. @Component
  2. public class MyBean {
  3. @RabbitListener(queues = "someQueue")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

提示

有关更多详细信息,请参阅 @EnableRabbit 的 Javadoc

如果需要创建更多 RabbitListenerContainerFactory 实例或覆盖缺省值,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,您可以使用它来初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory,其设置与使用自动配置的 factory 相同。

提示

这两个 bean 与您选择的容器类型没有关系,它们通过自动配置暴露。

例如,以下配置类暴露了另一个使用特定 MessageConverter 的 factory:

  1. @Configuration
  2. static class RabbitConfiguration {
  3. @Bean
  4. public SimpleRabbitListenerContainerFactory myFactory(
  5. SimpleRabbitListenerContainerFactoryConfigurer configurer) {
  6. SimpleRabbitListenerContainerFactory factory =
  7. new SimpleRabbitListenerContainerFactory();
  8. configurer.configure(factory, connectionFactory);
  9. factory.setMessageConverter(myMessageConverter());
  10. return factory;
  11. }
  12. }

然后,您可以在任何 @RabbitListener 注解的方法中使用该 factory,如下所示:

  1. @Component
  2. public class MyBean {
  3. @RabbitListener(queues = "someQueue", containerFactory="myFactory")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

您可以启用重试机制来处理监听器的异常抛出情况。默认情况下使用 RejectAndDontRequeueRecoverer,但您可以定义自己的 MessageRecoverer。如果 broker 配置了重试机制,当重试次数耗尽时,则拒绝消息并将其丢弃或路由到死信(dead-letter)exchange 中。默认情况下重试机制为禁用。您还可以通过声明 RabbitRetryTemplateCustomizer bean 以编程方式自定义 RetryTemplate

重要

默认情况下,如果禁用重试并且监听器异常抛出,则会无限期地重试传递。您可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,以便尝试零重传或抛出 AmqpRejectAndDontRequeueException 以指示拒绝该消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。

33.3、Apache Kafka 支持

通过提供 spring-kafka 项目的自动配置来支持 Apache Kafka

Kafka 配置由 spring.kafka.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

  1. spring.kafka.bootstrap-servers=localhost:9092
  2. spring.kafka.consumer.group-id=myGroup

提示

要在启动时创建主题(topic),请添加 NewTopic 类型的 Bean。如果主题已存在,则忽略该 bean。

有关更多支持的选项,请参阅 KafkaProperties

33.3.1、发送消息

Spring 的 KafkaTemplate 是自动配置的,您可以直接在自己的 bean 中装配它,如下所示:

  1. @Component
  2. public class MyBean {
  3. private final KafkaTemplate kafkaTemplate;
  4. @Autowired
  5. public MyBean(KafkaTemplate kafkaTemplate) {
  6. this.kafkaTemplate = kafkaTemplate;
  7. }
  8. // ...
  9. }

注意

如果定义了属性 spring.kafka.producer.transaction-id-prefix,则会自动配置一个 KafkaTransactionManager。此外,如果定义了 RecordMessageConverter bean,它将自动关联到自动配置的 KafkaTemplate

33.3.2、接收消息

当存在 Apache Kafka 基础设施时,可以使用 @KafkaListener 注解任何 bean 以创监听器端点。如果未定义 KafkaListenerContainerFactory,则会使用 spring.kafka.listener.* 中定义的 key 自动配置一个默认的 factory。

以下组件在 someTopic topic 上创建一个监听器端点:

  1. @Component
  2. public class MyBean {
  3. @KafkaListener(topics = "someTopic")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

如果定义了 KafkaTransactionManager bean,它将自动关联到容器 factory。同样,如果定义了 RecordMessageConverterErrorHandlerAfterRollbackProcessor bean,它将自动关联到默认的 factory。

提示

自定义 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager bean。

33.3.3、Kafka Stream

Spring for Apache Kafka 提供了一个工厂 bean 来创建 StreamsBuilder 对象并管理其 stream(流)的生命周期。只要 kafka-streams 在 classpath 上并且通过 @EnableKafkaStreams 注解启用了 Kafka Stream,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration bean。

启用 Kafka Stream 意味着必须设置应用程序 id 和引导服务器(bootstrap server)。可以使用 spring.kafka.streams.application-id 配置前者,如果未设置则默认为 spring.application.name。后者可以全局设置或专门为 stream 而重写。

使用专用 properties 可以设置多个其他属性,可以使用 spring.kafka.streams.properties 命名空间设置其他任意 Kafka 属性。有关更多信息,另请参见第 33.3.4 节:其他 Kafka 属性

要使用 factory bean,只需将 StreamsBuilder 装配到您的 @Bean 中,如下所示:

  1. @Configuration
  2. @EnableKafkaStreams
  3. static class KafkaStreamsExampleConfiguration {
  4. @Bean
  5. public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
  6. KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
  7. stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
  8. Produced.with(Serdes.Integer(), new JsonSerde<>()));
  9. return stream;
  10. }
  11. }

默认情况下,由其创建的 StreamBuilder 对象管理的流会自动启动。您可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。

33.3.4、其他 Kafka 属性

自动配置支持的属性可在附录 A、常见应用程序属性中找到。请注意,在大多数情况下,这些属性(连接符或驼峰命名)直接映射到 Apache Kafka 点连形式属性。有关详细信息,请参阅 Apache Kafka 文档。

这些属性中的前几个适用于所有组件(生产者【producer】、使用者【consumer】、管理者【admin】和流【stream】),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka 重要性(优先级)属性设定为 HIGH、MEDIUM 或 LOW。Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选择的 MEDIUM 和 LOW 属性,以及所有没有默认值的属性。

只有 Kafka 支持的属性的子集可以直接通过 KafkaProperties 类获得。如果您希望使用不受支持的其他属性配置生产者或消费者,请使用以下属性:

  1. spring.kafka.properties.prop.one=first
  2. spring.kafka.admin.properties.prop.two=second
  3. spring.kafka.consumer.properties.prop.three=third
  4. spring.kafka.producer.properties.prop.four=fourth
  5. spring.kafka.streams.properties.prop.five=fifth

这将常见的 prop.one Kafka 属性设置为 first(适用于生产者、消费者和管理者),prop.two 管理者属性为 secondprop.three 消费者属性为 thirdprop.four 生产者属性为 fourthprop.five 流属性为 fifth

您还可以按如下方式配置 Spring Kafka JsonDeserializer

  1. spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  2. spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
  3. spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

同样,您可以禁用 JsonSerializer 在 header 中发送类型信息的默认行为:

  1. spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  2. spring.kafka.producer.properties.spring.json.add.type.headers=false

重要

以这种方式设置的属性将覆盖 Spring Boot 明确支持的任何配置项。