在许多情况下,应用程序数据是从流中获取的。 建议不要将对流的引用作为消息有效负载发送给使用者。 相反,消息是从从输入流读取的数据创建的,消息有效负载将逐个写入输出流。
您需要将此依赖项包含在项目中:
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>6.0.0</version></dependency>从流中读取
Spring Integration 为流提供了两个适配器。 两者并实施. 通过在通道适配器元素中配置其中一个,可以配置轮询周期,并且消息总线可以自动检测和调度它们。 字节流版本需要 ,字符流版本需要 作为单个构造函数参数。 还接受“bytesPerMessage”属性,以确定它尝试读取每个 . 默认值为 。 以下示例创建一个输入流,该流创建每个消息包含 2048 个字节的消息:ByteStreamReadingMessageSourceCharacterStreamReadingMessageSourceMessageSourceInputStreamReaderByteStreamReadingMessageSourceMessage1024
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource"> <constructor-arg ref="someInputStream"/> <property name="bytesPerMessage" value="2048"/></bean><bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource"> <constructor-arg ref="someReader"/></bean>将读取器包装在 (如果它还没有一个)。 可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。 从版本 5.0 开始,第三个构造函数参数 () 控制 . 当(默认值)时,该方法检查读取器是否为,如果不是,则返回 null。 在这种情况下,不会检测到 EOF(文件结尾)。 当 时,该方法将阻塞,直到数据可用或在基础流上检测到 EOF。 检测到 EOF 时,将发布(应用程序事件)。 您可以将此事件与实现 的 Bean 一起使用。CharacterStreamReadingMessageSourceBufferedReaderblockToDetectEOFCharacterStreamReadingMessageSourcefalsereceive()ready()truereceive()StreamClosedEventApplicationListener<StreamClosedEvent>
为了便于检测 EOF,轮询器线程会阻塞方法,直到数据到达或检测到 EOF。receive()
检测到 EOF 后,轮询器将继续在每个轮询上发布事件。 应用程序侦听器可以停止适配器以防止这种情况。 事件发布在轮询器线程上。 停止适配器会导致线程中断。 如果要在停止适配器后执行某些可中断的任务,则必须在不同的线程上执行 ,或者对这些下游活动使用不同的线程。 请注意,发送到 是可中断的,因此,如果您希望从侦听器发送消息,请在停止适配器之前执行此操作。stop()QueueChannel
这有助于将数据“管道”或重定向到 ,如以下两个示例所示:stdin
cat myfile.txt | java -jar my.jarjava -jar my.jar < foo.txt此方法允许应用程序在管道关闭时停止。
有四种方便的工厂方法可用:
public static final CharacterStreamReadingMessageSource stdin() { ... }public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }public static final CharacterStreamReadingMessageSource stdinPipe() { ... }public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }写入流
对于目标流,可以使用以下两种实现之一:或 。 每个都需要一个构造函数参数(用于字节流或字符流),并且每个构造函数都提供第二个构造函数来添加可选的“bufferSize”。 由于这两个最终都实现了接口,因此您可以从配置中引用它们,如通道适配器中所述。ByteStreamWritingMessageHandlerCharacterStreamWritingMessageHandlerOutputStreamWriterMessageHandlerchannel-adapter
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler"> <constructor-arg ref="someOutputStream"/> <constructor-arg value="1024"/></bean><bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler"> <constructor-arg ref="someWriter"/></bean>流命名空间支持
Spring 集成定义了一个命名空间,以减少与流相关的通道适配器所需的配置。 使用它需要以下架构位置:
<?xml version="1.0" encoding="UTF-8"?><beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration/stream https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">以下代码片段显示了支持用于配置入站通道适配器的不同配置选项:
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/><int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>从版本 5.0 开始,可以设置属性,以设置属性。 有关详细信息,请参阅从流中读取。detect-eofblockToDetectEOF
若要配置出站通道适配器,还可以使用命名空间支持。 以下示例显示了出站通道适配器的不同配置:
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset" channel="testChannel"/><int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8" channel="testChannel"/><int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/><int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true" channel="testChannel"/>