4.1.5. 发送消息
发送消息时,您可以使用以下任一方法:
void send(Message message) throws AmqpException;void send(String routingKey, Message message) throws AmqpException;void send(String exchange, String routingKey, Message message) throws AmqpException;我们可以从前面清单中的最后一种方法开始讨论,因为它实际上是最明确的。 它允许在运行时提供 AMQP 交换名称(以及路由密钥)。 最后一个参数是负责实际创建消息实例的回调。 使用此方法发送消息的示例可能如下所示: 下面的示例演示如何使用该方法发送消息:send
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1", new Message("12.34".getBytes(), someProperties));如果您计划使用该模板实例大部分或所有时间发送到同一交易所,则可以在模板本身上设置属性。 在这种情况下,可以使用前面清单中的第二种方法。 以下示例在功能上等效于上一个示例:exchange
amqpTemplate.setExchange("marketData.topic");amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));如果在模板上同时设置了 和 属性,则可以使用仅接受 . 以下示例演示如何执行此操作:exchangeroutingKeyMessage
amqpTemplate.setExchange("marketData.topic");amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");amqpTemplate.send(new Message("12.34".getBytes(), someProperties));考虑交换和路由键属性的更好方法是显式方法参数始终覆盖模板的默认值。 事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。 在这两种情况下,默认值都是空的,但这实际上是一个明智的默认值。 就路由密钥而言,首先并不总是必要的(例如,对于 交换)。 此外,队列可能绑定到具有空 . 这两种情况都是依赖于模板路由密钥属性的默认空值的合法方案。 就交换名称而言,通常使用空,因为AMQP规范将“默认交换”定义为没有名称。 由于所有队列都自动绑定到该默认交换(即直接交换),因此使用它们的名称作为绑定值,因此前面清单中的第二种方法可用于通过默认交换向任何队列发送简单的点对点消息传递。 可以通过在运行时提供方法参数来提供队列名称作为 。 以下示例演示如何执行此操作:StringFanoutStringStringStringroutingKey
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchangetemplate.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));或者,您可以创建一个模板,该模板主要用于或专门发布到单个队列。 以下示例演示如何执行此操作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchangetemplate.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queuetemplate.send(new Message("Hello World".getBytes(), someProperties));消息生成器 API
从版本 1.3 开始,消息生成器 API 由 和 提供。 这些方法提供了一种方便的“流畅”方法来创建消息或消息属性。 以下示例显示了流畅的 API:MessageBuilderMessagePropertiesBuilder
Message message = MessageBuilder.withBody("foo".getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build();MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build();Message message = MessageBuilder.withBody("foo".getBytes()) .andProperties(props) .build();可以设置在消息属性上定义的每个属性。 其他方法包括 、、 和 。 每个属性设置方法都有一个变体。 如果存在缺省初始值,则该方法名为 。setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties)set*IfAbsent()set*IfAbsentOrDefault()
提供了五种静态方法来创建初始消息构建器:
public static MessageBuilder withBody(byte[] body) public static MessageBuilder withClonedBody(byte[] body) public static MessageBuilder withBody(byte[] body, int from, int to) public static MessageBuilder fromMessage(Message message) public static MessageBuilder fromClonedMessage(Message message)生成器创建的消息具有对参数的直接引用的正文。
构建器创建的消息具有一个正文,该主体是一个新数组,其中包含参数中的字节副本。
构建器创建的消息有一个正文,该主体是一个新数组,其中包含参数中的字节范围。 请参阅 Arrays.copyOfRange() 了解更多详情。
生成器创建的消息具有一个正文,该主体是对参数主体的直接引用。 参数的属性将复制到新对象。MessageProperties
生成器创建的消息有一个正文,该主体是一个新数组,其中包含参数正文的副本。 参数的属性将复制到新对象。MessageProperties
提供了三种静态方法来创建实例:MessagePropertiesBuilder
public static MessagePropertiesBuilder newInstance() public static MessagePropertiesBuilder fromProperties(MessageProperties properties) public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)使用默认值初始化新的消息属性对象。
生成器使用提供的属性对象进行初始化,并将返回这些属性对象。build()
参数的属性将复制到新对象。MessageProperties
通过 的实现,每个方法都有一个重载版本,该版本采用一个额外的对象。 启用发布者确认后,将在 AmqpTemplate 中所述的回调中返回此对象。 这允许发件人将确认(或)与发送的消息相关联。RabbitTemplateAmqpTemplatesend()CorrelationDataacknack
从版本 1.6.7 开始,引入了该接口,允许在转换消息后修改相关数据。 以下示例演示如何使用它:CorrelationAwareMessagePostProcessor
Message postProcessMessage(Message message, Correlation correlation);在版本 2.0 中,此接口已弃用。 该方法已移至,默认实现委托给 。MessagePostProcessorpostProcessMessage(Message message)
同样从版本 1.6.7 开始,提供了一个名为的新回调接口。 在所有实例(方法中提供以及中提供的实例)之后调用此函数。 实现可以更新或替换方法中提供的关联数据(如果有)。 和原始(如果有)作为参数提供。 下面的示例演示如何使用该方法:CorrelationDataPostProcessorMessagePostProcessorsend()setBeforePublishPostProcessors()send()MessageCorrelationDatapostProcess
CorrelationData postProcess(Message message, CorrelationData correlationData);发布者退货
当模板的属性为 时,返回的消息由 AmqpTemplate 中描述的回调提供。mandatorytrue
从版本 1.4 开始,支持 SpEL 属性,该属性针对每个请求消息作为根评估对象进行评估,解析为一个值。 可以在表达式中使用 Bean 引用(如 )。RabbitTemplatemandatoryExpressionboolean@myBean.isMandatory(#root)
发布者返回也可以由发送和接收操作在内部使用。 有关详细信息,请参阅回复超时。RabbitTemplate
配料
版本 1.4.2 引入了 . 这是 的子类,具有重写方法,该方法根据 . 只有当批处理完成时,消息才会发送到 RabbitMQ。 以下清单显示了接口定义:BatchingRabbitTemplateRabbitTemplatesendBatchingStrategyBatchingStrategy
public interface BatchingStrategy { MessageBatch addToBatch(String exchange, String routingKey, Message message); Date nextRelease(); Collection<MessageBatch> releaseBatches();}批处理数据保存在内存中。 如果发生系统故障,未发送的消息可能会丢失。
提供了 A。 它支持将消息发送到单个交换或路由密钥。 它具有以下属性:SimpleBatchingStrategy
- batchSize:发送前的批消息数。
- bufferLimit:批处理消息的最大大小。 如果超出,这将抢占 ,并导致发送部分批处理。batchSize
- timeout:当没有向批处理添加消息的新活动时,发送部分批处理的时间。
通过在每条嵌入消息前面加上四字节二进制长度来设置批处理的格式。 通过将消息属性设置为 来传达给接收系统。SimpleBatchingStrategyspringBatchFormatlengthHeader4
默认情况下,侦听器容器会自动对批处理消息进行取消批处理(通过使用消息标头)。 拒绝批处理中的任何消息会导致整个批处理被拒绝。springBatchFormat
但是,有关详细信息,请参阅批处理@RabbitListener。
4.1.6. 接收消息
消息接收总是比发送复杂一些。 有两种方法可以接收 . 更简单的选项是使用轮询方法调用一次轮询一个。 更复杂但更常见的方法是注册异步按需接收的侦听器。 我们将在接下来的两个小节中介绍每种方法的示例。MessageMessageMessages
轮询消费者
本身可用于轮询接收。 默认情况下,如果没有可用的消息,则立即返回。 没有阻塞。 从版本 1.5 开始,您可以设置 ,以毫秒为单位,接收方法最多阻止这么长时间,等待消息。 小于零的值表示无限期阻塞(或至少直到与代理的连接丢失)。 版本 1.6 引入了允许在每次调用时传递超时的方法变体。AmqpTemplateMessagenullreceiveTimeoutreceive
由于接收操作会为每条消息创建一个新消息,因此此技术实际上并不适合高容量环境。 考虑对这些用例使用异步使用者或零。QueueingConsumerreceiveTimeout
从版本 2.4.8 开始,使用非零超时时,可以指定传递到用于将使用者与通道相关联的方法中的参数。 例如:。basicConsumetemplate.addConsumerArg("x-priority", 10)
有四种简单的方法可用。 与发送端一样,有一种方法要求已设置默认队列属性 直接在模板本身上,并且有一个在运行时接受队列参数的方法。 版本 1.6 引入了变体,以接受每个请求的覆盖。 下面的清单显示了这四种方法的定义:receiveExchangetimeoutMillisreceiveTimeout
Message receive() throws AmqpException;Message receive(String queueName) throws AmqpException;Message receive(long timeoutMillis) throws AmqpException;Message receive(String queueName, long timeoutMillis) throws AmqpException;与发送消息的情况一样,具有一些用于接收 POJO 而不是实例的便捷方法,并且实现提供了一种自定义用于创建返回值的方法: 下面的清单显示了这些方法:AmqpTemplateMessageMessageConverterObject
Object receiveAndConvert() throws AmqpException;Object receiveAndConvert(String queueName) throws AmqpException;Object receiveAndConvert(long timeoutMillis) throws AmqpException;Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;从版本 2.0 开始,这些方法的变体采用额外的参数来转换复杂类型。 模板必须配置 . 有关详细信息,请参阅使用 RabbitTemplate 从消息转换。ParameterizedTypeReferenceSmartMessageConverter
与方法类似,从版本 1.3 开始,具有几种用于同步接收、处理和回复消息的便捷方法。 下面的清单显示了这些方法定义:sendAndReceiveAmqpTemplatereceiveAndReply
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException;<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException;<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException;<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;实现负责 和 阶段。 在大多数情况下,您应该只提供 的实现,以便为收到的消息执行某些业务逻辑,并根据需要生成回复对象或消息。 注意,a 可能会返回 。 在这种情况下,不会发送任何回复,其工作方式与该方法类似。 这允许将同一队列用于混合消息,其中一些消息可能不需要回复。AmqpTemplatereceivereplyReceiveAndReplyCallbackReceiveAndReplyCallbacknullreceiveAndReplyreceive
仅当提供的回调不是 的实例时,才应用自动消息(请求和回复)转换,该实例提供原始消息交换协定。ReceiveAndReplyMessageCallback
对于需要自定义逻辑在运行时根据收到的消息确定地址并从 . 默认情况下,请求消息中的信息用于路由回复。ReplyToAddressCallbackreplyToReceiveAndReplyCallbackreplyTo
以下清单显示了基于 POJO 的接收和回复的示例:
boolean received = this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() { public Invoice handle(Order order) { return processOrder(order); } });if (received) { log.info("We received an order!");}异步消费者
Spring AMQP 还通过使用注释支持带注释的侦听器端点,并提供一个开放的基础设施来以编程方式注册端点。 这是迄今为止设置异步使用者的最方便方法。 有关更多详细信息,请参阅注释驱动的侦听器终结点。@RabbitListener
预取默认值曾经是 1,这可能导致高效使用者的利用率不足。 从版本 2.0 开始,默认预取值现在为 250,这应该让使用者在最常见的场景中保持忙碌和 从而提高吞吐量。
尽管如此,在某些情况下,预取值应该很低:
- 对于大型消息,尤其是在处理速度较慢的情况下(消息可能会在客户端进程中增加大量内存)
- 当需要严格的消息排序时(在这种情况下,预取值应设置回 1)
- 其他特殊情况
此外,对于低容量消息传递和多个使用者(包括单个侦听器容器实例中的并发),您可能希望减少预取,以便在使用者之间更均匀地分配消息。
请参阅消息侦听器容器配置。
有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率的文章和这篇关于排队理论的文章。
消息侦听器
对于异步接收,涉及专用组件(而不是 )。 该组件是用于使用回调的容器。 我们将在本节后面讨论容器及其属性。 但是,首先,我们应该查看回调,因为这是应用程序代码与消息传递系统集成的地方。 回调有几个选项,从接口的实现开始,以下清单显示:MessageAmqpTemplateMessageMessageListener
public interface MessageListener { void onMessage(Message message);}如果您的回调逻辑出于任何原因依赖于 AMQP 通道实例,则可以改用 . 它看起来很相似,但有一个额外的参数。 以下清单显示了接口定义:ChannelAwareMessageListenerChannelAwareMessageListener
public interface ChannelAwareMessageListener { void onMessage(Message message, Channel channel) throws Exception;}在 2.1 版中,此接口从软件包移至 。o.s.amqp.rabbit.coreo.s.amqp.rabbit.listener.api
MessageListenerAdapter
如果您希望在应用程序逻辑和消息传递 API 之间保持更严格的分离,则可以依赖框架提供的适配器实现。 这通常称为“消息驱动的 POJO”支持。
1.5 版为 POJO 消息传递引入了更灵活的机制,即注释。 有关详细信息,请参阅注释驱动的侦听器终结点。@RabbitListener
使用适配器时,只需提供对适配器本身应调用的实例的引用。 以下示例演示如何执行此操作:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);listener.setDefaultListenerMethod("myMethod");您可以对适配器进行子类化,并提供一个实现,以根据消息动态选择不同的方法。 此方法有两个参数,后者是任何转换的结果。 默认情况下,配置 。 请参阅 SimpleMessageConverter,以获取有关其他可用转换器的详细信息和信息。getListenerMethodName()originalMessageextractedMessageSimpleMessageConverter
从版本 1.4.2 开始,原始消息具有 and 属性,可用于确定从中接收消息的队列。consumerQueueconsumerTag
从版本 1.5 开始,您可以配置使用者队列或标记到方法名称的映射,以动态选择要调用的方法。 如果映射中没有条目,我们将回退到默认的侦听器方法。 默认侦听器方法(如果未设置)为 。handleMessage
从 2.0 版开始,提供了便利。 下面的清单显示了 的定义:FunctionalInterfaceFunctionalInterface
@FunctionalInterfacepublic interface ReplyingMessageListener<T, R> { R handleMessage(T t);}此接口有助于使用 Java 8 lambda 方便地配置适配器,如以下示例所示:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> { ... return result;}));从版本 2.2 开始,已弃用,而是引入了新版本。 新方法可帮助侦听器获取和参数执行更多操作,例如在手动确认模式下调用。 下面的清单显示了最基本的示例:buildListenerArguments(Object)buildListenerArguments(Object, Channel, Message)ChannelMessagechannel.basicReject(long, boolean)
public class ExtendedListenerAdapter extends MessageListenerAdapter { @Override protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) { return new Object[]{extractedMessage, channel, message}; }}现在,您可以配置为与需要接收“通道”和“消息”相同。 侦听器的参数应设置为返回,如以下侦听器示例所示:ExtendedListenerAdapterMessageListenerAdapterbuildListenerArguments(Object, Channel, Message)
public void handleMessage(Object object, Channel channel, Message message) throws IOException { ...}容器
现在您已经了解了 -listen 回调的各种选项,我们可以将注意力转向容器。 基本上,容器处理“主动”职责,以便侦听器回调可以保持被动。 容器是“生命周期”组件的一个示例。 它提供了启动和停止的方法。 配置容器时,实质上是弥合 AMQP 队列和实例之间的差距。 您必须提供对 和队列名称或队列实例的引用,该侦听器应从中使用这些消息。MessageMessageListenerConnectionFactory
在 2.0 版之前,有一个侦听器容器 . 现在有第二个容器,. 选择容器中介绍了容器之间的差异以及选择要使用的容器时可能应用的条件。SimpleMessageListenerContainerDirectMessageListenerContainer
下面的清单显示了最基本的示例,该示例通过使用 :SimpleMessageListenerContainer
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(rabbitConnectionFactory);container.setQueueNames("some.queue");container.setMessageListener(new MessageListenerAdapter(somePojo));作为“活动”组件,最常见的是创建具有 Bean 定义的侦听器容器,以便它可以在后台运行。 下面的示例演示了使用 XML 执行此操作的一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/></rabbit:listener-container>下面的清单显示了使用 XML 执行此操作的另一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/></rabbit:listener-container>前面的两个示例都创建了一个(请注意属性 — 它默认为 )。DirectMessageListenerContainertypesimple
或者,您可能更喜欢使用 Java 配置,它类似于前面的代码片段:
@Configurationpublic class ExampleAmqpConfiguration { @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } @Bean public CachingConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public MessageListener exampleListener() { return new MessageListener() { public void onMessage(Message message) { System.out.println("received: " + message); } }; }}消费者优先
从 RabbitMQ V3.2 开始,代理现在支持使用者优先级(请参阅将使用者优先级与 RabbitMQ 配合使用)。 这是通过在使用者上设置参数来实现的。 现在支持设置使用者参数,如以下示例所示:x-prioritySimpleMessageListenerContainer
container.setConsumerArguments(Collections.<String, Object> singletonMap("x-priority", Integer.valueOf(10)));为方便起见,命名空间提供元素的属性,如以下示例所示:prioritylistener
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" /></rabbit:listener-container>从版本 1.3 开始,您可以修改容器在运行时侦听的队列。 请参阅侦听器容器队列。
auto-delete队列
当容器配置为侦听队列、队列具有选项或在代理上配置生存时间策略时,代理会在容器停止时(即,当最后一个使用者被取消时)移除队列。 在版本 1.3 之前,由于缺少队列,无法重新启动容器。 仅当连接关闭或打开时自动重新声明队列等,这在容器停止和启动时不会发生。auto-deletex-expiresRabbitAdmin
从版本 1.3 开始,容器使用 a 在启动期间重新声明任何缺少的队列。RabbitAdmin
还可以将条件声明(请参阅条件声明)与管理员一起使用,以将队列声明推迟到容器启动。 以下示例演示如何执行此操作:auto-startup="false"
<rabbit:queue id="otherAnon" declared-by="containerAdmin" /><rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin"> <rabbit:bindings> <rabbit:binding queue="otherAnon" key="otherAnon" /> </rabbit:bindings></rabbit:direct-exchange><rabbit:listener-container id="container2" auto-startup="false"> <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" /></rabbit:listener-container><rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />在这种情况下,队列和交换由 声明,这样就不会在上下文初始化期间声明元素。 此外,出于同样的原因,容器未启动。 稍后启动容器时,它使用其对的引用来声明元素。containerAdminauto-startup="false"containerAdmin
批处理消息
批处理消息(由生产者创建)由侦听器容器自动取消批处理(使用消息标头)。 拒绝批处理中的任何消息会导致整个批处理被拒绝。 有关批处理的详细信息,请参阅批处理。springBatchFormat
从版本 2.2 开始,可用于在使用者端(生产者发送离散消息)创建批处理。SimpleMessageListenerContainer
设置容器属性以启用此功能。 还必须为 true,以便容器负责处理这两种类型的批处理。 实现或何时为真。 从版本 2.2.7 开始,和 可以取消批处理生产者创建的批处理为 . 有关将此功能与 一起使用的信息,请参阅批处理@RabbitListener。consumerBatchEnableddeBatchingEnabledBatchMessageListenerChannelAwareBatchMessageListenerconsumerBatchEnabledSimpleMessageListenerContainerDirectMessageListenerContainerList<Message>@RabbitListener
消费者活动
每当侦听器出现时,容器都会发布应用程序事件 (使用者)遇到某种故障。 该事件具有以下属性:ListenerContainerConsumerFailedEvent
- container:使用者遇到问题的侦听器容器。
- reason:失败的文本原因。
- fatal:指示故障是否致命的布尔值。 对于非致命异常,容器会尝试根据 或 (对于 ) 或 (对于 ) 重新启动使用者。recoveryIntervalrecoveryBackoffSimpleMessageListenerContainermonitorIntervalDirectMessageListenerContainer
- throwable:被抓住的那个。Throwable
这些事件可以通过实现来使用。ApplicationListener<ListenerContainerConsumerFailedEvent>
当大于 1 时,所有使用者都会发布系统范围的事件(如连接失败)。concurrentConsumers
如果使用者失败,因为默认情况下,如果其队列被独占使用以及发布事件,则会发出日志。 若要更改此日志记录行为,请在实例的属性中提供自定义。 另请参阅记录通道关闭事件。WARNConditionalExceptionLoggerSimpleMessageListenerContainerexclusiveConsumerExceptionLogger
致命错误始终记录在级别。 这是不可修改的。ERROR
其他几个事件在容器生命周期的不同阶段发布:
- AsyncConsumerStartedEvent:当使用者启动时。
- AsyncConsumerRestartedEvent:当使用者在发生故障后重新启动时 - 仅。SimpleMessageListenerContainer
- AsyncConsumerTerminatedEvent:消费者正常停止时。
- AsyncConsumerStoppedEvent:当消费者停止时 - 仅。SimpleMessageListenerContainer
- ConsumeOkEvent:当从代理接收到 a 时,包含队列名称和consumeOkconsumerTag
- ListenerContainerIdleEvent:请参阅检测空闲异步使用者。
- MissingQueueEvent:检测到缺少队列时。
消费者标签
您可以提供生成消费者标签的策略。 默认情况下,消费者标记由代理生成。 以下清单显示了接口定义:ConsumerTagStrategy
public interface ConsumerTagStrategy { String createConsumerTag(String queue);}队列可用,以便(可选)在标记中使用。
请参阅消息侦听器容器配置。
注释驱动的侦听器终结点
异步接收消息的最简单方法是使用带批注的侦听器终结点基础结构。 简而言之,它允许您将托管 Bean 的方法公开为 Rabbit 侦听器端点。 以下示例演示如何使用批注:@RabbitListener
@Componentpublic class MyService { @RabbitListener(queues = "myQueue") public void processOrder(String data) { ... }}前面示例的思想是,只要名为 的队列中有消息可用,就会相应地调用该方法(在本例中为消息的有效负载)。myQueueprocessOrder
带批注的终结点基础结构通过使用 .RabbitListenerContainerFactory
在前面的示例中,必须已存在并绑定到某个交换。 只要应用程序上下文中存在 ,就可以自动声明和绑定队列。myQueueRabbitAdmin
可以为注释属性 ( 等) 指定属性占位符 () 或 SpEL 表达式 ()。 有关为什么可能使用 SpEL 而不是属性占位符的示例,请参阅侦听多个队列。 下面的清单显示了如何声明 Rabbit 侦听器的三个示例:${some.property}#{someExpression}queues
@Componentpublic class MyService { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "myQueue", durable = "true"), exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"), key = "orderRoutingKey") ) public void processOrder(Order order) { ... } @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "auto.exch"), key = "invoiceRoutingKey") ) public void processInvoice(Invoice invoice) { ... } @RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true")) public String handleWithSimpleDeclare(String data) { ... }}在第一个示例中,如果需要,队列与交换一起自动声明(持久), 并使用路由密钥绑定到交换。 在第二个示例中,声明并绑定了匿名(独占、自动删除)队列;队列名称由框架使用 . 不能使用此技术声明以代理命名的队列;它们需要声明为 Bean 定义;请参阅容器和代理命名队列。 可以提供多个条目,让侦听器侦听多个队列。 在第三个示例中,如有必要,将使用队列名称作为路由键,声明具有从属性检索的名称的队列,并将默认绑定到默认交换。myQueueBase64UrlNamingStrategyQueueBindingmy.queue
从版本 2.0 开始,注释支持任何交换类型,包括自定义。 有关详细信息,请参阅 AMQP 概念。@Exchange
当您需要更高级的配置时,可以使用普通定义。@Bean
注意第一个示例中的交换。 例如,这允许绑定到可能具有不同设置的现有交换(例如 )。 默认情况下,现有交易所的属性必须匹配。ignoreDeclarationExceptionsinternal
从版本 2.0 开始,您现在可以将队列绑定到具有多个路由密钥的交易所,如以下示例所示:
... key = { "red", "yellow" }...您还可以在队列、交换、 和绑定,如以下示例所示:@QueueBinding
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "auto.headers", autoDelete = "true", arguments = @Argument(name = "x-message-ttl", value = "10000", type = "java.lang.Integer")), exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"), arguments = { @Argument(name = "x-match", value = "all"), @Argument(name = "thing1", value = "somevalue"), @Argument(name = "thing2") }))public String handleWithHeadersExchange(String foo) { ...}请注意,队列的参数设置为 10 秒。 由于参数类型不是 ,我们必须指定其类型 — 在本例中为 . 与所有此类声明一样,如果队列已存在,则参数必须与队列上的参数匹配。 对于标头交换,我们将绑定参数设置为匹配标头设置为 和 的消息 标头必须与任何值一起存在。 该参数意味着必须同时满足这两个条件。x-message-ttlStringIntegerthing1somevaluething2x-match
参数名称、值和类型可以是属性占位符 () 或 SpEL 表达式 ()。 必须解析为 . 表达式必须解析为类的完全限定名。 必须解析为可由 转换为类型的内容(如前面示例中的 )。${…}#{…}nameStringtypeClassvalueDefaultConversionServicex-message-ttl
如果名称解析为 或为空 ,则忽略。nullString@Argument
元注释
有时,您可能希望对多个侦听器使用相同的配置。 若要减少样板配置,可以使用元注释创建自己的侦听器注释。 以下示例演示如何执行此操作:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))public @interface MyAnonFanoutListener {}public class MetaListener { @MyAnonFanoutListener public void handle1(String foo) { ... } @MyAnonFanoutListener public void handle2(String foo) { ... }}在前面的示例中,注释创建的每个侦听器都绑定了一个匿名的自动删除 队列到扇出交换,。 从版本 2.2.3 开始,支持允许覆盖元注释注释的属性。 此外,用户注释现在可以是 ,允许为一个方法创建多个容器。@MyAnonFanoutListenermetaFanout@AliasFor@Repeatable
@Componentstatic class MetaAnnotationTestBean { @MyListener("queue1") @MyListener("queue2") public void handleIt(String body) { }}@RabbitListener@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Repeatable(MyListeners.class)static @interface MyListener { @AliasFor(annotation = RabbitListener.class, attribute = "queues") String[] value() default {};}@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)static @interface MyListeners { MyListener[] value();}启用侦听器终结点注释
若要启用对批注的支持,可以添加到其中一个类中。 以下示例演示如何执行此操作:@RabbitListener@EnableRabbit@Configuration
@Configuration@EnableRabbitpublic class AppConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setContainerCustomizer(container -> /* customize the container */); return factory; }}从版本 2.0 开始,a 也可用。 它创建实例。DirectMessageListenerContainerFactoryDirectMessageListenerContainer
有关帮助您在 和 之间进行选择的信息,请参阅选择容器。SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory
从版本 2.2.2 开始,您可以提供实现(如上所示)。 这可用于在创建和配置容器后进一步配置容器;例如,可以使用它来设置容器工厂未公开的属性。ContainerCustomizer
版本 2.4.8 提供了您希望应用多个定制器的情况。CompositeContainerCustomizer
默认情况下,基础结构查找名为源的 Bean,供工厂用于创建消息侦听器容器。 在这种情况下,忽略 RabbitMQ 基础结构设置,可以使用三个线程的核心轮询大小和十个线程的最大池大小来调用该方法。rabbitListenerContainerFactoryprocessOrder
您可以自定义要用于每个注释的侦听器容器工厂,也可以通过实现接口来配置显式默认值。 仅当至少注册了一个没有特定容器工厂的终结点时,才需要默认值。 有关完整的详细信息和示例,请参阅 Javadoc。RabbitListenerConfigurer
容器工厂提供用于添加在接收消息(调用侦听器之前)和发送回复之前应用的实例的方法。MessagePostProcessor
有关回复的信息,请参阅回复管理。
从版本 2.0.6 开始,可以将 和 添加到侦听器容器工厂。 它在发送回复时使用。 当重试次数用尽时调用。 您可以使用 从上下文中获取信息。 以下示例演示如何执行此操作:RetryTemplateRecoveryCallbackRecoveryCallbackSendRetryContextAccessor
factory.setRetryTemplate(retryTemplate);factory.setReplyRecoveryCallback(ctx -> { Message failed = SendRetryContextAccessor.getMessage(ctx); Address replyTo = SendRetryContextAccessor.getAddress(ctx); Throwable t = ctx.getLastThrowable(); ... return null;});如果您更喜欢 XML 配置,则可以使用该元素。 检测任何注释的豆子。<rabbit:annotation-driven>@RabbitListener
例如,您可以使用类似于以下内容的 XML:SimpleRabbitListenerContainer
<rabbit:annotation-driven/><bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="concurrentConsumers" value="3"/> <property name="maxConcurrentConsumers" value="10"/></bean>例如,您可以使用类似于以下内容的 XML:DirectMessageListenerContainer
<rabbit:annotation-driven/><bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="consumersPerQueue" value="3"/></bean>从版本 2.0 开始,注释具有属性。 它支持 SpEL 表达式 () 和属性占位符 ()。 其含义和允许的值取决于容器类型,如下所示:@RabbitListenerconcurrency#{…}${…}
- 对于 ,该值必须是单个整数值,用于设置容器上的属性。DirectMessageListenerContainerconsumersPerQueue
- 对于 ,该值可以是单个整数值,用于设置容器上的属性,也可以具有 , 其中 是属性和属性的形式。SimpleRabbitListenerContainerconcurrentConsumersm-nmconcurrentConsumersnmaxConcurrentConsumers
在任一情况下,此设置都会覆盖出厂设置。 以前,如果您有需要不同并发的侦听器,则必须定义不同的容器工厂。
注释还允许通过 and(自 2.2 起)注释属性覆盖工厂和属性。 对每个执行程序使用不同的执行程序可能有助于识别与日志和线程转储中的每个侦听器关联的线程。autoStartuptaskExecutorautoStartupexecutor
版本 2.2 还添加了该属性,该属性允许您重写容器工厂的属性。ackModeacknowledgeMode
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")public void manual1(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { ... channel.basicAck(tag, false);}带批注方法的消息转换
在调用侦听器之前,管道中有两个转换步骤。 第一步使用 将传入的 Spring AMQP 转换为 Spring 消息传递。 调用目标方法时,如有必要,消息负载将转换为方法参数类型。MessageConverterMessageMessage
第一步的默认值是处理对象转换的 Spring AMQP。 所有其他保留为 . 在下面的讨论中,我们称之为“消息转换器”。MessageConverterSimpleMessageConverterStringjava.io.Serializablebyte[]
第二步的默认转换器是 ,它委托给转换服务 (的实例)。 在下面的讨论中,我们称之为“方法参数转换器”。GenericMessageConverterDefaultFormattingConversionService
若要更改消息转换器,可以将其作为属性添加到容器工厂 Bean 中。 以下示例演示如何执行此操作:
@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); ... factory.setMessageConverter(new Jackson2JsonMessageConverter()); ... return factory;}这将配置一个 Jackson2 转换器,该转换器期望存在标头信息以指导转换。
您还可以使用 ,它可以处理不同内容类型的转换。ContentTypeDelegatingMessageConverter
从版本 2.3 开始,您可以通过在属性中指定 Bean 名称来覆盖工厂转换器。messageConverter
@Beanpublic Jackson2JsonMessageConverter jsonConverter() { return new Jackson2JsonMessageConverter();}@RabbitListener(..., messageConverter = "jsonConverter")public void listen(String in) { ...}这避免了仅仅为了更改转换器而声明不同的容器工厂。
在大多数情况下,没有必要自定义方法参数转换器,除非,例如,你想使用 一个习俗 .ConversionService
在 1.6 之前的版本中,转换 JSON 的类型信息必须在消息标头中提供,或者 需要定制。 从版本 1.6 开始,如果没有类型信息标头,则可以从目标推断出类型 方法参数。ClassMapper
此类型推理仅适用于方法级别。@RabbitListener
请参阅 Jackson2JsonMessageConverter 了解更多信息。
如果要自定义方法参数转换器,可以按如下方式执行此操作:
@Configuration@EnableRabbitpublic class AppConfig implements RabbitListenerConfigurer { ... @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new GenericMessageConverter(myConversionService())); return factory; } @Bean public DefaultConversionService myConversionService() { DefaultConversionService conv = new DefaultConversionService(); conv.addConverter(mySpecialConverter()); return conv; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } ...}对于多方法侦听器(请参阅多方法侦听器),方法选择基于消息转换后消息的有效负载。 只有在选择方法后,才会调用方法参数转换器。
将自定义添加到@RabbitListenerHandlerMethodArgumentResolver
从版本 2.3.7 开始,您可以添加自己的参数并解析自定义方法参数。 您所需要的只是实现和使用类中的方法。HandlerMethodArgumentResolverRabbitListenerConfigurersetCustomMethodArgumentResolvers()RabbitListenerEndpointRegistrar
@Configurationclass CustomRabbitConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setCustomMethodArgumentResolvers( new HandlerMethodArgumentResolver() { @Override public boolean supportsParameter(MethodParameter parameter) { return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType()); } @Override public Object resolveArgument(MethodParameter parameter, org.springframework.messaging.Message<?> message) { return new CustomMethodArgument( (String) message.getPayload(), message.getHeaders().get("customHeader", String.class) ); } } ); }}编程端点注册
RabbitListenerEndpoint提供 Rabbit 终结点的模型,并负责为该模型配置容器。 除了注释检测到的终结点之外,基础结构还允许您以编程方式配置终结点。 以下示例演示如何执行此操作:RabbitListener
@Configuration@EnableRabbitpublic class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint(); endpoint.setQueueNames("anotherQueue"); endpoint.setMessageListener(message -> { // processing }); registrar.registerEndpoint(endpoint); }}在前面的示例中,我们使用了 ,它提供了要调用的实际值,但您也可以构建自己的端点变体来描述自定义调用机制。SimpleRabbitListenerEndpointMessageListener
应该注意的是,您也可以完全跳过 的使用,并通过 以编程方式注册端点。@RabbitListenerRabbitListenerConfigurer
带注释的端点方法签名
到目前为止,我们一直在端点中注入一个简单的方法,但它实际上可以有一个非常灵活的方法签名。 下面的示例重写它以注入自定义标头:StringOrder
@Componentpublic class MyService { @RabbitListener(queues = "myQueue") public void processOrder(Order order, @Header("order_type") String orderType) { ... }}以下列表显示了可与侦听器终结点中的参数匹配的参数:
- 生的.org.springframework.amqp.core.Message
- 从原始.MessagePropertiesMessage
- 接收消息的 。com.rabbitmq.client.Channel
- 从传入的 AMQP 消息转换而来。org.springframework.messaging.Message
- @Header-带注释的方法参数,用于提取特定的标头值,包括标准 AMQP 标头。
- @Headers-带注释的参数,也必须可分配给该参数才能访问所有标头。java.util.Map
- 转换后的有效负载
不属于受支持类型(即、 和 )的非批注元素与有效负载匹配。 您可以通过使用 注释参数来明确这一点。 您还可以通过添加额外的 .MessageMessagePropertiesMessage<?>Channel@Payload@Valid
注入 Spring 的消息抽象的能力对于从存储在特定于传输的消息中的所有信息中受益特别有用,而无需依赖特定于传输的 API。 以下示例演示如何执行此操作:
@RabbitListener(queues = "myQueue")public void processOrder(Message<Order> order) { ...}方法参数的处理由 提供,您可以进一步自定义该处理以支持其他方法参数。 转换和验证支持也可以在那里定制。DefaultMessageHandlerMethodFactory
例如,如果我们想在处理之前确保我们的有效,我们可以注释有效负载并配置必要的验证器,如下所示:Order@Valid
@Configuration@EnableRabbitpublic class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setValidator(myValidator()); return factory; }}@RabbitListener @Payload验证
从版本 2.3.7 开始,现在可以更轻松地添加 a 以验证和参数。 现在,您只需将验证器添加到注册商本身即可。Validator@RabbitListener@RabbitHandler@Payload
@Configuration@EnableRabbitpublic class Config implements RabbitListenerConfigurer { ... @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setValidator(new MyValidator()); }}将 Spring 引导与验证启动器一起使用时,会自动配置 a:LocalValidatorFactoryBean
@Configuration@EnableRabbitpublic class Config implements RabbitListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); }}要验证:
public static class ValidatedClass { @Max(10) private int bar; public int getBar() { return this.bar; } public void setBar(int bar) { this.bar = bar; }}和
@RabbitListener(id="validated", queues = "queue1", errorHandler = "validationErrorHandler", containerFactory = "jsonListenerContainerFactory")public void validatedListener(@Payload @Valid ValidatedClass val) { ...}@Beanpublic RabbitListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... };}侦听多个队列
使用该属性时,可以指定关联的容器可以侦听多个队列。 您可以使用注释使接收消息的队列名称可用于 POJO 方法。 以下示例演示如何执行此操作:queues@Header
@Componentpublic class MyService { @RabbitListener(queues = { "queue1", "queue2" } ) public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { ... }}从版本 1.5 开始,可以使用属性占位符和 SpEL 外部化队列名称。 以下示例演示如何执行此操作:
@Componentpublic class MyService { @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" ) public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { ... }}在版本 1.5 之前,只能以这种方式指定单个队列。 每个队列都需要一个单独的属性。
回复管理
中的现有支持已允许方法具有非 void 返回类型。 在这种情况下,调用的结果将封装在发送到原始消息标头中指定的地址的消息中,或发送到侦听器上配置的默认地址。 可以使用消息传递抽象的批注来设置该默认地址。MessageListenerAdapterReplyToAddress@SendTo
假设我们的方法现在应该返回一个,我们可以这样写它来自动发送回复:processOrderOrderStatus
@RabbitListener(destination = "myQueue")@SendTo("status")public OrderStatus processOrder(Order order) { // order processing return status;}如果需要以独立于传输的方式设置其他标头,则可以返回如下所示的内容:Message
@RabbitListener(destination = "myQueue")@SendTo("status")public Message<OrderStatus> processOrder(Order order) { // order processing return MessageBuilder .withPayload(status) .setHeader("code", 1234) .build();}或者,可以在容器工厂属性中使用 来添加更多标头。 从版本 2.2.3 开始,被调用的 bean/方法在回复消息中可用,可以在消息后处理器中使用,将信息传达回调用方:MessagePostProcessorbeforeSendReplyMessagePostProcessors
factory.setBeforeSendReplyPostProcessors(msg -> { msg.getMessageProperties().setHeader("calledBean", msg.getMessageProperties().getTargetBean().getClass().getSimpleName()); msg.getMessageProperties().setHeader("calledMethod", msg.getMessageProperties().getTargetMethod().getName()); return m;});从版本 2.2.5 开始,您可以配置 a 以在发送回复消息之前对其进行修改;在标头设置为与请求匹配后调用它。ReplyPostProcessorcorrelationId
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")public String capitalizeWithHeader(String in) { return in.toUpperCase();}@Beanpublic ReplyPostProcessor echoCustomHeader() { return (req, resp) -> { resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader")); return resp; };}从版本 3.0 开始,您可以在容器工厂而不是注释上配置后处理器。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> { resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader")); return resp;});参数是侦听器 ID。id
注释上的设置将取代出厂设置。
该值假定为遵循模式的回复和对, 其中可以省略其中一个部分。 有效值如下:@SendToexchangeroutingKeyexchange/routingKey
- thing1/thing2:交换和 .:交换和默认值(空)。 或 :和默认(空)交换。 或空:默认交换和默认 .replyToroutingKeything1/replyToroutingKeything2/thing2replyToroutingKey/replyToroutingKey
此外,您可以在没有属性的情况下使用。 这种情况等于空模式。 仅当入站消息没有属性时才使用。@SendTovaluesendTo@SendToreplyToAddress
从版本 1.5 开始,该值可以是 Bean 初始化 SpEL 表达式,如以下示例所示:@SendTo
@RabbitListener(queues = "test.sendTo.spel")@SendTo("#{spelReplyTo}")public String capitalizeWithSendToSpel(String foo) { return foo.toUpperCase();}...@Beanpublic String spelReplyTo() { return "test.sendTo.reply.spel";}表达式的计算结果必须为 ,可以是简单的队列名称(发送到默认交换)或 前面的示例之前讨论的窗体。Stringexchange/routingKey
表达式在初始化期间计算一次。#{…}
对于动态答复路由,邮件发件人应包含邮件属性或使用备用 运行时 SpEL 表达式(在下一个示例之后描述)。reply_to
从版本 1.6 开始,可以是在运行时根据请求计算的 SpEL 表达式 和回复,如以下示例所示:@SendTo
@RabbitListener(queues = "test.sendTo.spel")@SendTo("!{'some.reply.queue.with.' + result.queueName}")public Bar capitalizeWithSendToSpel(Foo foo) { return processTheFooAndReturnABar(foo);}SpEL 表达式的运行时性质用分隔符表示。 表达式的计算上下文对象具有三个属性:!{…}#root
- request:请求对象。o.s.amqp.core.Message
- source:转换后。o.s.messaging.Message<?>
- result:方法结果。
上下文具有映射属性访问器、标准类型转换器和 Bean 解析器,它允许其他 Bean 在 引用上下文(例如,)。@someBeanName.determineReplyQ(request, result)
总之,在初始化期间计算一次,对象是应用程序上下文。 豆类由其名称引用。 在运行时为每条消息进行评估,根对象具有前面列出的属性。 Bean 引用其名称,前缀为 。#{…}#root!{…}@
从版本 2.1 开始,还支持简单属性占位符(例如,)。 对于早期版本,可以使用以下内容作为解决方法,如以下示例所示:${some.reply.to}
@RabbitListener(queues = "foo")@SendTo("#{environment['my.send.to']}")public String listen(Message in) { ... return ...}回复内容类型
如果使用复杂的消息转换器(如 ),则可以通过设置侦听器上的属性来控制答复的内容类型。 这允许转换器为回复选择适当的委托转换器。ContentTypeDelegatingMessageConverterreplyContentType
@RabbitListener(queues = "q1", messageConverter = "delegating", replyContentType = "application/json")public Thing2 listen(Thing1 in) { ...}默认情况下,为了向后兼容,转换器设置的任何内容类型属性都将在转换后被此值覆盖。 诸如 之类的转换器使用回复类型而不是内容类型来确定所需的转换,并在回复消息中适当地设置内容类型。 这可能不是所需的操作,可以通过将属性设置为 来覆盖。 例如,如果返回包含 JSON,则会将回复中的内容类型设置为 。 以下配置将确保内容类型设置正确,即使使用了 也是如此。SimpleMessageConverterconverterWinsContentTypefalseStringSimpleMessageConvertertext/plainSimpleMessageConverter
@RabbitListener(queues = "q1", replyContentType = "application/json", converterWinsContentType = "false")public String listen(Thing in) { ... return someJsonString;}当返回类型为 Spring AMQP 或 Spring 消息传递时,这些属性(和)不适用。 在第一种情况下,不涉及转换;只需设置消息属性。 在第二种情况下,使用消息头控制行为:replyContentTypeconverterWinsContentTypeMessageMessage<?>contentType
@RabbitListener(queues = "q1", messageConverter = "delegating")@SendTo("q2")public Message<String> listen(String in) { ... return MessageBuilder.withPayload(in.toUpperCase()) .setHeader(MessageHeaders.CONTENT_TYPE, "application/xml") .build();}此内容类型将在 传递给转换器。 默认情况下,为了向后兼容,转换器设置的任何内容类型属性都将在转换后被此值覆盖。 如果要覆盖该行为,请同时设置 to,转换器设置的任何值都将保留。MessagePropertiesAmqpHeaders.CONTENT_TYPE_CONVERTER_WINStrue
多方法侦听器
从版本 1.5.0 开始,您可以在类级别指定注释。 与新的注释一起,这允许单个侦听器根据 传入消息的有效负载类型。 最好用一个例子来描述这一点:@RabbitListener@RabbitHandler
@RabbitListener(id="multi", queues = "someQueue")@SendTo("my.reply.queue")public class MultiListenerBean { @RabbitHandler public String thing2(Thing2 thing2) { ... } @RabbitHandler public String cat(Cat cat) { ... } @RabbitHandler public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) { ... } @RabbitHandler(isDefault = true) public String defaultMethod(Object object) { ... }}在这种情况下,如果转换后的有效负载为 、 或 。 您应该了解,系统必须能够根据有效负载类型识别唯一方法。 检查类型是否可分配给没有批注或使用批注的单个参数。 请注意,相同的方法签名适用,如方法级别(如前所述)中所述。@RabbitHandlerThing2CatHat@Payload@RabbitListener
从版本 2.0.3 开始,可以将方法指定为默认方法,如果其他方法不匹配,则会调用该方法。 最多可以指定一种方法。@RabbitHandler
@RabbitHandler仅用于在转换后处理消息有效负载,如果您希望接收未转换的原始对象,则必须在方法上使用,而不是在类上使用。Message@RabbitListener
@Repeatable @RabbitListener
从版本 1.6 开始,注释标有 . 这意味着注释可以多次出现在同一个带注释的元素(方法或类)上。 在这种情况下,将为每个注释创建一个单独的侦听器容器,每个注释调用相同的侦听器。 可重复的注释可用于 Java 8 或更高版本。@RabbitListener@Repeatable@Bean
代理和泛型@RabbitListener
如果您的服务旨在代理(例如,在 的情况下),您应该记住一些注意事项 接口具有泛型参数。 请考虑以下示例:@Transactional
interface TxService<P> { String handle(P payload, String header);}static class TxServiceImpl implements TxService<Foo> { @Override @RabbitListener(...) public String handle(Thing thing, String rk) { ... }}对于泛型接口和特定实现,您被迫切换到 CGLIB 目标类代理,因为接口方法的实际实现是桥接方法。 在事务管理的情况下,CGLIB 的使用是通过 注释选项:。 在这种情况下,必须在实现中的目标方法上声明所有注释,如以下示例所示:handle@EnableTransactionManagement(proxyTargetClass = true)
static class TxServiceImpl implements TxService<Foo> { @Override @Transactional @RabbitListener(...) public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) { ... }}处理异常
默认情况下,如果带批注的侦听器方法引发异常,则会将其抛出到容器,并且消息将重新排队并重新传递、丢弃或路由到死信交换,具体取决于容器和代理配置。 不会向发件人返回任何内容。
从版本 2.0 开始,注释有两个新属性:和 。@RabbitListenererrorHandlerreturnExceptions
默认情况下不配置这些。
您可以使用 来提供实现的 Bean 名称。 此功能接口具有一种方法,如下所示:errorHandlerRabbitListenerErrorHandler
@FunctionalInterfacepublic interface RabbitListenerErrorHandler { Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception;}如您所见,您可以访问从容器接收的原始消息、消息转换器生成的 Spring 消息传递对象以及侦听器引发的异常(包装在 中)。 错误处理程序可以返回一些结果(作为回复发送),也可以引发原始或新异常(抛出到容器或返回到发送方,具体取决于设置)。Message<?>ListenerExecutionFailedExceptionreturnExceptions
当 时,该属性会导致将异常返回给发件人。 异常包装在对象中。 在发送方端,有一个可用的 ,如果将其配置为 ,则会重新引发服务器端异常,并包装在 . 服务器异常的堆栈跟踪是通过合并服务器和客户端堆栈跟踪来合成的。returnExceptionstrueRemoteInvocationResultRemoteInvocationAwareMessageConverterAdapterRabbitTemplateAmqpRemoteException
这种机制通常只适用于默认的,它使用Java序列化。 异常通常不是“杰克逊友好”的,不能序列化为 JSON。 如果使用 JSON,请考虑在引发异常时使用 返回其他一些对杰克逊友好的对象。SimpleMessageConvertererrorHandlerError
在 2.1 版中,此接口从软件包移至 。o.s.amqp.rabbit.listenero.s.amqp.rabbit.listener.api
从版本 2.1.7 开始,在消息消息标头中可用;这允许您在使用时确认或处理失败的消息:ChannelAcknowledgeMode.MANUAL
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) { ... message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class) .basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), true); }从版本 2.2.18 开始,如果引发消息转换异常,则将调用错误处理程序,并在参数中使用。 这允许应用程序向调用方发送一些结果,指示收到格式不正确的消息。 以前,此类错误由容器引发和处理。nullmessage
容器管理
为批注创建的容器不会注册到应用程序上下文。 您可以通过调用 bean 来获取所有容器的集合。 然后,可以循环访问此集合,例如,停止或启动所有容器或调用方法 在注册表本身上,这将调用每个容器上的操作。getListenerContainers()RabbitListenerEndpointRegistryLifecycle
您还可以通过使用其 、使用 — 来获取对单个容器的引用 例如,对于由上述代码段创建的容器。idgetListenerContainer(String id)registry.getListenerContainer("multi")
从版本 1.5.2 开始,可以使用 获取已注册容器的值。idgetListenerContainerIds()
从版本 1.5 开始,现在可以为终结点上的容器分配 。 这提供了一种获取对容器子集的引用的机制。 添加属性会导致类型的 Bean 使用组名注册到上下文中。groupRabbitListenergroupCollection<MessageListenerContainer>
@RabbitListener批处理
接收一批消息时,通常由容器执行去批处理,并且一次使用一条消息调用侦听器。 从版本 2.2 开始,您可以将侦听器容器工厂和侦听器配置为在一次调用中接收整个批处理,只需设置工厂的属性,并使方法有效负载参数为 a 或 :batchListenerListCollection
@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setBatchListener(true); return factory;}@RabbitListener(queues = "batch.1")public void listen1(List<Thing> in) { ...}// or@RabbitListener(queues = "batch.2")public void listen2(List<Message<Thing>> in) { ...}将该属性设置为 true 会自动关闭工厂创建的容器中的容器属性(除非是 - 见下文)。实际上,反批处理从容器移动到侦听器适配器,适配器创建传递给侦听器的列表。batchListenerdeBatchingEnabledconsumerBatchEnabledtrue
启用批处理的工厂不能与多方法侦听器一起使用。
同样从 2.2 版开始。一次接收一条批处理消息时,最后一条消息包含设置为 的布尔标头。 可以通过将布尔 last' 参数添加到侦听器方法来获取此标头。 标头映射自 。 此外,填充了每个消息片段中的批处理大小。true@Header(AmqpHeaders.LAST_IN_BATCH)MessageProperties.isLastInBatch()AmqpHeaders.BATCH_SIZE
此外,还向 中添加了一个新属性。 如果为 true,容器将创建一批消息,最多 ;如果经过没有新消息到达,则会传递部分批处理。 如果收到生产者创建的批处理,则会对其进行去批处理并将其添加到使用者端批处理中;因此,实际传送的消息数可能超过 ,这表示从代理接收的消息数。 当为真时必须为真;容器工厂将强制实施此要求。consumerBatchEnabledSimpleMessageListenerContainerbatchSizereceiveTimeoutbatchSizedeBatchingEnabledconsumerBatchEnabled
@Beanpublic SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory()); factory.setConsumerTagStrategy(consumerTagStrategy()); factory.setBatchListener(true); // configures a BatchMessageListenerAdapter factory.setBatchSize(2); factory.setConsumerBatchEnabled(true); return factory;}与 : 一起使用时:consumerBatchEnabled@RabbitListener
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")public void consumerBatch1(List<Message> amqpMessages) { ...}@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) { ...}@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")public void consumerBatch3(List<Invoice> strings) { ...}- 第一个是用收到的原始的、未转换的 s 调用的。org.springframework.amqp.core.Message
- 第二个是使用 S 调用的,其中包含转换的有效负载和映射的标头/属性。org.springframework.messaging.Message<?>
- 第三个是使用转换后的有效负载调用的,无法访问标头/属性。
您还可以添加参数,通常在使用确认模式时使用。 这对于第三个示例不是很有用,因为您无权访问该属性。ChannelMANUALdelivery_tag
Spring 引导为 和 提供了配置属性,但不为 提供配置属性。 从版本 3.0 开始,在容器工厂中设置为 也会设置为 。 当为 时,侦听器必须是批处理侦听器。consumerBatchEnabledbatchSizebatchListenerconsumerBatchEnabledtruebatchListenertrueconsumerBatchEnabledtrue
从版本 3.0 开始,侦听器方法可以使用 或 。Collection<?>List<?>
使用容器工厂
引入了侦听器容器工厂以支持 和向 注册容器,如编程终结点注册中所述。@RabbitListenerRabbitListenerEndpointRegistry
从版本 2.1 开始,它们可用于创建任何侦听器容器 — 甚至是没有侦听器的容器(例如在 Spring 集成中使用)。 当然,在启动容器之前必须添加侦听器。
有两种方法可以创建此类容器:
- 使用 SimpleRabbitListenerEndpoint
- 创建后添加侦听器
以下示例演示如何使用 创建侦听器容器:SimpleRabbitListenerEndpoint
@Beanpublic SimpleMessageListenerContainer factoryCreatedContainerSimpleListener( SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) { SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint(); endpoint.setQueueNames("queue.1"); endpoint.setMessageListener(message -> { ... }); return rabbitListenerContainerFactory.createListenerContainer(endpoint);}以下示例演示如何在创建后添加侦听器:
@Beanpublic SimpleMessageListenerContainer factoryCreatedContainerNoListener( SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) { SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer(); container.setMessageListener(message -> { ... }); container.setQueueNames("test.no.listener.yet"); return container;}无论哪种情况,侦听器也可以是 ,因为它现在是 的子接口。ChannelAwareMessageListenerMessageListener
如果您希望创建多个具有相似属性的容器或使用预配置的容器工厂(例如 Spring 引导自动配置提供的容器工厂)或两者,这些技术非常有用。
以这种方式创建的容器是普通实例,不会在 .@BeanRabbitListenerEndpointRegistry
异步返回类型@RabbitListener
@RabbitListener(和 ) 方法可以使用异步返回类型和 指定,以便异步发送回复。 不再受支持;它已被 Spring Framework 弃用。@RabbitHandlerCompletableFuture<?>Mono<?>ListenableFuture<?>
侦听器容器工厂必须配置为 ,以便使用者线程不会确认消息;相反,异步完成将在异步操作完成时确认或取消消息。 当异步结果完成并出现错误时,消息是否重新排队取决于引发的异常类型、容器配置和容器错误处理程序。 默认情况下,消息将重新排队,除非容器的属性设置为 (默认情况下)。 如果异步结果以 完成,则消息将不会重新排队。 如果容器的属性为 ,则可以通过将将来的异常设置为 a 来覆盖它,消息将重新排队。 如果侦听器方法中发生某些异常,阻止创建异步结果对象,则必须捕获该异常并返回相应的返回对象,该对象将导致消息被确认或重新排队。AcknowledgeMode.MANUALdefaultRequeueRejectedfalsetrueAmqpRejectAndDontRequeueExceptiondefaultRequeueRejectedfalseImmediateRequeueException
从版本 2.2.21、2.3.13、2.4.1 开始,将自动设置检测到异步返回类型时。 此外,具有致命异常的传入消息将单独被否定确认,以前任何先前未确认的消息也会被否定确认。AcknowledgeModeMANUAL
线程和异步使用者
异步使用者涉及许多不同的线程。
中配置的线程用于在 传递新消息时调用 。 如果未配置,则使用 。 如果使用池执行程序,则需要确保池大小足以处理配置的并发。 使用 ,直接在线程上调用 。 在这种情况下,用于监视使用者的任务。TaskExecutorSimpleMessageListenerContainerMessageListenerRabbitMQ ClientSimpleAsyncTaskExecutorDirectMessageListenerContainerMessageListenerRabbitMQ ClienttaskExecutor
使用默认值 时,对于调用侦听器的线程,侦听器容器在 中使用。 这对于日志分析很有用。 通常建议始终在日志记录追加程序配置中包含线程名称。 当 a 通过容器上的属性专门提供时,将按原样使用,无需修改。 建议您使用类似的技术来命名由定制 Bean 定义创建的线程,以帮助在日志消息中进行线程识别。SimpleAsyncTaskExecutorbeanNamethreadNamePrefixTaskExecutortaskExecutorTaskExecutor
中的配置在创建连接时传递到 ,其线程用于将新消息传递到侦听器容器。 如果未配置,客户端将使用内部线程池执行程序,(在撰写本文时)每个连接的池大小为 。ExecutorCachingConnectionFactoryRabbitMQ ClientRuntime.getRuntime().availableProcessors() * 2
如果您有大量工厂或正在使用 ,则可能希望考虑使用具有足够线程的共享来满足工作负载。CacheMode.CONNECTIONThreadPoolTaskExecutor
使用 时,您需要确保为连接工厂配置了一个任务执行程序,该执行程序具有足够的线程来支持使用该工厂的所有侦听器容器之间的所需并发性。 默认池大小(在撰写本文时)为 。DirectMessageListenerContainerRuntime.getRuntime().availableProcessors() * 2
使用 a 为低级别 I/O(套接字)操作创建线程。 要修改此工厂,您需要配置底层 RabbitMQ ,如配置底层客户端连接工厂中所述。RabbitMQ clientThreadFactoryConnectionFactory
选择容器
版本 2.0 引入了 (DMLC)。 以前,只有 (SMLC) 可用。 SMLC 为每个使用者使用内部队列和专用线程。 如果将容器配置为侦听多个队列,则使用相同的使用者线程来处理所有队列。 并发性由 和其他属性控制。 当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给使用者线程。 之所以需要此体系结构,是因为在早期版本的 RabbitMQ 客户端中,无法进行多个并发交付。 较新版本的客户端具有修订的线程模型,现在可以支持并发。 这允许引入DMLC,其中侦听器现在直接在RabbitMQ客户端线程上调用。 因此,它的架构实际上比SMLC“更简单”。 但是,此方法存在一些限制,并且 DMLC 不提供 SMLC 的某些功能。 此外,并发性由 (和客户端库的线程池) 控制。 和关联的属性不适用于此容器。DirectMessageListenerContainerSimpleMessageListenerContainerconcurrentConsumersconsumersPerQueueconcurrentConsumers
以下功能可用于 SMLC,但不适用于 DMLC:
- batchSize:使用 SMLC,您可以设置此项以控制事务中传递的消息数或减少确认数,但这可能会导致失败后重复传递数增加。 (DMLC 确实有 ,您可以使用它来减少确认,与 with 和 SMLC 相同,但它不能用于事务 — 每条消息都在单独的事务中传递和确认)。messagesPerAckbatchSize
- consumerBatchEnabled:在使用者中启用离散消息的批处理;有关详细信息,请参阅消息侦听器容器配置。
- maxConcurrentConsumers和使用者缩放间隔或触发器 — DMLC 中没有自动缩放。 但是,它确实允许您以编程方式更改属性,并相应地调整使用者。consumersPerQueue
但是,DMLC 与 SMLC 相比具有以下优势:
- 在运行时添加和删除队列更有效。 使用 SMLC,将重新启动整个使用者线程(取消并重新创建所有使用者)。 使用 DMLC,不会取消未受影响的消费者。
- 避免了 RabbitMQ 客户端线程和使用者线程之间的上下文切换。
- 线程在使用者之间共享,而不是为 SMLC 中的每个使用者提供专用线程。 但是,请参阅线程和异步使用者中有关连接工厂配置的重要说明。
有关哪些配置属性适用于每个容器的信息,请参阅消息侦听器容器配置。
检测空闲异步使用者
虽然高效,但异步使用者的一个问题是检测它们何时处于空闲状态 — 用户可能希望采用 如果一段时间内没有消息到达,则执行某些操作。
从版本 1.6 开始,现在可以将侦听器容器配置为在一段时间过去而没有消息传递时发布 。 当容器处于空闲状态时,每隔几毫秒发布一个事件。ListenerContainerIdleEventidleEventInterval
若要配置此功能,请在容器上设置。 下面的示例演示如何在 XML 和 Java 中执行此操作(对于 a 和 a):idleEventIntervalSimpleMessageListenerContainerSimpleRabbitListenerContainerFactory
<rabbit:listener-container connection-factory="connectionFactory" ... idle-event-interval="60000" ... > <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" /></rabbit:listener-container>@Beanpublic SimpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); ... container.setIdleEventInterval(60000L); ... return container;}@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory()); factory.setIdleEventInterval(60000L); ... return factory;}在上述每种情况下,在容器空闲时,每分钟发布一次事件。
事件消耗
您可以通过实现来捕获空闲事件 — 一个通用侦听器,或者一个缩小到仅侦听器 接收此特定事件。 你也可以使用 Spring Framework 4.2 中引入的 。ApplicationListener@EventListener
下面的示例将 and 合并到单个类中。 您需要了解应用程序侦听器获取所有容器的事件,因此您可能需要 如果要根据空闲的容器执行特定操作,请检查侦听器 ID。 您也可以将 用于此目的。@RabbitListener@EventListener@EventListenercondition
这些事件有四个属性:
- source:侦听器容器实例
- id:侦听器标识(或容器 Bean 名称)
- idleTime:发布事件时容器处于空闲状态的时间
- queueNames:容器侦听的队列的名称
下面的示例演示如何使用 和 批注创建侦听器:@RabbitListener@EventListener
public class Listener { @RabbitListener(id="someId", queues="#{queue.name}") public String listen(String foo) { return foo.toUpperCase(); } @EventListener(condition = "event.listenerId == 'someId'") public void onApplicationEvent(ListenerContainerIdleEvent event) { ... }}事件侦听器查看所有容器的事件。 因此,在前面的示例中,我们根据侦听器 ID 缩小接收的事件范围。
如果您希望使用 idle 事件来停止列表器容器,则不应调用调用侦听器的线程。 这样做总是会导致延迟和不必要的日志消息。 相反,您应该将事件传递给另一个线程,然后该线程可以停止容器。container.stop()
监视侦听器性能
从版本 2.2 开始,如果在类路径上检测到侦听器,并且应用程序上下文中存在单个(或者恰好注释了一个,例如使用 Spring Boot 时),则侦听器容器将自动为侦听器创建和更新千分尺。 通过将容器属性设置为 .TimerMicrometerMeterRegistry@PrimarymicrometerEnabledfalse
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。 使用简单的 ,每个配置的队列都有一对计时器。MessageListener
计时器已命名并具有以下标记:spring.rabbitmq.listener
- listenerId:(侦听器 ID 或容器 Bean 名称)
- queue:(简单侦听器的队列名称或配置的队列名称列表为 - 因为批处理可能包含来自多个队列的消息)consumerBatchEnabledtrue
- result:或successfailure
- exception:或noneListenerExecutionFailedException
您可以使用容器属性添加其他标记。micrometerTags
另请参阅千分尺观察。
千分尺观察
从版本 3.0 开始,现在支持对 和 侦听器容器使用千分尺进行观察。RabbitTemplate
在每个组件上设置以启用观察;这将禁用千分尺计时器,因为现在将针对每个观测值管理计时器。observationEnabled
有关详细信息,请参阅千分尺跟踪。
若要向计时器/跟踪添加标记,请分别配置自定义或模板或侦听器容器。RabbitTemplateObservationConventionRabbitListenerObservationConvention
默认实现为模板观察添加标记,为容器添加标记。bean.namelistener.id
您可以子类或提供全新的实现。DefaultRabbitTemplateObservationConventionDefaultRabbitListenerObservationConvention
有关更多详细信息,请参阅千分尺观察文档。