当前位置 : 主页 > 编程语言 > java >

Spring AMQP项目(三)

来源:互联网 收集:自由互联 发布时间:2023-02-04
4.1.7. 容器和代理命名队列 虽然最好将实例用作自动删除队列,但从版本 2.1 开始,您可以将代理命名队列与侦听器容器一起使用。 以下示例演示如何执行此操作:​​AnonymousQueue​​

Spring AMQP项目(三)_应用程序

4.1.7. 容器和代理命名队列

虽然最好将实例用作自动删除队列,但从版本 2.1 开始,您可以将代理命名队列与侦听器容器一起使用。 以下示例演示如何执行此操作:​​AnonymousQueue​​

@Beanpublic Queue queue() { return new Queue("", false, true, true);}@Beanpublic SimpleMessageListenerContainer container() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf()); container.setQueues(queue()); container.setMessageListener(m -> { ... }); container.setMissingQueuesFatal(false); return container;}

请注意名称的空。 当声明队列时,它会使用代理返回的名称更新属性。 必须在配置容器时使用,以便容器可以在运行时访问声明的名称。 仅设置名称是不够的。​​String​​​​RabbitAdmin​​​​Queue.actualName​​​​setQueues()​​

您无法在容器运行时将代理命名的队列添加到容器中。

重置连接并建立新连接后,新队列将获得新名称。 由于容器重新启动和重新声明队列之间存在争用条件,因此将容器的属性设置为 非常重要,因为容器最初可能会尝试重新连接到旧队列。​​missingQueuesFatal​​​​false​​

4.1.8. 消息转换器

还定义了几种方法,用于发送和接收委托给 . 为每个方向提供了一个方法:一个用于转换为 a,另一个用于从 . 请注意,转换为 时,除了对象之外,还可以提供属性。 该参数通常对应于消息正文。 以下清单显示了接口定义:​​AmqpTemplate​​​​MessageConverter​​​​MessageConverter​​​​Message​​​​Message​​​​Message​​​​object​​​​MessageConverter​​

public interface MessageConverter { Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException; Object fromMessage(Message message) throws MessageConversionException;}

上的相关 -send 方法比我们之前讨论的方法更简单,因为它们不需要实例。 相反,负责通过将提供的对象转换为主体的字节数组,然后添加任何提供的 . 下面的清单显示了各种方法的定义:​​Message​​​​AmqpTemplate​​​​Message​​​​MessageConverter​​​​Message​​​​Message​​​​MessageProperties​​

void convertAndSend(Object message) throws AmqpException;void convertAndSend(String routingKey, Object message) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,只有两种方法:一种接受队列名称,另一种依赖于已设置的模板的“queue”属性。 下面的清单显示了这两种方法的定义:

Object receiveAndConvert() throws AmqpException;Object receiveAndConvert(String queueName) throws AmqpException;

异步使用者​中提到的也使用 .​​MessageListenerAdapter​​​​MessageConverter​​

​​SimpleMessageConverter​​

该策略的默认实现称为 。 这是实例使用的转换器(如果未显式配置替代项)。 它处理基于文本的内容、序列化的 Java 对象和字节数组。​​MessageConverter​​​​SimpleMessageConverter​​​​RabbitTemplate​​

从​​Message​​

如果输入的内容类型以“文本”开头(例如, “text/plain”),它还检查内容编码属性以确定将正文字节数组转换为 Java 时要使用的字符集。 如果没有在输入上设置内容编码属性,则默认情况下使用 UTF-8 字符集。 如果需要覆盖该默认设置,可以配置 的实例,设置其属性,并将其注入到实例中。​​Message​​​​Message​​​​String​​​​Message​​​​SimpleMessageConverter​​​​defaultCharset​​​​RabbitTemplate​​

如果输入的内容类型属性值设置为“application/x-java-serialized-object”,则会尝试将字节数组反序列化(解除冻结)为 Java 对象。 虽然这对于简单的原型设计可能很有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间的紧密耦合。 当然,它也排除了在任何一方使用非Java系统的可能性。 由于AMQP是一种线级协议,因此不幸的是,通过这种限制失去大部分优势。 在接下来的两节中,我们将探讨一些在不依赖 Java 序列化的情况下传递富域对象内容的替代方法。​​Message​​​​SimpleMessageConverter​​

对于所有其他内容类型,将正文内容直接作为字节数组返回。​​SimpleMessageConverter​​​​Message​​

有关重要信息,请参阅 Java 反序列化。

正在转换为​​Message​​

当从任意 Java 对象转换为 a 时,同样会处理字节数组、字符串和可序列化实例。 它将其中每个转换为字节(对于字节数组,没有要转换的内容),并相应地设置内容类型属性。 如果要转换的 与这些类型之一不匹配,则正文为 null。​​Message​​​​SimpleMessageConverter​​​​Object​​​​Message​​

​​SerializerMessageConverter​​

这个转换器与 类似,只是它可以配置其他 Spring 框架和转换实现。​​SimpleMessageConverter​​​​Serializer​​​​Deserializer​​​​application/x-java-serialized-object​​

有关重要信息,请参阅 Java 反序列化。

杰克逊2Json消息转换器

本节介绍如何使用 与 进行转换。 它包含以下部分:​​Jackson2JsonMessageConverter​​​​Message​​

  • 转换为消息
  • 从消息转换
正在转换为​​Message​​

如上一节所述,通常不建议依赖 Java 序列化。 一种相当常见的替代方案,在不同的语言和平台上更加灵活和可移植,是JSON。 (JavaScript 对象表示法)。 可以在任何实例上配置转换器,以覆盖其默认值的使用。 使用 2.x 库。 以下示例配置:​​RabbitTemplate​​​​SimpleMessageConverter​​​​Jackson2JsonMessageConverter​​​​com.fasterxml.jackson​​​​Jackson2JsonMessageConverter​​

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <property name="connectionFactory" ref="rabbitConnectionFactory"/> <property name="messageConverter"> <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"> <!-- if necessary, override the DefaultClassMapper --> <property name="classMapper" ref="customClassMapper"/> </bean> </property></bean>

如上所示,默认使用 。 类型信息将添加到 (并从中检索)。 如果入站邮件不包含 中的类型信息,但您知道预期的类型,则 可以使用属性配置静态类型,如以下示例所示:​​Jackson2JsonMessageConverter​​​​DefaultClassMapper​​​​MessageProperties​​​​MessageProperties​​​​defaultType​​

<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter"> <property name="classMapper"> <bean class="org.springframework.amqp.support.converter.DefaultClassMapper"> <property name="defaultType" value="thing1.PurchaseOrder"/> </bean> </property></bean>

此外,还可以根据标头中的值提供自定义映射。 以下示例演示如何执行此操作:​​TypeId​​

@Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() { Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter(); jsonConverter.setClassMapper(classMapper()); return jsonConverter;}@Beanpublic DefaultClassMapper classMapper() { DefaultClassMapper classMapper = new DefaultClassMapper(); Map<String, Class<?>> idClassMapping = new HashMap<>(); idClassMapping.put("thing1", Thing1.class); idClassMapping.put("thing2", Thing2.class); classMapper.setIdClassMapping(idClassMapping); return classMapper;}

现在,如果发送系统将标头设置为 ,转换器将创建一个对象,依此类推。 有关从非 Spring 应用程序转换消息的完整讨论,请参阅从非 Spring 应用程序接收 JSON 示例应用程序。​​thing1​​​​Thing1​​

从版本 2.4.3 开始,如果 有参数,转换器将不会添加消息属性;这也用于编码。 添加了一个新方法:​​contentEncoding​​​​supportedMediaType​​​​charset​​​​setSupportedMediaType​​

String utf16 = "application/json; charset=utf-16";converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
从​​Message​​

