当前位置 : 主页 > 编程语言 > java >

Spring Integration 对MQTT 支持

来源:互联网 收集:自由互联 发布时间:2022-12-20
Spring 集成提供入站和出站通道适配器以支持消息队列遥测传输 (MQTT) 协议。 您需要将此依赖项包含在项目中: dependency groupIdorg.springframework.integration/groupId artifactIdspring-integration-mqt

Spring Integration 对MQTT 支持_通道适配器

Spring 集成提供入站和出站通道适配器以支持消息队列遥测传输 (MQTT) 协议。

您需要将此依赖项包含在项目中:

<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>6.0.0</version></dependency>

当前实现使用 Eclipse Paho MQTT 客户端库。

XML 配置和本章的大部分内容都是关于 MQTT v3.1 协议支持和相应的 Paho 客户端的。 有关相应的协议支持,请参阅 MQTT v5 支持段落。

两个适配器的配置都是使用 . 有关配置选项的更多信息,请参阅 Paho 文档。​​DefaultMqttPahoClientFactory​​

我们建议配置对象并将其注入工厂,而不是在工厂本身上设置(已弃用的)选项。​​MqttConnectOptions​​

入站(消息驱动)通道适配器

入站通道适配器由 实现。 为方便起见,可以使用命名空间对其进行配置。 最低配置可能如下所示:​​MqttPahoMessageDrivenChannelAdapter​​

<bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> <property name="connectionOptions"> <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions"> <property name="userName" value="${mqtt.username}"/> <property name="password" value="${mqtt.password}"/> </bean> </property></bean><int-mqtt:message-driven-channel-adapter id="mqttInbound" client-id="${mqtt.default.client.id}.src" url="${mqtt.url}" topics="sometopic" client-factory="clientFactory" channel="output"/>

以下清单显示了可用的属性:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter" client-id="foo" url="tcp://localhost:1883" topics="bar,baz" qos="1,2" converter="myConverter" client-factory="clientFactory" send-timeout="123" error-channel="errors" recovery-interval="10000" manual-acks="false" channel="out" />

客户端 ID。

代理 URL。

此适配器从中接收消息的主题的逗号分隔列表。

以逗号分隔的 QoS 值列表。 它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。

一个(可选)。 默认情况下,默认生成一条消息,其中包含具有以下标头的有效负载:​​MqttMessageConverter​​​​DefaultPahoMessageConverter​​​​String​​

  • ​​mqtt_topic​​:接收消息的主题
  • ​​mqtt_duplicate​​​:如果邮件是重复的​​true​​
  • ​​mqtt_qos​​​: 服务质量 可以将 配置为返回有效负载中的原始数据,方法是将其声明为 并将属性设置为 。​​DefaultPahoMessageConverter​​​​byte[]​​​​<bean/>​​​​payloadAsBytes​​​​true​​

客户端工厂。

超时。 仅当通道可能阻塞(例如当前已满的有界通道)时,它才适用。​​send()​​​​QueueChannel​​

错误通道。 下游异常将发送到此通道(如果提供),则在 . 有效负载是包含失败消息和原因的 。​​ErrorMessage​​​​MessagingException​​

恢复间隔。 它控制适配器在发生故障后尝试重新连接的时间间隔。 它默认为(十秒)。​​10000ms​​

确认模式;设置为“为”手动确认”。

从版本 4.1 开始,您可以省略 URL。 相反,您可以在 的属性中提供服务器 URI。 例如,这样做可以连接到高可用性 (HA) 群集。​​serverURIs​​​​DefaultMqttPahoClientFactory​​

从版本 4.2.2 开始,当适配器成功预订主题时,将发布 。 连接或订阅失败时会发布事件。 这些事件可以由实现 的 Bean 接收。​​MqttSubscribedEvent​​​​MqttConnectionFailedEvent​​​​ApplicationListener​​

此外,名为的新属性控制适配器在失败后尝试重新连接的时间间隔。 它默认为(十秒)。​​recoveryInterval​​​​10000ms​​

在版本 4.2.3 之前,客户端始终在适配器停止时取消订阅。 这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便消息到达 当适配器停止时,将在下次启动时交付。 这还需要将客户端工厂上的属性设置为 。 默认为 .​​cleanSession​​​​false​​​​true​​

从版本 4.2.3 开始,如果属性为 。​​cleanSession​​​​false​​

