33、消息传递
Spring Framework 为消息传递系统集成提供了广泛的支持,从使用 JmsTemplate
简化 JMS API 的使用到异步接收消息的完整基础设施。Spring AMQP 为高级消息队列协议(Advanced Message Queuing Protocol,AMQP)提供了类似的功能集合。Spring Boot 还为 RabbitTemplate
和 RabbitMQ 提供自动配置选项。Spring WebSocket 本身包含了对 STOMP 消息传递的支持,Spring Boot 通过 starter 和少量自动配置即可支持它。Spring Boot 同样支持 Apache Kafka。
33.1、JMS
javax.jms.ConnectionFactory
接口提供了一种创建 javax.jms.Connection
的标准方法,可与 JMS broker(代理)进行交互。虽然 Spring 需要一个 ConnectionFactory
来与 JMS 一同工作,但是您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。(有关详细信息,请参阅 Spring Framework 参考文档的相关部分。)Spring Boot 还会自动配置发送和接收消息所需的基础设施。
33.1.1、ActiveMQ 支持
当 ActiveMQ 在 classpath 上可用时,Spring Boot 也可以配置一个 ConnectionFactory
。如果 broker 存在,则会自动启动并配置一个内嵌式 broker(前提是未通过配置指定 broder URL)。
注意
如果使用
spring-boot-starter-activemq
,则提供了连接到 ActiveMQ 实例必须依赖或内嵌一个 ActiveMQ 实例,以及与 JMS 集成的 Spring 基础设施。
ActiveMQ 配置由 spring.activemq.*
中的外部配置属性控制。例如,您可以在 application.properties
中声明以下部分:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
默认情况下,CachingConnectionFactory
将原生的 ConnectionFactory
使用可由 spring.jms.*
中的外部配置属性控制的合理设置包装起来:
spring.jms.cache.session-cache-size=5
如果您更愿意使用原生池,则可以通过向 org.messaginghub:pooled-jms
添加一个依赖并相应地配置 JmsPoolConnectionFactory
来实现,如下所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
提示
有关更多支持的选项,请参阅
ActiveMQProperties
。您还可以注册多个实现了ActiveMQConnectionFactoryCustomizer
的的 bean,以进行更高级的自定义。
默认情况下,ActiveMQ 会创建一个 destination(目标)(如果它尚不存在),以便根据提供的名称解析 destination。
33.1.2、Artemis 支持
Spring Boot 可以在检测到 Artemis 在 classpath 上可用时自动配置一个 ConnectionFactory
。如果存在 broker,则会自动启动并配置一个内嵌 broker(除非已明确设置 mode 属性)。支持的 mode 为 embedded
(明确表示需要一个内嵌 broker,如果 broker 在 classpath 上不可用则发生错误)和 native
(使用 netty 传输协议连接到 broker)。配置后者后,Spring Boot 会使用默认设置配置一个 ConnectionFactory
,该 ConnectionFactory
连接到在本地计算机上运行的 broker。
注意
如果使用了
spring-boot-starter-artemis
,则会提供连接到现有的 Artemis 实例的必须依赖,以及与 JMS 集成的Spring 基础设施。将org.apache.activemq:artemis-jms-server
添加到您的应用程序可让您使用内嵌模式。
Artemis 配置由 spring.artemis.*
中的外部配置属性控制。例如,您可以在 application.properties
中声明以下部分:
spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret
内嵌 broker 时,您可以选择是否要启用持久化并列出应该可用的 destination。可以将这些指定为以逗号分隔的列表,以使用默认选项创建它们,也可以定义类型为 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或 org.apache.activemq.artemis.jms.server.config.TopicConfiguration
的 bean,分别用于高级队列和 topic(主题)配置。
默认情况下,CachingConnectionFactory
将原生的 ConnectionFactory
使用可由 spring.jms.*
中的外部配置属性控制的合理设置包装起来:
spring.jms.cache.session-cache-size=5
如果您更愿意使用原生池,则可以通过向 org.messaginghub:pooled-jms
添加一个依赖并相应地配置 JmsPoolConnectionFactory
来实现,如下所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
有关更多支持的选项,请参阅 ArtemisProperties
。
不涉及 JNDI 查找,使用 Artemis 配置中的 name
属性或通过配置提供的名称来解析目标(destination)名称。
33.1.3、使用 JNDI ConnectionFactory
如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试使用 JNDI 找到 JMS ConnectionFactory
。默认情况下,将检查 java:/JmsXA
和 java:/XAConnectionFactory
这两个位置。如果需要指定其他位置,可以使用 spring.jms.jndi-name
属性,如下所示:
spring.jms.jndi-name=java:/MyConnectionFactory
33.1.4、发送消息
Spring 的 JmsTemplate
是自动配置的,你可以直接将它注入到你自己的 bean 中,如下所示:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
@Autowired
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// ...
}
注意
JmsMessagingTemplate
可以以类似的方式注入。如果定义了DestinationResolver
或MessageConverter
bean,它将自动关联到自动配置的JmsTemplate
。
33.1.5、接收消息
当存在 JMS 基础设施时,可以使用 @JmsListener
对任何 bean 进行注解以创建监听器(listener)端点。如果未定义 JmsListenerContainerFactory
,则会自动配置一个默认的(factory)。如果定义了 DestinationResolver
或 MessageConverter
bean,它将自动关联到默认的 factory。
默认情况下,默认 factory 是具有事务特性的。如果您在存在有 JtaTransactionManager
的基础设施中运行,则默认情况下它与监听器容器相关联。如果不是,则 sessionTransacted
flag 将为启用(enabled)。在后一种情况下,您可以通过在监听器方法(或其委托)上添加 @Transactional
,将本地数据存储事务与传入消息的处理相关联。这确保了在本地事务完成后传入消息能被告知。这还包括了发送已在同一 JMS 会话上执行的响应消息。
以下组件在 someQueue
destination 上创建一个监听器端点:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
提示
有关更多详细信息,请参阅
@EnableJms
的 Javadoc。
如果需要创建更多 JmsListenerContainerFactory
实例或覆盖缺省值,Spring Boot 会提供一个 DefaultJmsListenerContainerFactoryConfigurer
,您可以使用它来初始化 DefaultJmsListenerContainerFactory
,其设置与自动配置的 factory 设置相同。
例如,以下示例暴露了另一个使用特定 MessageConverter
的 factory:
@Configuration
static class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory());
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后,您可以在任何 @JmsListener
注解的方法中使用该 factory,如下所示:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory="myFactory")
public void processMessage(String content) {
// ...
}
}
33.2、AMQP
高级消息队列协议(Advanced Message Queuing Protocol,AMQP)是一个平台无关,面向消息中间件的连接级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了一些快捷方法,包括 spring-boot-starter-amqp
starter。
33.2.1、RabbitMQ 支持
RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 配置由 spring.rabbitmq.*
中的外部配置属性控制。例如,您可以在 application.properties
中声明以下部分:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
如果上下文中存在 ConnectionNameStrategy
bean,它将自动用于命名由自动配置的 ConnectionFactory
所创建的连接。有关更多支持的选项,请参阅 RabbitProperties。
提示
有关详细信息,请参阅理解 AMQP、RabbitMQ 使用的协议。
33.2.2、发送消息
Spring 的 AmqpTemplate
和 AmqpAdmin
是自动配置的,您可以将它们直接注入自己的 bean 中,如下所示:
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
@Autowired
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
}
注意
RabbitMessagingTemplate 可以以类似的方式注入。如果定义了
MessageConverter
bean,它将自动关联到自动配置的AmqpTemplate
。
如有必要,所有定义为 bean 的 org.springframework.amqp.core.Queue
都会自动在 RabbitMQ 实例上声明相应的队列。
要重试操作,可以在 AmqpTemplate
上启用重试(例如,在 broker 连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
默认情况下禁用重试。您还可以通过声明 RabbitRetryTemplateCustomizer
bean 以编程方式自定义 RetryTemplate
。
33.2.3、接收消息
当 Rabbit 基础设施存在时,可以使用 @RabbitListener
注解任何 bean 以创建监听器端点。如果未定义 RabbitListenerContainerFactory
,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory
,您可以使用 spring.rabbitmq.listener.type
属性切换到一个直接容器。如果定义了 MessageConverter
或 MessageRecoverer
bean,它将自动与默认 factory 关联。
以下示例组件在 someQueue
队列上创建一个侦听监听器端点:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
提示
有关更多详细信息,请参阅
@EnableRabbit
的 Javadoc。
如果需要创建更多 RabbitListenerContainerFactory
实例或覆盖缺省值,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer
和一个 DirectRabbitListenerContainerFactoryConfigurer
,您可以使用它来初始化 SimpleRabbitListenerContainerFactory
和 DirectRabbitListenerContainerFactory
,其设置与使用自动配置的 factory 相同。
提示
这两个 bean 与您选择的容器类型没有关系,它们通过自动配置暴露。
例如,以下配置类暴露了另一个使用特定 MessageConverter
的 factory:
@Configuration
static class RabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后,您可以在任何 @RabbitListener
注解的方法中使用该 factory,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory="myFactory")
public void processMessage(String content) {
// ...
}
}
您可以启用重试机制来处理监听器的异常抛出情况。默认情况下使用 RejectAndDontRequeueRecoverer
,但您可以定义自己的 MessageRecoverer
。如果 broker 配置了重试机制,当重试次数耗尽时,则拒绝消息并将其丢弃或路由到死信(dead-letter)exchange 中。默认情况下重试机制为禁用。您还可以通过声明 RabbitRetryTemplateCustomizer
bean 以编程方式自定义 RetryTemplate
。
重要
默认情况下,如果禁用重试并且监听器异常抛出,则会无限期地重试传递。您可以通过两种方式修改此行为:将
defaultRequeueRejected
属性设置为false
,以便尝试零重传或抛出AmqpRejectAndDontRequeueException
以指示拒绝该消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。
33.3、Apache Kafka 支持
通过提供 spring-kafka
项目的自动配置来支持 Apache Kafka。
Kafka 配置由 spring.kafka.*
中的外部配置属性控制。例如,您可以在 application.properties
中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
提示
要在启动时创建主题(topic),请添加
NewTopic
类型的 Bean。如果主题已存在,则忽略该 bean。
有关更多支持的选项,请参阅 KafkaProperties
。
33.3.1、发送消息
Spring 的 KafkaTemplate
是自动配置的,您可以直接在自己的 bean 中装配它,如下所示:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
注意
如果定义了属性
spring.kafka.producer.transaction-id-prefix
,则会自动配置一个KafkaTransactionManager
。此外,如果定义了RecordMessageConverter
bean,它将自动关联到自动配置的KafkaTemplate
。
33.3.2、接收消息
当存在 Apache Kafka 基础设施时,可以使用 @KafkaListener
注解任何 bean 以创监听器端点。如果未定义 KafkaListenerContainerFactory
,则会使用 spring.kafka.listener.*
中定义的 key 自动配置一个默认的 factory。
以下组件在 someTopic
topic 上创建一个监听器端点:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
如果定义了 KafkaTransactionManager
bean,它将自动关联到容器 factory。同样,如果定义了 RecordMessageConverter
、ErrorHandler
或 AfterRollbackProcessor
bean,它将自动关联到默认的 factory。
提示
自定义
ChainedKafkaTransactionManager
必须标记为@Primary
,因为它通常引用自动配置的KafkaTransactionManager
bean。
33.3.3、Kafka Stream
Spring for Apache Kafka 提供了一个工厂 bean 来创建 StreamsBuilder
对象并管理其 stream(流)的生命周期。只要 kafka-streams
在 classpath 上并且通过 @EnableKafkaStreams
注解启用了 Kafka Stream,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration
bean。
启用 Kafka Stream 意味着必须设置应用程序 id 和引导服务器(bootstrap server)。可以使用 spring.kafka.streams.application-id
配置前者,如果未设置则默认为 spring.application.name
。后者可以全局设置或专门为 stream 而重写。
使用专用 properties 可以设置多个其他属性,可以使用 spring.kafka.streams.properties
命名空间设置其他任意 Kafka 属性。有关更多信息,另请参见第 33.3.4 节:其他 Kafka 属性。
要使用 factory bean,只需将 StreamsBuilder
装配到您的 @Bean
中,如下所示:
@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
}
默认情况下,由其创建的 StreamBuilder
对象管理的流会自动启动。您可以使用 spring.kafka.streams.auto-startup
属性自定义此行为。
33.3.4、其他 Kafka 属性
自动配置支持的属性可在附录 A、常见应用程序属性中找到。请注意,在大多数情况下,这些属性(连接符或驼峰命名)直接映射到 Apache Kafka 点连形式属性。有关详细信息,请参阅 Apache Kafka 文档。
这些属性中的前几个适用于所有组件(生产者【producer】、使用者【consumer】、管理者【admin】和流【stream】),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka 重要性(优先级)属性设定为 HIGH、MEDIUM 或 LOW。Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选择的 MEDIUM 和 LOW 属性,以及所有没有默认值的属性。
只有 Kafka 支持的属性的子集可以直接通过 KafkaProperties
类获得。如果您希望使用不受支持的其他属性配置生产者或消费者,请使用以下属性:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
这将常见的 prop.one
Kafka 属性设置为 first
(适用于生产者、消费者和管理者),prop.two
管理者属性为 second
,prop.three
消费者属性为 third
,prop.four
生产者属性为 fourth
,prop.five
流属性为 fifth
。
您还可以按如下方式配置 Spring Kafka JsonDeserializer
:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
同样,您可以禁用 JsonSerializer
在 header 中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
重要
以这种方式设置的属性将覆盖 Spring Boot 明确支持的任何配置项。