入站消息根据发送系统添加到标头的类型信息转换为对象。

从版本 2.4.3 开始,如果没有消息属性,转换器将尝试检测消息属性中的参数并使用该参数。 如果两者都不存在,如果 有参数,它将用于解码,并最终回退到属性。 添加了一个新方法:​​contentEncoding​​​​charset​​​​contentType​​​​supportedMediaType​​​​charset​​​​defaultCharset​​​​setSupportedMediaType​​

String utf16 = "application/json; charset=utf-16";converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

在 1.6 之前的版本中,如果不存在类型信息,转换将失败。 从版本 1.6 开始,如果缺少类型信息,转换器将使用杰克逊默认值(通常是映射)转换 JSON。

此外,从版本 1.6 开始,当您使用注释(在方法上)时,推断的类型信息将添加到 . 这允许转换器转换为目标方法的参数类型。 这仅适用于只有一个参数没有注释或单个参数带有注释的情况。 在分析过程中将忽略类型的参数。​​@RabbitListener​​​​MessageProperties​​​​@Payload​​​​Message​​

默认情况下,推断的类型信息将覆盖创建的入站和相关标头 通过发送系统。 这允许接收系统自动转换为不同的域对象。 仅当参数类型是具体的(不是抽象的或接口)或来自包时,这才适用。 在所有其他情况下,将使用 和相关标头。 在某些情况下,您可能希望覆盖默认行为并始终使用该信息。 例如,假设您有一个接受参数,但消息包含 是 (具体) 的子类。 推断的类型将不正确。 若要处理这种情况,请将属性设置为 的默认值 。 (该属性实际上在转换器上,但在转换器上提供了一个二传手 为方便起见。 如果注入自定义类型映射器,则应改为在映射器上设置属性。​​TypeId​​​​java.util​​​​TypeId​​​​TypeId​​​​@RabbitListener​​​​Thing1​​​​Thing2​​​​Thing1​​​​TypePrecedence​​​​Jackson2JsonMessageConverter​​​​TYPE_ID​​​​INFERRED​​​​DefaultJackson2JavaTypeMapper​​

从 转换时,传入必须符合 JSON 标准(用于检查)。 从版本 2.2 开始,如果没有属性,或者它具有默认值,则假定为 。 若要恢复到以前的行为(返回未转换),请将转换器的属性设置为 。 如果内容类型不受支持,则会发出日志消息 ,并按原样返回 — 作为 . 因此,为了满足使用者端的要求,生产者必须添加消息属性 — 例如,作为或或使用自动设置标头的 。 以下清单显示了许多转换器调用:​​Message​​​​MessageProperties.getContentType()​​​​contentType.contains("json")​​​​application/json​​​​contentType​​​​application/octet-stream​​​​byte[]​​​​assumeSupportedContentType​​​​false​​​​WARN​​​​Could not convert incoming message with content-type […]​​​​message.getBody()​​​​byte[]​​​​Jackson2JsonMessageConverter​​​​contentType​​​​application/json​​​​text/x-json​​​​Jackson2JsonMessageConverter​​

@RabbitListenerpublic void thing1(Thing1 thing1) {...}@RabbitListenerpublic void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}@RabbitListenerpublic void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}@RabbitListenerpublic void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}@RabbitListenerpublic void thing1(Thing1 thing1, String bar) {...}@RabbitListenerpublic void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}

在前面清单的前四种情况下,转换器尝试转换为类型。 第五个示例无效,因为我们无法确定哪个参数应接收消息有效负载。 在第六个示例中,由于泛型类型为 .​​Thing1​​​​WildcardType​​

但是,您可以创建自定义转换器并使用 message 属性来决定要转换的类型 的 JSON 到。​​targetMethod​​

只有在方法级别声明注释时,才能实现此类型推断。 对于类级别,转换后的类型用于选择要调用的方法。 因此,基础结构提供了消息属性,您可以在自定义中使用该属性 转换器来确定类型。​​@RabbitListener​​​​@RabbitListener​​​​@RabbitHandler​​​​targetObject​​

从版本 1.6.11 开始,因此 () 提供了克服序列化小工具​漏洞的选项。 默认情况下,为了向后兼容,信任所有包 — 也就是说,它用于该选项。​​Jackson2JsonMessageConverter​​​​DefaultJackson2JavaTypeMapper​​​​DefaultClassMapper​​​​trustedPackages​​​​Jackson2JsonMessageConverter​​​​*​​

从版本 2.4.7 开始,转换器可以配置为在反序列化消息正文后 Jackson 返回时返回。 这有助于 以两种方式接收空有效负载:​​Optional.empty()​​​​null​​​​@RabbitListener​​

@RabbitListener(queues = "op.1")void listen(@Payload(required = false) Thing payload) { handleOptional(payload); // payload might be null}@RabbitListener(queues = "op.2")void listen(Optional<Thing> optional) { handleOptional(optional.orElse(this.emptyThing));}

要启用此功能,请设置为 ;当(默认值)时,转换器将回退到原始消息正文 ()。​​setNullAsOptionalEmpty​​​​true​​​​false​​​​byte[]​​

@BeanJackson2JsonMessageConverter converter() { Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); converter.setNullAsOptionalEmpty(true); return converter;}
反序列化抽象类

在版本 2.2.8 之前,如果推断的 a 类型是抽象类(包括接口),转换器将回退到在标头中查找类型信息,如果存在,则使用该信息;如果不存在,它将尝试创建抽象类。 当使用配置了自定义反序列化程序来处理抽象类的自定义,但传入消息具有无效的类型标头时,这会导致问题。​​@RabbitListener​​​​ObjectMapper​​

从版本 2.2.8 开始,默认情况下会保留以前的行为。如果您有这样的自定义,并且想要忽略类型标头,并始终使用推断的类型进行转换,请将 设置为 。 这是向后兼容性所必需的,并避免尝试转换失败时的开销(使用标准)。​​ObjectMapper​​​​alwaysConvertToInferredType​​​​true​​​​ObjectMapper​​

使用 Spring 数据投影接口

从版本 2.2 开始,您可以将 JSON 转换为 Spring 数据投影接口而不是具体类型。 这允许对数据进行非常有选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。 例如,可以将以下接口定义为消息负载类型:

interface SomeSample { @JsonPath({ "$.username", "$.user.name" }) String getUsername();}@RabbitListener(queues = "projection")public void projection(SomeSample in) { String username = in.getUsername(); ...}

默认情况下,访问器方法将用于在收到的 JSON 文档中将属性名称作为字段查找。 该表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,以从多个位置查找值,直到表达式返回实际值。​​@JsonPath​​

要启用此功能,请在消息转换器上设置 to。 还必须将 添加到类路径中。​​useProjectionForInterfaces​​​​true​​​​spring-data:spring-data-commons​​​​com.jayway.jsonpath:json-path​​

当用作方法的参数时,接口类型会像往常一样自动传递给转换器。​​@RabbitListener​​

从​​Message​​​​RabbitTemplate​​

如前所述,类型信息在消息头中传达,以帮助转换器从消息转换。 这在大多数情况下工作正常。 但是,当使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表、数组和映射)。 从版本 2.0 开始,实现 ,这允许它与接受参数的新方法一起使用。 这允许转换复杂的泛型类型,如以下示例所示:​​Jackson2JsonMessageConverter​​​​SmartMessageConverter​​​​RabbitTemplate​​​​ParameterizedTypeReference​​

Thing1<Thing2<Cat, Hat>> thing1 = rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });

从版本 2.1 开始,该类已被删除。 它不再是 的基类。 它已被替换为 。​​AbstractJsonMessageConverter​​​​Jackson2JsonMessageConverter​​​​AbstractJackson2MessageConverter​​

​​MarshallingMessageConverter​​