可以通过在工厂上设置属性来重写此行为。 它可以有值:、 和 。 后者(默认值)仅在属性为 .​​consumerCloseAction​​​​UNSUBSCRIBE_ALWAYS​​​​UNSUBSCRIBE_NEVER​​​​UNSUBSCRIBE_CLEAN​​​​cleanSession​​​​true​​

要恢复到 4.2.3 之前的行为,请使用 。​​UNSUBSCRIBE_ALWAYS​​

从版本 5.0 开始,、 和 属性将映射到标头 (、 和 ),以避免无意中传播到(默认情况下)使用 、 和标头的出站消息。​​topic​​​​qos​​​​retained​​​​.RECEIVED_…​​​​MqttHeaders.RECEIVED_TOPIC​​​​MqttHeaders.RECEIVED_QOS​​​​MqttHeaders.RECEIVED_RETAINED​​​​MqttHeaders.TOPIC​​​​MqttHeaders.QOS​​​​MqttHeaders.RETAINED​​

在运行时添加和删除主题

从版本 4.1 开始,可以通过编程方式更改适配器订阅的主题。 Spring Integration 提供了 和 方法。 添加主题时,您可以选择指定(默认值:1)。 您还可以通过向具有适当有效负载的 发送适当的消息来修改主题,例如:。​​addTopic()​​​​removeTopic()​​​​QoS​​​​<control-bus/>​​​​"myMqttAdapter.addTopic('foo', 1)"​​

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。 更改不会保留到应用程序上下文的生命周期之后。 新的应用程序上下文将恢复为配置的设置。

在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。

手动确认

从版本 5.3 开始,可以将该属性设置为 true。 通常用于异步确认传递。 设置为 时,会将标头 () 添加到消息中,值为 . 必须调用该方法才能完成传递。 有关更多信息,请参阅 Javadocs。 为方便起见,提供了一个标头访问器:​​manualAcks​​​​true​​​​IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK​​​​SimpleAcknowledgment​​​​acknowledge()​​​​IMqttClient​​​​setManualAcks()​​​​messageArrivedComplete()​​

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从 version 开始,当消息转换器抛出异常或从转换返回时,如果提供,则会将 发送到 中。 将此转换错误重新抛出到 MQTT 客户端回调中。​​5.2.11​​​​null​​​​MqttMessage​​​​MqttPahoMessageDrivenChannelAdapter​​​​ErrorMessage​​​​errorChannel​​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplicationpublic class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); } }; }}

使用 Java DSL 进行配置

以下 Spring 引导应用程序提供了使用 Java DSL 配置入站适配器的示例:

@SpringBootApplicationpublic class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttInbound() { return IntegrationFlow.from( new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2");) .handle(m -> System.out.println(m.getPayload())) .get(); }}

出站通道适配器

出站通道适配器由 实现,它包装在 中。 为方便起见,可以使用命名空间对其进行配置。​​MqttPahoMessageHandler​​​​ConsumerEndpoint​​

从版本 4.1 开始,适配器支持异步发送操作,避免在确认传递之前阻塞。 如果需要,您可以发出应用程序事件以使应用程序能够确认传递。

以下清单显示了可用于出站通道适配器的属性:

<int-mqtt:outbound-channel-adapter id="withConverter" client-id="foo" url="tcp://localhost:1883" converter="myConverter" client-factory="clientFactory" default-qos="1" qos-expression="" default-retained="true" retained-expression="" default-topic="bar" topic-expression="" async="false" async-events="false" channel="target" />

客户端 ID。

代理 URL。

一个(可选)。 默认值识别以下标头:​​MqttMessageConverter​​​​DefaultPahoMessageConverter​​

  • ​​mqtt_topic​​:消息将发送到的主题
  • ​​mqtt_retained​​​:如果要保留消息​​true​​
  • ​​mqtt_qos​​: 服务质量

客户端工厂。

默认服务质量。 如果未找到标头或返回 . 如果您提供自定义 .​​mqtt_qos​​​​qos-expression​​​​null​​​​converter​​

要计算以确定 qos 的表达式。 缺省值为 。​​headers[mqtt_qos]​​

保留标志的默认值。 如果未找到标头,则使用它。 如果提供了自定义,则不使用它。​​mqtt_retained​​​​converter​​

要计算以确定保留的布尔值的表达式。 缺省值为 。​​headers[mqtt_retained]​​

将消息发送到的默认主题(如果未找到标头时使用)。​​mqtt_topic​​

要计算以确定目标主题的表达式。 缺省值为 。​​headers['mqtt_topic']​​

当 时,调用方不会阻塞。 相反,它会在发送消息时等待传递确认。 默认值为(发送块,直到确认传递)。​​true​​​​false​​

当 和 都是 时,将发出 (请参阅事件​)。 它包含消息、主题、客户端库生成的、 和 (每次连接客户端时递增)。 当客户端库确认传递时,将发出 。 它包含 、 和 ,使传递能够与 相关联。 任何或事件入站通道适配器都可以接收这些事件。 请注意,有可能在 之前接收 。 缺省值为 。​​async​​​​async-events​​​​true​​​​MqttMessageSentEvent​​​​messageId​​​​clientId​​​​clientInstance​​​​MqttMessageDeliveredEvent​​​​messageId​​​​clientId​​​​clientInstance​​​​send()​​​​ApplicationListener​​​​MqttMessageDeliveredEvent​​​​MqttMessageSentEvent​​​​false​​

从版本 4.1 开始,可以省略 URL。 相反,可以在 的属性中提供服务器 URI。 例如,这可以实现与高可用性 (HA) 群集的连接。​​serverURIs​​​​DefaultMqttPahoClientFactory​​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置出站适配器的示例:

@SpringBootApplication@IntegrationComponentScanpublic class MqttJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToMqtt("foo"); } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" }); options.setUserName("username"); options.setPassword("password".toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); }}

使用 Java DSL 进行配置

以下 Spring 引导应用程序提供了使用 Java DSL 配置出站适配器的示例:

@SpringBootApplicationpublic class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttOutboundFlow() { return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient")); }}

事件

某些应用程序事件由适配器发布。