另一种选择是 . 它委托给 Spring OXM 库的 and 策略接口实现。 您可以​​在此处​​阅读有关该库的更多信息。 在配置方面,最常见的是只提供构造函数参数,因为大多数实现也实现. 以下示例演示如何配置:​​MarshallingMessageConverter​​​​Marshaller​​​​Unmarshaller​​​​Marshaller​​​​Unmarshaller​​​​MarshallingMessageConverter​​

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <property name="connectionFactory" ref="rabbitConnectionFactory"/> <property name="messageConverter"> <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter"> <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/> </bean> </property></bean>
​​Jackson2XmlMessageConverter​​

此类是在 2.1 版中引入的,可用于在 XML 之间转换消息。

两者具有相同的基类:。​​Jackson2XmlMessageConverter​​​​Jackson2JsonMessageConverter​​​​AbstractJackson2MessageConverter​​

引入该类是为了替换已删除的类:。​​AbstractJackson2MessageConverter​​​​AbstractJsonMessageConverter​​

使用 2.x 库。​​Jackson2XmlMessageConverter​​​​com.fasterxml.jackson​​

您可以像 一样使用它,只是它支持 XML 而不是 JSON。 以下示例配置:​​Jackson2JsonMessageConverter​​​​Jackson2JsonMessageConverter​​

<bean id="xmlConverterWithDefaultType" class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter"> <property name="classMapper"> <bean class="org.springframework.amqp.support.converter.DefaultClassMapper"> <property name="defaultType" value="foo.PurchaseOrder"/> </bean> </property></bean>

请参阅 Jackson2JsonMessageConverter 了解更多信息。

从版本 2.2 开始,如果没有属性,或者它具有默认值,则假定为 。 若要恢复到以前的行为(返回未转换),请将转换器的属性设置为 。​​application/xml​​​​contentType​​​​application/octet-stream​​​​byte[]​​​​assumeSupportedContentType​​​​false​​

​​ContentTypeDelegatingMessageConverter​​

此类是在 1.4.2 版中引入的,允许根据 中的内容类型属性委派给特定的 。 默认情况下,如果没有属性或值与任何配置的转换器都不匹配,则它委托给 。 以下示例配置:​​MessageConverter​​​​MessageProperties​​​​SimpleMessageConverter​​​​contentType​​​​ContentTypeDelegatingMessageConverter​​

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter"> <property name="delegates"> <map> <entry key="application/json" value-ref="jsonMessageConverter" /> <entry key="application/xml" value-ref="xmlMessageConverter" /> </map> </property></bean>
Java 反序列化

本节介绍如何反序列化 Java 对象。

从不受信任的源反序列化 Java 对象时可能存在一个漏洞。

如果接受来自不受信任源的邮件,其中 为 ,则应 请考虑配置允许反序列化的包和类。 这适用于 和 配置为隐式或通过配置时使用。​​content-type​​​​application/x-java-serialized-object​​​​SimpleMessageConverter​​​​SerializerMessageConverter​​​​DefaultDeserializer​​

默认情况下,允许列表为空,这意味着所有类都已反序列化。

您可以设置模式列表,例如 、 ​​thing1.thing2.Cat​​ 或 。​​thing1.​​​​​.MySafeClass​​

按顺序检查模式,直到找到匹配项。 如果没有匹配项,则抛出 a。​​SecurityException​​

您可以使用这些转换器上的属性设置模式。​​allowedListPatterns​​

消息属性转换器

策略接口用于在兔子客户端和弹簧AMQP之间进行转换。 默认实现 () 通常足以满足大多数目的,但如果需要,您可以实现自己的实现。 默认属性转换器在大小不大于字节时将类型的元素转换为实例。 较大的实例不会转换(请参阅下一段)。 可以使用构造函数参数覆盖此限制。​​MessagePropertiesConverter​​​​BasicProperties​​​​MessageProperties​​​​DefaultMessagePropertiesConverter​​​​BasicProperties​​​​LongString​​​​String​​​​1024​​​​LongString​​

从版本 1.6 开始,长度超过长字符串限制(默认值:1024)的标头现在默认保留为实例。 可以通过 、 或 方法访问内容。​​LongString​​​​DefaultMessagePropertiesConverter​​​​getBytes[]​​​​toString()​​​​getStream()​​

以前,将此类标头“转换”为(实际上它只是引用了实例的)。 在输出时,此标头未转换(除非转换为字符串 - 例如,通过调用流)。​​DefaultMessagePropertiesConverter​​​​DataInputStream​​​​LongString​​​​DataInputStream​​​​java.io.DataInputStream@1d057a39​​​​toString()​​

大型传入标头现在也会在输出时正确“转换”(默认情况下)。​​LongString​​

提供了一个新的构造函数,允许您将转换器配置为像以前一样工作。 下面的清单显示了该方法的 Javadoc 注释和声明:

/** * Construct an instance where LongStrings will be returned * unconverted or as a java.io.DataInputStream when longer than this limit. * Use this constructor with 'true' to restore pre-1.6 behavior. * @param longStringLimit the limit. * @param convertLongLongStrings LongString when false, * DataInputStream when true. * @since 1.6 */public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

同样从版本 1.6 开始,一个名为的新属性已添加到 。 以前,在与 RabbitMQ 客户端使用的转换之间转换时,会执行不必要的转换,因为 是 ,但使用 . (最终,RabbitMQ 客户端使用 UTF-8 将字节转换为要放入协议消息中的字节)。​​correlationIdString​​​​MessageProperties​​​​BasicProperties​​​​byte[] <→ String​​​​MessageProperties.correlationId​​​​byte[]​​​​BasicProperties​​​​String​​​​String​​

为了提供最大的向后兼容性,已将一个名为的新属性添加到 . 这需要一个枚举参数。 默认情况下,它设置为 ,这将复制以前的行为。​​correlationIdPolicy​​​​DefaultMessagePropertiesConverter​​​​DefaultMessagePropertiesConverter.CorrelationIdPolicy​​​​BYTES​​

对于入站邮件:

  • ​​STRING​​:仅映射属性correlationIdString
  • ​​BYTES​​:仅映射属性correlationId
  • ​​BOTH​​:两个属性都映射

对于出站邮件:

  • ​​STRING​​:仅映射属性correlationIdString
  • ​​BYTES​​:仅映射属性correlationId
  • ​​BOTH​​:考虑这两个属性,属性优先String

同样从版本 1.6 开始,入站属性不再映射到 。 而是映射到。 此外,入站属性不再映射到 。 而是映射到。 这些更改是为了避免在出站消息使用相同的对象时意外传播这些属性。​​deliveryMode​​​​MessageProperties.deliveryMode​​​​MessageProperties.receivedDeliveryMode​​​​userId​​​​MessageProperties.userId​​​​MessageProperties.receivedUserId​​​​MessageProperties​​

从版本 2.2 开始,转换任何类型为 using 而不是 ;这避免了使用应用程序必须从表示中解析类名。 对于滚动升级,您可能需要更改使用者以了解这两种格式,直到升级所有生产者为止。​​DefaultMessagePropertiesConverter​​​​Class<?>​​​​getName()​​​​toString()​​​​toString()​​

4.1.9. 修改消息 - 压缩等

存在许多扩展点。 它们允许您在消息发送到 RabbitMQ 之前或接收消息后立即对消息执行一些处理。

在消息转换器中可以看到,一个这样的扩展点是在操作中,您可以在其中提供 . 例如,转换 POJO 后,允许您在 上设置自定义标头或属性。​​AmqpTemplate​​​​convertAndReceive​​​​MessagePostProcessor​​​​MessagePostProcessor​​​​Message​​

从版本 1.4.2 开始,已将其他扩展点添加到 - 和 中。 第一个使后处理器能够在发送到 RabbitMQ 之前立即运行。 使用批处理(请参阅批处理)时,将在组装批处理之后和发送批处理之前调用此值。 第二个在收到消息后立即调用。​​RabbitTemplate​​​​setBeforePublishPostProcessors()​​​​setAfterReceivePostProcessors()​​

这些扩展点用于压缩等功能,为此,提供了几种实现。,并在发送前压缩消息,以及 ,并解压缩收到的消息。​​MessagePostProcessor​​​​GZipPostProcessor​​​​ZipPostProcessor​​​​DeflaterPostProcessor​​​​GUnzipPostProcessor​​​​UnzipPostProcessor​​​​InflaterPostProcessor​​

从版本 2.1.5 开始,可以使用创建原始消息属性副本的选项进行配置。 默认情况下,出于性能原因,这些属性是重用的,并使用压缩内容编码和可选标头进行修改。 如果保留对原始出站消息的引用,则其属性也将更改。 因此,如果您的应用程序使用这些消息后处理器保留出站消息的副本,请考虑启用该选项。​​GZipPostProcessor​​​​copyProperties = true​​​​MessageProperties.SPRING_AUTO_DECOMPRESS​​​​copyProperties​​

从版本 2.2.12 开始,您可以配置压缩后处理器在内容编码元素之间使用的分隔符。 在版本 2.2.11 及更早版本中,这被硬编码为 ,现在在压缩器上将其设置为属性。 当您的使用者升级到 2.2.11 或更高版本时,您可以恢复为默认值 ', '。​​:​​​​, ` by default. The decompressors will work with both delimiters. However, if you publish messages with 2.3 or later and consume with 2.2.11 or earlier, you MUST set the `encodingDelimiter​​​​:​​

同样,也有一种方法,允许在容器收到消息后执行解压缩。​​SimpleMessageListenerContainer​​​​setAfterReceivePostProcessors()​​

从版本 2.1.4 开始,并已添加到 中,以允许将新的后处理器分别附加到发布前和接收后处理器的列表中。 还提供了删除后处理器的方法。 同样,还添加了 和 方法。 有关更多详细信息,请参阅 Javadoc 和。​​addBeforePublishPostProcessors()​​​​addAfterReceivePostProcessors()​​​​RabbitTemplate​​​​AbstractMessageListenerContainer​​​​addAfterReceivePostProcessors()​​​​removeAfterReceivePostProcessor()​​​​RabbitTemplate​​​​AbstractMessageListenerContainer​​

4.1.10. 请求/回复消息

还提供了多种方法,这些方法接受前面为单向发送操作(、和)描述的相同参数选项。 这些方法对于请求-答复方案非常有用,因为它们在发送之前处理必要属性的配置,并且可以侦听内部为此目的创建的独占队列上的回复消息。​​AmqpTemplate​​​​sendAndReceive​​​​exchange​​​​routingKey​​​​Message​​​​reply-to​​

类似的请求-答复方法也可用于同时应用于请求和答复。 这些方法被命名为 。 有关更多详细信息,请参阅 AmqpTemplate 的 Javadoc。​​MessageConverter​​​​convertSendAndReceive​​

从版本 1.5.0 开始,每个方法变体都有一个重载版本,该版本采用 . 与正确配置的连接工厂一起,这将允许接收操作的发送端的发布者确认。 有关详细信息,请参阅相关发布者确认和返回和 RabbitOperations 的 Javadoc。​​sendAndReceive​​​​CorrelationData​​

从版本 2.0 开始,这些方法 () 的变体采用额外的参数来转换复杂的返回类型。 模板必须配置 . 有关详细信息,请参阅使用 RabbitTemplate 从消息转换。​​convertSendAndReceiveAsType​​​​ParameterizedTypeReference​​​​SmartMessageConverter​​

从版本 2.1 开始,您可以配置 带有 的选项来控制回复使用者的标志。 这是默认的。​​RabbitTemplate​​​​noLocalReplyConsumer​​​​noLocal​​​​false​​

回复超时

默认情况下,发送和接收方法在五秒后超时并返回 null。 可以通过设置属性来修改此行为。 从版本 1.5 开始,如果将属性设置为 (或将特定消息的评估设置为 ),如果消息无法传递到队列,则会引发 。 此异常具有 、 和属性,以及用于发送的 和。​​replyTimeout​​​​mandatory​​​​true​​​​mandatory-expression​​​​true​​​​AmqpMessageReturnedException​​​​returnedMessage​​​​replyCode​​​​replyText​​​​exchange​​​​routingKey​​

此功能使用发布商退货。 您可以通过将 设置为 (请参阅发布者确认和返回​)来启用它。 此外,您不得在 .​​publisherReturns​​​​true​​​​CachingConnectionFactory​​​​ReturnCallback​​​​RabbitTemplate​​

从版本 2.1.2 开始,添加了一个方法,让子类被告知超时,以便它们可以清理任何保留的状态。​​replyTimedOut​​

从版本 2.0.11 和 2.1.3 开始,当您使用默认值时,可以通过设置模板的属性来添加错误处理程序。 对于任何失败的传递,例如延迟回复和收到的没有相关标头的消息,将调用此错误处理程序。 传入的异常是 ,它有一个属性。​​DirectReplyToMessageListenerContainer​​​​replyErrorHandler​​​​ListenerExecutionFailedException​​​​failedMessage​​

兔子MQ 直接回复

从 3.4.0 版本开始,RabbitMQ 服务器支持直接回复​。 这消除了固定回复队列的主要原因(以避免需要为每个请求创建临时队列)。 从Spring AMQP版本1.4.1开始,默认情况下使用直接回复(如果服务器支持),而不是创建临时回复队列。 如果提供 no(或设置名称为 ),则 会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。 使用直接回复时,a 不是必需的,也不应配置。​​replyQueue​​​​amq.rabbitmq.reply-to​​​​RabbitTemplate​​​​reply-listener​​

命名队列(除了 ),仍支持回复侦听器,允许控制回复并发等。​​amq.rabbitmq.reply-to​​

从版本 1.6 开始,如果您希望为每个使用临时的、独占的、自动删除的队列 回复,将属性设置为 。 如果设置 .​​useTemporaryReplyQueues​​​​true​​​​replyAddress​​

您可以通过子类化和覆盖来更改指示是否使用直接回复的条件,以检查不同的条件。 该方法仅在发送第一个请求时调用一次。​​RabbitTemplate​​​​useDirectReplyTo()​​

在版本 2.0 之前,为每个请求创建一个新使用者,并在收到回复(或超时)时取消使用者。 现在,模板使用 代替,让使用者被重用。 该模板仍然负责关联回复,因此不存在延迟回复发送给其他发件人的危险。 如果要还原到以前的行为,请将 ( 使用 XML 配置时) 属性设置为 false。​​RabbitTemplate​​​​DirectReplyToMessageListenerContainer​​​​useDirectReplyToContainer​​​​direct-reply-to-container​​

没有这样的选择。 当使用直接回复时,它总是使用 for 回复。​​AsyncRabbitTemplate​​​​DirectReplyToContainer​​

从版本 2.3.7 开始,模板具有一个新属性。 如果为 ,服务器不必将相关 ID 从请求消息标头复制到回复消息。 相反,用于发送请求的通道用于关联对请求的回复。​​useChannelForCorrelation​​​​true​​

消息与回复队列的关联

使用固定回复队列(而不是 )时,必须提供关联数据,以便回复可以与请求相关联。 请参阅 RabbitMQ 远程过程调用 (RPC)。 默认情况下,标准属性用于保存相关数据。 但是,如果您希望使用自定义属性来保存相关数据,则可以在 <rabbit-template/> 上设置该属性。 显式将属性设置为 与省略属性相同。 客户端和服务器必须对相关数据使用相同的标头。​​amq.rabbitmq.reply-to​​​​correlationId​​​​correlation-key​​​​correlationId​​