  • ​​MqttConnectionFailedEvent​​- 如果我们无法连接或随后连接丢失,则由两个适配器发布。
  • ​​MqttMessageSentEvent​​- 在发送消息时由出站适配器发布(如果在异步模式下运行)。
  • ​​MqttMessageDeliveredEvent​​- 当客户端指示消息已传递时,如果以异步模式运行,则由出站适配器发布。
  • ​​MqttSubscribedEvent​​- 由订阅主题后由入站适配器发布。

这些事件可以通过 或 方法接收。​​ApplicationListener<MqttIntegrationEvent>​​​​@EventListener​​

若要确定事件的来源,请使用以下命令;您可以检查 Bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();String beanName = source.getBeanName();MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从版本 5.5.5 开始,该模块为 MQTT v5 协议提供通道适配器实现。 这是一个依赖项,因此必须显式包含在目标项目中。​​spring-integration-mqtt​​​​org.eclipse.paho:org.eclipse.paho.mqttv5.client​​​​optional​​

由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了该实现以在发布和接收操作上映射到/从标头。 默认情况下,(通过模式)它映射所有接收的帧属性(包括用户属性)。 在出站端,它映射帧的标头子集:、、、。​​MqttHeaderMapper​​​​*​​​​PUBLISH​​​​PUBLISH​​​​contentType​​​​mqtt_messageExpiryInterval​​​​mqtt_responseTopic​​​​mqtt_correlationData​​

MQTT v5 协议的出站通道适配器以 . 它需要一个和 MQTT 代理 URL 或引用。 它支持一个选项,在这种情况下可以并且可以发出对象(请参阅选项)。 如果请求消息有效负载为 ,则通过内部 . 如果有效负载为 ,则按原样使用目标有效负载进行发布。 如果有效负载为 ,则会将其转换为发布。 其余用例被委托给提供的,这是应用程序上下文中的 bean。 注意:当请求的消息负载已经是 . 以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:​​Mqttv5PahoMessageHandler​​​​clientId​​​​MqttConnectionOptions​​​​MqttClientPersistence​​​​async​​​​MqttIntegrationEvent​​​​asyncEvents​​​​org.eclipse.paho.mqttv5.common.MqttMessage​​​​IMqttAsyncClient​​​​byte[]​​​​MqttMessage​​​​String​​​​byte[]​​​​MessageConverter​​​​IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME​​​​ConfigurableCompositeMessageConverter​​​​HeaderMapper<MqttProperties>​​​​MqttMessage​​

@Beanpublic IntegrationFlow mqttOutFlow() { Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout"); MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper(); mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE); messageHandler.setHeaderMapper(mqttHeaderMapper); messageHandler.setAsync(true); messageHandler.setAsyncEvents(true); messageHandler.setConverter(mqttStringToBytesConverter()); return f -> f.handle(messageHandler);}

不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。​​org.springframework.integration.mqtt.support.MqttMessageConverter​​​​Mqttv5PahoMessageHandler​​

如果在启动时或运行时连接失败,则会尝试在生成到此处理程序的下一条消息时重新连接。 如果此手动重新连接失败,则连接异常将回传给调用方。 在这种情况下,将应用标准的 Spring 集成错误处理程序,包括请求处理程序建议,例如重试或断路器。​​Mqttv5PahoMessageHandler​​

在javadocs及其超类中查看更多信息。​​Mqttv5PahoMessageHandler​​

MQTT v5 协议的入站通道适配器以 . 它需要一个 MQTT 代理 URL 或引用,以及订阅和使用的主题。 它支持一个选项,默认情况下该选项在内存中。 预期(默认情况下)可以配置,并将其传播到提供的转换从接收的 . 如果设置了该选项,则会在邮件中添加一个标头,以生成为 的实例。 用于将帧属性(包括用户属性)映射到目标邮件头。 标准属性(如、、、、以及接收的主题)始终映射到标头。 有关详细信息,请参阅。​​Mqttv5PahoMessageDrivenChannelAdapter​​​​clientId​​​​MqttConnectionOptions​​​​MqttClientPersistence​​​​payloadType​​​​byte[]​​​​SmartMessageConverter​​​​byte[]​​​​MqttMessage​​​​manualAck​​​​IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK​​​​SimpleAcknowledgment​​​​HeaderMapper<MqttProperties>​​​​PUBLISH​​​​MqttMessage​​​​qos​​​​id​​​​dup​​​​retained​​​​MqttHeaders​​

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Beanpublic IntegrationFlow mqttInFlow() { Mqttv5PahoMessageDrivenChannelAdapter messageProducer = new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest"); messageProducer.setPayloadType(String.class); messageProducer.setMessageConverter(mqttStringToBytesConverter()); messageProducer.setManualAcks(true); return IntegrationFlow.from(messageProducer) .channel(c -> c.queue("fromMqttChannel")) .get();}

不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。​​org.springframework.integration.mqtt.support.MqttMessageConverter​​​​Mqttv5PahoMessageDrivenChannelAdapter​​

在javadocs及其超类中查看更多信息。​​Mqttv5PahoMessageDrivenChannelAdapter​​

建议将 true 设置为 true,以便内部实例处理重新连接。 否则,只有手动重新启动 才能处理重新连接,例如通过断开连接时的处理。​​MqttConnectionOptions#setAutomaticReconnect(boolean)​​​​IMqttAsyncClient​​​​Mqttv5PahoMessageDrivenChannelAdapter​​​​MqttConnectionFailedEvent​​

共享 MQTT 客户端支持

如果多个集成需要单个 MQTT 客户端 ID,则无法使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个客户端 ID 的连接数有限制(通常允许单个连接)。 为了将单个客户端重用于不同的通道适配器,可以使用组件并将其传递给所需的任何通道适配器。 它将管理 MQTT 连接生命周期,并在需要时自动重新连接。 此外,自定义连接选项可以提供给客户端管理器,就像目前可以为通道适配器组件完成一样。​​org.springframework.integration.mqtt.core.ClientManager​​​​MqttClientPersistence​​

请注意,MQTT v5 和 v3 通道适配器均受支持。

以下 Java DSL 配置示例演示如何在集成流中使用此客户机管理器:

@Beanpublic ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() { MqttConnectionOptions connectionOptions = new MqttConnectionOptions(); connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" }); connectionOptions.setConnectionTimeout(30000); connectionOptions.setMaxReconnectDelay(1000); connectionOptions.setAutomaticReconnect(true); Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5"); clientManager.setPersistence(new MqttDefaultFilePersistence()); return clientManager;}@Beanpublic IntegrationFlow mqttInFlowTopic1( ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) { Mqttv5PahoMessageDrivenChannelAdapter messageProducer = new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1"); return IntegrationFlow.from(messageProducer) .channel(c -> c.queue("fromMqttChannel")) .get();}@Beanpublic IntegrationFlow mqttInFlowTopic2( ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) { Mqttv5PahoMessageDrivenChannelAdapter messageProducer = new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2"); return IntegrationFlow.from(messageProducer) .channel(c -> c.queue("fromMqttChannel")) .get();}@Beanpublic IntegrationFlow mqttOutFlow( ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) { return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));}
上一篇:Spring Integration 对R2DBC 支持
下一篇:没有了
网友评论