Spring AMQP 版本 1.1 使用了为此数据调用的自定义属性。 如果要在当前版本中恢复到此行为(可能是为了保持与使用 1.1 的其他应用程序的兼容性),则必须将该属性设置为 。​​spring_reply_correlation​​​​spring_reply_correlation​​

默认情况下,模板生成自己的相关 ID(忽略用户提供的任何值)。 如果要使用自己的相关 ID,请将实例的属性设置为 。​​RabbitTemplate​​​​userCorrelationId​​​​true​​

相关 ID 必须是唯一的,以避免为请求返回错误回复的可能性。

回复侦听器容器

使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。 但是,可以在模板上配置单个回复队列,这可以更有效,并且还允许您在该队列上设置参数。 但是,在这种情况下,还必须提供一个 <回复侦听器/> 子元素。 此元素为回复队列提供侦听器容器,模板是侦听器。 元素上允许在 <侦听器容器/> 上使用所有消息侦听器容器配置属性,但 和 除外,这些属性继承自模板的配置。​​connection-factory​​​​message-converter​​

如果运行应用程序的多个实例或使用多个实例,则必须为每个实例使用唯一的回复队列。 RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会竞争回复,而不一定会收到自己的回复。​​RabbitTemplate​​

以下示例定义了一个具有连接工厂的兔子模板:

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-queue="replies" reply-address="replyEx/routeReply"> <rabbit:reply-listener/></rabbit:template>

虽然容器和模板共享连接工厂,但它们不共享通道。 因此,请求和答复不会在同一事务中执行(如果是事务)。

在版本 1.5.0 之前,该属性不可用。 始终使用默认交换和名称作为路由密钥来路由答复。 这仍然是默认值,但您现在可以指定新属性。 可以包含带有表单的地址,回复将路由到指定的交换并路由到与路由密钥绑定的队列。 的优先级高于 。 当仅使用时,必须将 配置为单独的组件。 和(或 上的属性)必须在逻辑上引用同一队列。​​reply-address​​​​reply-queue​​​​reply-address​​​​reply-address​​​​<exchange>/<routingKey>​​​​reply-address​​​​reply-queue​​​​reply-address​​​​<reply-listener>​​​​<listener-container>​​​​reply-address​​​​reply-queue​​​​queues​​​​<listener-container>​​

使用此配置时,a 用于接收回复,其中 是 . 使用命名空间元素定义模板时(如前面的示例所示),解析器将模板中的容器和连线定义为侦听器。​​SimpleListenerContainer​​​​RabbitTemplate​​​​MessageListener​​​​<rabbit:template/>​​

当模板不使用固定的(或者使用直接回复 — 参见 RabbitMQ 直接回复​)时,不需要侦听器容器。 直接是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。​​replyQueue​​​​reply-to​​

如果在以编程方式创建模板时将 u 定义为 a 或使用类将其定义为 or 时,则需要自行定义并连接回复侦听器容器。 如果不这样做,模板将永远不会收到回复,并最终超时并返回 null 作为对方法调用的回复。​​RabbitTemplate​​​​<bean/>​​​​@Configuration​​​​@Bean​​​​sendAndReceive​​

从版本 1.5 开始,检测它是否已 配置为 以接收答复。 如果没有,则尝试使用回复地址发送和接收邮件 失败并显示 (因为从未收到回复)。​​RabbitTemplate​​​​MessageListener​​​​IllegalStateException​​

此外,如果使用简单(队列名称),则回复侦听器容器会验证它是否正在侦听 到具有相同名称的队列。 如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法执行此检查。​​replyAddress​​

自行连接回复侦听器和模板时,请务必确保模板和容器的(或)属性引用同一队列。 模板将答复地址插入到出站邮件属性中。​​replyAddress​​​​queues​​​​queueNames​​​​replyTo​​

下面的清单显示了如何手动连接 Bean 的示例:

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="connectionFactory" /> <property name="exchange" value="foo.exchange" /> <property name="routingKey" value="foo" /> <property name="replyQueue" ref="replyQ" /> <property name="replyTimeout" value="600000" /> <property name="useDirectReplyToContainer" value="false" /></bean><bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <constructor-arg ref="connectionFactory" /> <property name="queues" ref="replyQ" /> <property name="messageListener" ref="amqpTemplate" /></bean><rabbit:queue id="replyQ" name="my.reply.queue" />@Bean public RabbitTemplate amqpTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMessageConverter(msgConv()); rabbitTemplate.setReplyAddress(replyQueue().getName()); rabbitTemplate.setReplyTimeout(60000); rabbitTemplate.setUseDirectReplyToContainer(false); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer replyListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueues(replyQueue()); container.setMessageListener(amqpTemplate()); return container; } @Bean public Queue replyQueue() { return new Queue("my.reply.queue"); }

此测试用例中显示了具有固定回复队列的连线的完整示例,以及处理请求并返回回复的“远程”侦听器容器。​​RabbitTemplate​​

当回复超时 () 时,方法返回 null。​​replyTimeout​​​​sendAndReceive()​​

在版本 1.3.6 之前,仅记录超时消息的延迟回复。 现在,如果收到延迟回复,则会被拒绝(模板会抛出 )。 如果回复队列配置为将拒绝的消息发送到死信交换,则可以检索回复以供以后分析。 为此,请使用与回复队列名称相等的路由密钥将队列绑定到配置的死信交换。​​AmqpRejectAndDontRequeueException​​

有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。 您还可以查看测试用例以获取示例。​​FixedReplyQueueDeadLetterTests​​

异步兔子模板

版本 1.6 引入了 . 这与 ​​AmqpTemplate​​ 上的方法类似(和)。 但是,它们不会阻止,而是返回 .​​AsyncRabbitTemplate​​​​sendAndReceive​​​​convertSendAndReceive​​​​CompletableFuture​​

这些方法返回 . 这些方法返回 .​​sendAndReceive​​​​RabbitMessageFuture​​​​convertSendAndReceive​​​​RabbitConverterFuture​​

您可以稍后通过调用将来来同步检索结果,也可以注册与结果异步调用的回调。 下面的清单显示了这两种方法:​​get()​​

@Autowiredprivate AsyncRabbitTemplate template;...public void doSomeWorkAndGetResultLater() { ... CompletableFuture<String> future = this.template.convertSendAndReceive("foo"); // do some more work String reply = null; try { reply = future.get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { ... } ...}public void doSomeWorkAndGetResultAsync() { ... RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo"); future.whenComplete((result, ex) -> { if (ex == null) { // success } else { // failure } }); ...}

如果 设置了 并且无法传递消息,则 future 会抛出一个原因为 ,这将封装返回的消息和有关返回的信息。​​mandatory​​​​ExecutionException​​​​AmqpMessageReturnedException​​

如果 设置为 ,则 future 具有一个名为 的属性,该属性本身为 ,表示发布成功。 如果确认未来为 ,则 还有一个名为 的属性,其中包含失败的原因(如果可用)。​​enableConfirms​​​​confirm​​​​CompletableFuture<Boolean>​​​​true​​​​false​​​​RabbitFuture​​​​nackCause​​

如果在回复后收到发布者确认,则会丢弃该确认,因为回复意味着发布成功。

可以将模板上的属性设置为使答复超时(默认为 - 30 秒)。 如果发生超时,则未来以 .​​receiveTimeout​​​​30000​​​​AmqpReplyTimeoutException​​

模板实现 . 在有挂起的答复时停止模板会导致挂起的实例被取消。​​SmartLifecycle​​​​Future​​

从版本 2.0 开始,异步模板现在支持直接回复,而不是配置的回复队列。 若要启用此功能,请使用以下构造函数之一:

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)public AsyncRabbitTemplate(RabbitTemplate template)

请参阅 RabbitMQ 直接回复 以将直接回复与同步 一起使用。​​RabbitTemplate​​

版本 2.0 引入了这些方法 () 的变体,这些变体采用额外的参数来转换复杂的返回类型。 必须使用. 有关详细信息,请参阅使用 RabbitTemplate 从消息转换。​​convertSendAndReceiveAsType​​​​ParameterizedTypeReference​​​​RabbitTemplate​​​​SmartMessageConverter​​

从版本 3.0 开始,这些方法现在返回 s 而不是 s。​​AsyncRabbitTemplate​​​​CompletableFuture​​​​ListenableFuture​​

使用 AMQP 进行春季远程处理

Spring 远程处理不再受支持,因为该功能已从 Spring Framework 中删除。

改用 (客户端) 和 的操作。​​sendAndReceive​​​​RabbitTemplate​​​​@RabbitListener​​

4.1.11. 配置代理

AMQP 规范描述了如何使用协议在代理上配置队列、交换和绑定。 这些操作(可从 0.8 及更高规范移植)存在于包的接口中。 该类的 RabbitMQ 实现位于包中。​​AmqpAdmin​​​​org.springframework.amqp.core​​​​RabbitAdmin​​​​org.springframework.amqp.rabbit.core​​

该接口基于使用 Spring AMQP 域抽象,如以下清单所示:​​AmqpAdmin​​

public interface AmqpAdmin { // Exchange Operations void declareExchange(Exchange exchange); void deleteExchange(String exchangeName); // Queue Operations Queue declareQueue(); String declareQueue(Queue queue); void deleteQueue(String queueName); void deleteQueue(String queueName, boolean unused, boolean empty); void purgeQueue(String queueName, boolean noWait); // Binding Operations void declareBinding(Binding binding); void removeBinding(Binding binding); Properties getQueueProperties(String queueName);}

另请参阅作用域内操作。

该方法返回有关队列的一些有限信息(消息计数和使用者计数)。 返回的属性的键在 (、 和 ) 中用作常量。 RabbitMQ REST API 在对象中提供了更多信息。​​getQueueProperties()​​​​RabbitTemplate​​​​QUEUE_NAME​​​​QUEUE_MESSAGE_COUNT​​​​QUEUE_CONSUMER_COUNT​​​​QueueInfo​​

no-arg 方法使用自动生成的名称定义代理上的队列。 此自动生成的队列的其他属性包括 、 和 。​​declareQueue()​​​​exclusive=true​​​​autoDelete=true​​​​durable=false​​

该方法采用一个对象并返回已声明队列的名称。 如果提供的属性为空,则代理使用生成的名称声明队列。 该名称将返回给调用方。 该名称也会添加到 的属性中。 只能通过直接调用 以编程方式使用此功能。 在应用程序上下文中以声明方式定义队列时,管理员使用自动声明时,可以将 name 属性设置为 (空字符串)。 然后,代理创建名称。 从版本 2.1 开始,侦听器容器可以使用此类型的队列。 有关详细信息,请参阅容器和代理命名队列。​​declareQueue(Queue queue)​​​​Queue​​​​name​​​​Queue​​​​String​​​​actualName​​​​Queue​​​​RabbitAdmin​​​​""​​

这与框架生成唯一 () 名称并设置为 和 、 相反。 具有空(或缺失)属性的 始终会创建一个 .​​AnonymousQueue​​​​UUID​​​​durable​​​​false​​​​exclusive​​​​autoDelete​​​​true​​​​<rabbit:queue/>​​​​name​​​​AnonymousQueue​​

请参阅 AnonymousQueue 以了解为什么优先于代理生成的队列名称以及 如何控制名称的格式。 从版本 2.1 开始,默认情况下,匿名队列的参数设置为 。 这可确保在应用程序连接到的节点上声明队列。 声明性队列必须具有固定的名称,因为它们可能在上下文中的其他位置被引用,例如在 以下示例中显示的侦听器:​​AnonymousQueue​​​​Queue.X_QUEUE_LEADER_LOCATOR​​​​client-local​​

<rabbit:listener-container> <rabbit:listener ref="listener" queue-names="#{someQueue.name}" /></rabbit:listener-container>

请参阅交换、队列和绑定的自动声明。

此接口的 RabbitMQ 实现是 ,当使用 Spring XML 进行配置时,类似于以下示例:​​RabbitAdmin​​

<rabbit:connection-factory id="connectionFactory"/><rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

当缓存模式为(默认)时,实现会自动延迟声明在同一 中声明的队列、交换和绑定。 这些组件在向代理打开 后立即声明。 有一些命名空间功能使这非常方便 - 例如, 在股票示例应用程序中,我们有以下内容:​​CachingConnectionFactory​​​​CHANNEL​​​​RabbitAdmin​​​​ApplicationContext​​​​Connection​​

<rabbit:queue id="tradeQueue"/><rabbit:queue id="marketDataQueue"/><fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit"> <bindings> <binding queue="tradeQueue"/> </bindings></fanout-exchange><topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit"> <bindings> <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/> </bindings></topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,在内部,只是由框架生成的名称的队列,而不是由代理生成的名称),并通过 ID 引用它们。 我们还可以使用显式名称声明队列,这些队列也用作上下文中其 bean 定义的标识符。 以下示例使用显式名称配置队列:

<rabbit:queue name="stocks.trade.queue"/>

您可以同时提供 和 属性。 这允许您通过独立于队列名称的 ID 引用队列(例如,在绑定中)。 它还允许使用标准的 Spring 功能(例如队列名称的属性占位符和 SpEL 表达式)。 当您使用名称作为 Bean 标识符时,这些功能不可用。​​id​​​​name​​

可以使用其他参数配置队列,例如 . 使用命名空间支持时,它们以参数名称/参数值对的形式提供,这些参数-名称/参数-值对是使用元素定义的。 以下示例演示如何执行此操作:​​x-message-ttl​​​​Map​​​​<rabbit:queue-arguments>​​

<rabbit:queue name="withArguments"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="myDLX"/> <entry key="x-dead-letter-routing-key" value="dlqRK"/> </rabbit:queue-arguments></rabbit:queue>

默认情况下,假定参数为字符串。 对于其他类型的参数,必须提供类型。 下面的示例演示如何指定类型:

<rabbit:queue name="withArguments"> <rabbit:queue-arguments value-type="java.lang.Long"> <entry key="x-message-ttl" value="100"/> </rabbit:queue-arguments></rabbit:queue>

提供混合类型的参数时,必须为每个条目元素提供类型。 以下示例演示如何执行此操作:

<rabbit:queue name="withArguments"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">100</value> </entry> <entry key="x-dead-letter-exchange" value="myDLX"/> <entry key="x-dead-letter-routing-key" value="dlqRK"/> </rabbit:queue-arguments></rabbit:queue>

在 Spring Framework 3.2 及更高版本中,可以更简洁地声明这一点,如下所示:

<rabbit:queue name="withArguments"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/> <entry key="x-ha-policy" value="all"/> </rabbit:queue-arguments></rabbit:queue>

使用 Java 配置时,通过类上的方法支持将参数作为第一类属性。 从版本 2.1 开始,匿名队列的声明将此属性设置为 默认情况下。 这可确保在应用程序连接到的节点上声明队列。​​Queue.X_QUEUE_LEADER_LOCATOR​​​​setLeaderLocator()​​​​Queue​​​​client-local​​

RabbitMQ 代理不允许声明参数不匹配的队列。 例如,如果 已经存在且没有参数,并且您尝试使用 (例如) 声明它,则会引发异常。​​queue​​​​time to live​​​​key="x-message-ttl" value="100"​​

默认情况下,当发生任何异常时,会立即停止处理所有声明。 这可能会导致下游问题,例如侦听器容器无法初始化,因为未声明另一个队列(在错误队列之后定义)。​​RabbitAdmin​​

可以通过将属性设置为在实例上来修改此行为。 此选项指示 记录异常并继续声明其他元素。 使用 Java 进行配置时,此属性称为 。 这是适用于所有元素的全局设置。 队列、交换和绑定具有仅适用于这些元素的类似属性。​​ignore-declaration-exceptions​​​​true​​​​RabbitAdmin​​​​RabbitAdmin​​​​RabbitAdmin​​​​ignoreDeclarationExceptions​​

在版本 1.6 之前,此属性仅在通道上发生 时生效,例如当前属性与所需属性不匹配时。 现在,此属性对任何异常(包括和其他异常)生效。​​IOException​​​​TimeoutException​​

此外,任何声明异常都会导致发布 ,该 是上下文中的任何人都可以使用的 。 该事件包含对 admin、正在声明的元素和 .​​DeclarationExceptionEvent​​​​ApplicationEvent​​​​ApplicationListener​​​​Throwable​​

标头交换

从版本 1.3 开始,您可以将 配置为在多个标头上匹配。 您还可以指定任何或所有标头是否必须匹配。 以下示例演示如何执行此操作:​​HeadersExchange​​

<rabbit:headers-exchange name="headers-test"> <rabbit:bindings> <rabbit:binding queue="bucket"> <rabbit:binding-arguments> <entry key="foo" value="bar"/> <entry key="baz" value="qux"/> <entry key="x-match" value="all"/> </rabbit:binding-arguments> </rabbit:binding> </rabbit:bindings></rabbit:headers-exchange>

从版本 1.6 开始,您可以使用标志(缺省值为 )进行配置,并且通过 (如果应用程序上下文中存在一个标志)在代理上正确配置了此类标志。 如果该标志用于交换,则 RabbitMQ 不允许客户端使用该交换。 这对于死信交换或交换到交换绑定很有用,您不希望使用交换 直接由出版商。​​Exchanges​​​​internal​​​​false​​​​Exchange​​​​RabbitAdmin​​​​internal​​​​true​​

若要了解如何使用 Java 配置 AMQP 基础结构,请查看 Stock 示例应用程序, 哪里有类,又有和子类。 下面的清单显示了以下代码:​​@Configuration​​​​AbstractStockRabbitConfiguration​​​​RabbitClientConfiguration​​​​RabbitServerConfiguration​​​​AbstractStockRabbitConfiguration​​

@Configurationpublic abstract class AbstractStockAppRabbitConfiguration { @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMessageConverter(jsonMessageConverter()); configureRabbitTemplate(template); return template; } @Bean public Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public TopicExchange marketDataExchange() { return new TopicExchange("app.stock.marketdata"); } // additional code omitted for brevity}

在 Stock 应用程序中,使用以下类配置服务器:​​@Configuration​​

@Configurationpublic class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration { @Bean public Queue stockRequestQueue() { return new Queue("app.stock.request"); }}

这是整个类继承链的终点。 最终结果是 并在应用程序启动时向代理声明。 服务器配置中没有绑定到队列,就像在客户端应用程序中那样。 但是,股票请求队列会自动绑定到 AMQP 默认交易所。 此行为由规范定义。​​@Configuration​​​​TopicExchange​​​​Queue​​​​TopicExchange​​

客户端类更有趣一些。 其声明如下:​​@Configuration​​

@Configurationpublic class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration { @Value("${stocks.quote.pattern}") private String marketDataRoutingKey; @Bean public Queue marketDataQueue() { return amqpAdmin().declareQueue(); } /** * Binds to the market data exchange. * Interested in any stock quotes * that match its routing key. */ @Bean public Binding marketDataBinding() { return BindingBuilder.bind( marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey); } // additional code omitted for brevity}

客户端通过 上的方法声明另一个队列。 它将该队列绑定到市场数据交换,并在属性文件中外部化路由模式。​​declareQueue()​​​​AmqpAdmin​​

用于队列和交换的构建器 API

版本 1.6 引入了一个方便流畅的 API,用于在使用 Java 配置时进行配置和对象。 以下示例演示如何使用它:​​Queue​​​​Exchange​​

@Beanpublic Queue queue() { return QueueBuilder.nonDurable("foo") .autoDelete() .exclusive() .withArgument("foo", "bar") .build();}@Beanpublic Exchange exchange() { return ExchangeBuilder.directExchange("foo") .autoDelete() .internal() .withArgument("foo", "bar") .build();}

有关更多信息,请参阅 Javadoc for org.springframework.amqp.core.QueueBuilder 和 org.springframework.amqp.core.ExchangeBuilder。

从版本 2.0 开始,现在默认创建持久交换,以便与各个类上的简单构造函数保持一致。 要与生成器进行非持久交换,请在调用之前使用 。 不再提供不带参数的方法。​​ExchangeBuilder​​​​AbstractExchange​​​​.durable(false)​​​​.build()​​​​durable()​​

版本 2.2 引入了流畅的 API 来添加“众所周知的”交换和队列参数......

@Beanpublic Queue allArgs1() { return QueueBuilder.nonDurable("all.args.1") .ttl(1000) .expires(200_000) .maxLength(42) .maxLengthBytes(10_000) .overflow(Overflow.rejectPublish) .deadLetterExchange("dlx") .deadLetterRoutingKey("dlrk") .maxPriority(4) .lazy() .leaderLocator(LeaderLocator.minLeaders) .singleActiveConsumer() .build();}@Beanpublic DirectExchange ex() { return ExchangeBuilder.directExchange("ex.with.alternate") .durable(true) .alternate("alternate") .build();}
声明交换、队列和绑定的集合

可以将对象集合(、 和 )包装在对象中。 在应用程序上下文中检测此类 bean(以及离散 bean),并在建立连接时(最初和连接失败后)在代理上声明包含的对象。 以下示例演示如何执行此操作:​​Declarable​​​​Queue​​​​Exchange​​​​Binding​​​​Declarables​​​​RabbitAdmin​​​​Declarable​​

@Configurationpublic static class Config { @Bean public CachingConnectionFactory cf() { return new CachingConnectionFactory("localhost"); } @Bean public RabbitAdmin admin(ConnectionFactory cf) { return new RabbitAdmin(cf); } @Bean public DirectExchange e1() { return new DirectExchange("e1", false, true); } @Bean public Queue q1() { return new Queue("q1", false, false, true); } @Bean public Binding b1() { return BindingBuilder.bind(q1()).to(e1()).with("k1"); } @Bean public Declarables es() { return new Declarables( new DirectExchange("e2", false, true), new DirectExchange("e3", false, true)); } @Bean public Declarables qs() { return new Declarables( new Queue("q2", false, false, true), new Queue("q3", false, false, true)); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public Declarables prototypes() { return new Declarables(new Queue(this.prototypeQueueName, false, false, true)); } @Bean public Declarables bs() { return new Declarables( new Binding("q2", DestinationType.QUEUE, "e2", "k2", null), new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)); } @Bean public Declarables ds() { return new Declarables( new DirectExchange("e4", false, true), new Queue("q4", false, false, true), new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)); }}

在 2.1 之前的版本中,您可以通过定义 bean 类型来声明多个实例。 在某些情况下,这可能会导致不良的副作用,因为管理员必须遍历所有 bean。​​Declarable​​​​Collection<Declarable>​​​​Collection<?>​​

版本 2.2 将该方法添加到 ;这可以用作方便,例如,在声明侦听器容器 Bean 时。​​getDeclarablesByType​​​​Declarables​​

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Declarables mixedDeclarables, MessageListener listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0])); container.setMessageListener(listener); return container;}
有条件声明

默认情况下,所有队列、交换和绑定都由应用程序上下文中的所有实例(假设它们有)声明。​​RabbitAdmin​​​​auto-startup="true"​​

从版本 2.1.9 开始,具有一个新属性(默认情况下);如果设置为 ,则管理员将仅声明显式配置为由该管理员声明的 Bean。​​RabbitAdmin​​​​explicitDeclarationsOnly​​​​false​​​​true​​

从 1.2 版本开始,您可以有条件地声明这些元素。 当应用程序连接到多个代理并需要指定应向哪些代理声明特定元素时,这特别有用。

表示这些元素的类实现 ,它有两个方法:和 。 使用这些方法来确定特定实例是否应实际处理其 .​​Declarable​​​​shouldDeclare()​​​​getDeclaringAdmins()​​​​RabbitAdmin​​​​Connection​​

这些属性可用作命名空间中的属性,如以下示例所示:

<rabbit:admin id="admin1" connection-factory="CF1" /><rabbit:admin id="admin2" connection-factory="CF2" /><rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" /><rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" /><rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" /><rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" /><rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" /><rabbit:direct-exchange name="direct" declared-by="admin1, admin2"> <rabbit:bindings> <rabbit:binding key="foo" queue="bar"/> </rabbit:bindings></rabbit:direct-exchange>

默认情况下,属性为 and,如果未提供 (或为空),则所有实例都声明该对象(只要 管理员的属性为 、默认值,并且管理员的属性为 false)。​​auto-declare​​​​true​​​​declared-by​​​​RabbitAdmin​​​​auto-startup​​​​true​​​​explicit-declarations-only​​

同样,您可以使用基于 Java 来实现相同的效果。 在以下示例中,组件由 'admin2' 声明,但不由 'admin2' 声明:​​@Configuration​​​​admin1​​

@Beanpublic RabbitAdmin admin1() { return new RabbitAdmin(cf1());}@Beanpublic RabbitAdmin admin2() { return new RabbitAdmin(cf2());}@Beanpublic Queue queue() { Queue queue = new Queue("foo"); queue.setAdminsThatShouldDeclare(admin1()); return queue;}@Beanpublic Exchange exchange() { DirectExchange exchange = new DirectExchange("bar"); exchange.setAdminsThatShouldDeclare(admin1()); return exchange;}@Beanpublic Binding binding() { Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null); binding.setAdminsThatShouldDeclare(admin1()); return binding;}
关于 和 属性的说明​​id​​​​name​​

和元素的属性反映了代理中实体的名称。 对于队列,如果省略 ,则会创建一个匿名队列(请参阅​​匿名队列​​)。​​name​​​​<rabbit:queue/>​​​​<rabbit:exchange/>​​​​name​​

在 2.0 之前的版本中,还注册为 Bean 名称别名(类似于 on 元素)。​​name​​​​name​​​​<bean/>​​

这导致了两个问题:

  • 它阻止了队列和具有相同名称的交换的声明。
  • 如果别名包含 SpEL 表达式 (),则不会解析别名。#{…​}

从 2.0 版开始,如果同时使用 an 和 a 属性声明这些元素之一,则该名称将不再声明为 Bean 名称别名。 如果要声明一个队列并与之交换,则必须提供 .​​id​​​​name​​​​name​​​​id​​

如果元素只有一个属性,则不会有任何更改。 Bean 仍然可以由 引用 — 例如,在绑定声明中。 但是,如果名称包含 SpEL,则仍然无法引用它 — 您必须提供 以供参考。​​name​​​​name​​​​id​​

​​AnonymousQueue​​

通常,当您需要唯一名称、独占、自动删除队列时,我们建议您使用 而不是代理定义的队列名称(用作名称会导致代理生成队列 名称)。​​AnonymousQueue​​​​""​​​​Queue​​

这是因为:

  • 队列实际上是在建立与代理的连接时声明的。 这是在创建豆子并连接在一起很久之后。 使用队列的 bean 需要知道其名称。 事实上,当应用程序启动时,代理甚至可能没有运行。
  • 如果与代理的连接由于某种原因丢失,管理员将使用相同的名称重新声明 。 如果我们使用代理声明的队列,队列名称将更改。AnonymousQueue
  • 您可以控制实例使用的队列名称的格式。​​AnonymousQueue​​

    默认情况下,队列名称的前缀后跟 base64 表示形式,例如:。​​spring.gen-​​​​UUID​​​​spring.gen-MRBv9sqISkuCiPfOYfpo4g​​

    可以在构造函数参数中提供实现。 以下示例演示如何执行此操作:​​AnonymousQueue.NamingStrategy​​

    @Beanpublic Queue anon1() { return new AnonymousQueue();}@Beanpublic Queue anon2() { return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));}@Beanpublic Queue anon3() { return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);}

    第一个 Bean 生成一个队列名称,前缀后跟 — for 的 base64 表示形式 例:。 第二个 Bean 生成一个队列名称,前缀后跟 的 base64 表示形式。 第三个 Bean 仅使用 UUID(无 base64 转换)生成名称,例如 .​​spring.gen-​​​​UUID​​​​spring.gen-MRBv9sqISkuCiPfOYfpo4g​​​​something-​​​​UUID​​​​f20c818a-006b-4416-bf91-643590fedb0e​​

    base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。 删除尾随填充字符 ()。​​=​​

    您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(如应用程序名称或客户端主机)。

    您可以在使用 XML 配置时指定命名策略。 该属性存在于元素上 对于实现 的 Bean 引用。 以下示例演示如何以各种方式指定命名策略:​​naming-strategy​​​​<rabbit:queue>​​​​AnonymousQueue.NamingStrategy​​

    <rabbit:queue id="uuidAnon" /><rabbit:queue id="springAnon" naming-strategy="uuidNamer" /><rabbit:queue id="customAnon" naming-strategy="customNamer" /><bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" /><bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy"> <constructor-arg value="custom.gen-" /></bean>

    第一个示例创建名称,例如 。 第二个示例使用 UUID 的字符串表示形式创建名称。 第三个示例创建名称,例如 。​​spring.gen-MRBv9sqISkuCiPfOYfpo4g​​​​custom.gen-MRBv9sqISkuCiPfOYfpo4g​​

    您还可以提供自己的命名策略 bean。

    从版本 2.1 开始,默认情况下,匿名队列的参数设置为 。 这可确保在应用程序连接到的节点上声明队列。 您可以通过在构造实例后调用来恢复到之前的行为。​​Queue.X_QUEUE_LEADER_LOCATOR​​​​client-local​​​​queue.setLeaderLocator(null)​​

    恢复自动删除声明

    通常,(s) 只恢复在应用程序上下文中声明为 bean 的队列/交换/绑定;如果任何此类声明是自动删除的,则在连接丢失时,代理将删除它们。 重新建立连接后,管理员将重新声明实体。 通常,通过调用 创建的实体不会恢复。​​RabbitAdmin​​​​admin.declareQueue(…)​​​​admin.declareExchange(…)​​​​admin.declareBinding(…)​​

    从 2.4 版开始,管理员有一个新的属性;当 时,除了应用程序上下文中的 Bean 之外,管理员还将恢复这些实体。​​redeclareManualDeclarations​​​​true​​

    如果调用 ,则不会执行单个声明的恢复。 删除队列和交换时,将从可恢复实体中删除关联的绑定。​​deleteQueue(…)​​​​deleteExchange(…)​​​​removeBinding(…)​​

    最后,调用将阻止恢复任何以前声明的实体。​​resetAllManualDeclarations()​​

    上一篇:Java如何实现自定义异常
    下一篇:没有了
    网友评论