Spring 集成提供了通道适配器,用于使用数据库查询接收和发送消息。 通过这些适配器,Spring Integration 不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数调用。
您需要将此依赖项包含在项目中:
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jdbc</artifactId> <version>6.0.0</version></dependency>缺省情况下,以下 JDBC 组件可用:
- 入站通道适配器
- 出站通道适配器
- 出站网关
- 存储过程入站通道适配器
- 存储过程出站通道适配器
- 存储过程出站网关
Spring Integration JDBC Module 还提供了 JDBC 消息存储区。
入站通道适配器
入站通道适配器的主要功能是执行 SQL 查询并将结果集转换为消息。 消息负载是整个结果集(表示为 ),列表中项的类型取决于行映射策略。 默认策略是通用映射器,它为查询结果中的每一行返回 a。 或者,您可以通过添加对实例的引用来更改此设置(有关行映射的更多详细信息,请参阅 Spring JDBC 文档)。SELECTListMapRowMapper
如果要将查询结果中的行转换为单个消息,可以使用下游拆分器。SELECT
入站适配器还需要引用实例或 .JdbcTemplateDataSource
除了生成消息的语句外,适配器还有一个语句,用于将记录标记为已处理,以便它们不会显示在下一次轮询中。 可以通过原始选择中的 ID 列表对更新进行参数化。 默认情况下,这是通过命名约定完成的(输入结果集中调用的列将转换为名为 的更新的参数映射中的列表)。 以下示例定义具有更新查询和引用的入站通道适配器。SELECTUPDATEididDataSource
<int-jdbc:inbound-channel-adapter query="select * from item where status=2" channel="target" data-source="dataSource" update="update item set status=10 where id in (:id)" />更新查询中的参数使用冒号 () 前缀指定到参数名称(在前面的示例中,参数是要应用于轮询结果集中每一行的表达式)。 这是 Spring JDBC 中命名参数 JDBC 支持的标准功能,并结合了 Spring 集成中采用的约定(投影到轮询结果列表)。 底层的 Spring JDBC 特性限制了可用的表达式(例如,不允许使用句点以外的大多数特殊字符),但由于目标通常是可通过 Bean 路径寻址的对象列表(可能是一个对象列表),因此并没有过度限制。:
要更改参数生成策略,可以将 注入适配器以覆盖默认行为(适配器具有属性)。 Spring Integration 提供了 ,它创建一个基于 SpEL 的参数源,并将查询结果作为对象。 (如果为 true,则根对象为行)。 如果同一参数名称在更新查询中多次出现,则仅计算一次,并缓存其结果。SqlParameterSourceFactorysql-parameter-source-factoryExpressionEvaluatingSqlParameterSourceFactory#rootupdate-per-row
还可以对选择查询使用参数源。 在这种情况下,由于没有要评估的“结果”对象,因此每次都使用单个参数源(而不是使用参数源工厂)。 从版本 4.0 开始,您可以使用 Spring 创建基于 SpEL 的参数源,如以下示例所示:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" /><bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <constructor-arg value="" /></bean><bean id="parameterSourceFactory" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="status" value="@statusBean.which()" /> </map> </property></bean><bean id="statusBean" class="foo.StatusDetermination" />The in each parameter expression can be any valid SpEL expression. The object for the expression evaluation is the constructor argument defined on the bean. It is static for all evaluations (in the preceding example, an empty ).value#rootparameterSourceString
Starting with version 5.0, you ca supply with to specify the target SQL type for the particular parameter.ExpressionEvaluatingSqlParameterSourceFactorysqlParameterTypes
The following example provides SQL types for the parameters being used in the query:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" /><bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <constructor-arg value="" /></bean><bean id="parameterSourceFactory" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="sqlParameterTypes"> <map> <entry key="status" value="#{ T(java.sql.Types).BINARY}" /> </map> </property></bean>Use the factory method. Otherwise, the parameter source caches the result of the evaluation. Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it is re-evaluated for each occurrence. createParameterSourceNoCache
轮询和交易
入站适配器接受常规的 Spring 集成轮询器作为子元素。 因此,可以控制轮询的频率(以及其他用途)。 用于 JDBC 使用的轮询器的一个重要特性是可以选择将轮询操作包装在事务中,如以下示例所示:
<int-jdbc:inbound-channel-adapter query="..." channel="target" data-source="dataSource" update="..."> <int:poller fixed-rate="1000"> <int:transactional/> </int:poller></int-jdbc:inbound-channel-adapter>如果未显式指定轮询器,则使用默认值。 与 Spring 集成一样,它可以定义为顶级 bean)。
在前面的示例中,数据库每 1000 毫秒(或每秒轮询一次)一次,并且更新和选择查询都在同一事务中执行。 未显示事务管理器配置。 但是,只要它知道数据源,轮询就是事务性的。 一个常见的用例是将下游通道设置为直接通道(默认),以便在同一线程中调用端点,从而在同一事务中调用端点。 这样,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。
max-rows对max-messages-per-poll
JDBC 入站通道适配器定义一个名为 的属性。 指定适配器的轮询器时,还可以定义一个名为 的属性。 虽然这两个属性看起来很相似,但它们的含义却大不相同。max-rowsmax-messages-per-poll
max-messages-per-poll指定每个轮询间隔执行查询的次数,而指定每次执行返回的行数。max-rows
在正常情况下,您可能不希望在使用 JDBC 入站通道适配器时设置轮询器的属性。 它的默认值为 ,这意味着 JDBC 入站通道适配器的 receive() 方法对于每个轮询间隔只执行一次。max-messages-per-poll1
将属性设置为较大的值意味着查询将连续多次执行。 有关该属性的详细信息,请参阅 配置入站通道适配器。max-messages-per-pollmax-messages-per-poll
相反,如果属性大于 ,则指定方法创建的查询结果集中要使用的最大行数。 如果该属性设置为 ,则所有行都包含在生成的消息中。 该属性默认为 。max-rows0receive()00
建议通过特定于供应商的查询选项(例如 MySQL 或 SQL Server 或 Oracle 的 . 有关详细信息,请参阅特定的供应商文档。LIMITTOPROWNUM
出站通道适配器
出站通道适配器与入站适配器相反:其角色是处理消息并使用它来执行 SQL 查询。 默认情况下,消息有效负载和标头可用作查询的输入参数,如以下示例所示:
<int-jdbc:outbound-channel-adapter query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])" data-source="dataSource" channel="input"/>在前面的示例中,到达标记的通道的消息具有键为 的映射的有效负载,因此运算符从映射中取消引用该值。 标头也可以作为地图访问。inputsomething[]
前面查询中的参数是传入消息上的 Bean 属性表达式(不是 SpEL 表达式)。 此行为是 的一部分,这是出站适配器创建的默认源。 您可以注入不同的行为以获得不同的行为。SqlParameterSourceSqlParameterSourceFactory
出站适配器需要引用 或 . 还可以注入 以控制每个传入消息与查询的绑定。DataSourceJdbcTemplateSqlParameterSourceFactory
如果输入通道是直接通道,则出站适配器在同一线程中运行其查询,因此,运行与消息发送方相同的事务(如果有)。
使用 SpEL 表达式传递参数
大多数 JDBC 通道适配器的常见要求是将参数作为 SQL 查询或存储过程或函数的一部分传递。 如前所述,默认情况下,这些参数是 Bean 属性表达式,而不是 SpEL 表达式。 但是,如果需要将 SpEL 表达式作为参数传递,则必须显式注入 .SqlParameterSourceFactory
以下示例使用 a 来实现该要求:ExpressionEvaluatingSqlParameterSourceFactory
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input" query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)" sql-parameter-source-factory="spelSource"/><bean id="spelSource" class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <property name="parameterExpressions"> <map> <entry key="id" value="headers['id'].toString()"/> <entry key="createdDate" value="new java.util.Date()"/> <entry key="payload" value="payload"/> </map> </property></bean>有关更多信息,请参见定义参数源。
使用回调PreparedStatement
有时,灵活性和松耦合不能满足目标的需要,或者我们需要做一些低级的JDBC工作。 Spring JDBC 模块提供了用于配置执行环境(例如 或 )和操作参数值(例如 )的 API。 它甚至可以访问用于低级操作的 API,例如 .SqlParameterSourceFactoryPreparedStatementConnectionCallbackPreparedStatementCreatorSqlParameterSourceStatementCallback
从 Spring Integration 4.2 开始,允许在上下文中手动指定参数。 这个类扮演的角色与标准 Spring JDBC API 中的角色完全相同。 实际上,当在 .MessagePreparedStatementSetterPreparedStatementrequestMessagePreparedStatementSetterPreparedStatementSetterJdbcMessageHandlerexecuteJdbcTemplate
此功能接口选项与 互斥,可用作填充 的参数的更强大的替代方法。 例如,当我们需要以流式方式将数据存储到数据库列时,它很有用。 以下示例演示如何执行此操作:sqlParameterSourceFactoryPreparedStatementrequestMessageFileBLOB
@Bean@ServiceActivator(inputChannel = "storeFileChannel")public MessageHandler jdbcMessageHandler(DataSource dataSource) { JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource, "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)"); jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> { ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME)); try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) { ps.setBlob(2, inputStream); } catch (Exception e) { throw new MessageHandlingException(m, e); } ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class))); }); return jdbcMessageHandler;}从 XML 配置的角度来看,该属性在组件上可用。 它允许您指定 Bean 引用。prepared-statement-setter<int-jdbc:outbound-channel-adapter>MessagePreparedStatementSetter
批量更新
从版本 5.1 开始,如果请求消息的有效负载是实例,则执行 a。 如果这样的元素还不是,则每个元素都包装到 a 中,其中包含请求消息中的标头。 在基于常规的配置的情况下,这些消息用于构建上述函数中使用的参数。 应用配置时,将使用变体来迭代每个项的这些消息,并针对它们调用所提供的消息。 选择模式时不支持批量更新。JdbcMessageHandlerJdbcOperations.batchUpdate()IterableIterableMessageMessageSqlParameterSourceFactorySqlParameterSource[]JdbcOperations.batchUpdate()MessagePreparedStatementSetterBatchPreparedStatementSetterMessagePreparedStatementSetterkeysGenerated
出站网关
出站网关类似于出站和入站适配器的组合:它的作用是处理消息并使用它来执行 SQL 查询,然后通过将其发送到回复通道来响应结果。 默认情况下,消息有效负载和标头可用作查询的输入参数,如以下示例所示:
<int-jdbc:outbound-gateway update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])" request-channel="input" reply-channel="output" data-source="dataSource" />前面示例的结果是将一条记录插入表中,并向输出通道返回一条消息,指示受影响的行数(有效负载是映射:)。mythings{UPDATED=1}
如果更新查询是包含自动生成的键的插入,则可以通过添加到前面的示例(这不是默认值,因为某些数据库平台不支持它)来使用生成的键填充回复消息。 以下示例显示了更改的配置:keys-generated="true"
<int-jdbc:outbound-gateway update="insert into mythings (status, name) values (0, :payload[thing])" request-channel="input" reply-channel="output" data-source="dataSource" keys-generated="true"/>除了更新计数或生成的键之外,您还可以提供一个选择查询来执行并从结果(例如入站适配器)生成回复消息,如以下示例所示:
<int-jdbc:outbound-gateway update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])" query="select * from foos where id=:headers[$id]" request-channel="input" reply-channel="output" data-source="dataSource"/>从 Spring Integration 2.2 开始,更新 SQL 查询不再是强制性的。 现在,您可以使用属性或元素仅提供选择查询。 如果需要使用通用网关或有效负载扩充器等方式主动检索数据,这将非常有用。 然后,从结果生成回复消息(类似于入站适配器的工作方式)并传递到回复通道。 以下示例显示如何使用该属性:queryqueryquery
<int-jdbc:outbound-gateway query="select * from foos where id=:headers[id]" request-channel="input" reply-channel="output" data-source="dataSource"/>默认情况下,查询的组件仅从游标返回一行(第一行)。 您可以使用该选项调整此行为。 如果需要从 SELECT 返回所有行,请考虑指定 。SELECTmax-rowsmax-rows="0"
与通道适配器一样,您还可以提供请求和回复的实例。 默认值与出站适配器的默认值相同,因此请求消息可用作表达式的根。 如果 ,表达式的根是生成的键(如果只有一个键,则为映射,如果为多值,则为映射列表)。SqlParameterSourceFactorykeys-generated="true"
出站网关需要引用 或 . 它还可以注入一个来控制传入消息与查询的绑定。DataSourceJdbcTemplateSqlParameterSourceFactory
从版本 4.2 开始,该属性在 上可用作 的替代项。 它允许您指定 Bean 引用,该引用在执行之前实现更复杂的准备。request-prepared-statement-setter<int-jdbc:outbound-gateway>request-sql-parameter-source-factoryMessagePreparedStatementSetterPreparedStatement
从版本 6.0 开始,将按原样返回空列表结果,而不是将其转换为以前的意思是“无回复”。 这在处理空列表是下游逻辑一部分的应用程序中导致了额外的配置。 有关可能的空列表处理选项,请参阅拆分器丢弃通道。JdbcOutboundGatewaynull
有关 的详细信息,请参阅出站通道适配器。MessagePreparedStatementSetter
JDBC 消息存储库
Spring Integration 提供了两个特定于 JDBC 的消息存储库实现。 适用于聚合器和声明检查模式。 该实现专门为消息通道提供了更具针对性和可伸缩性的实现。JdbcMessageStoreJdbcChannelMessageStore
请注意,您可以使用 来备份消息通道,为此目的进行了优化。JdbcMessageStoreJdbcChannelMessageStore
从版本 5.0.11、5.1.2 开始,优化了 的索引。 如果此类存储中具有大型消息组,则可能希望更改索引。 此外,索引被注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。JdbcChannelMessageStorePriorityChannel
使用 时,必须添加优先级通道索引,因为它包含在查询的提示中。OracleChannelMessageStoreQueryProvider
初始化数据库
在开始使用 JDBC 消息存储库组件之前,应使用适当的对象置备目标数据库。
Spring 集成附带了一些可用于初始化数据库的示例脚本。 在 JAR 文件中,可以在包中找到脚本。 它为一系列常见数据库平台提供了一个示例创建和示例删除脚本。 使用这些脚本的一种常见方法是在 Spring JDBC 数据源初始值设定项中引用它们。 请注意,这些脚本作为示例以及所需表名和列名的规范提供。 您可能会发现需要增强它们以供生产使用(例如,通过添加索引声明)。spring-integration-jdbcorg.springframework.integration.jdbc
通用 JDBC 消息存储库
JDBC 模块提供了由数据库支持的 Spring 集成(在声明检查模式中很重要)和(在有状态模式(如聚合器)中很重要)的实现。 这两个接口都由 实现,并且支持在 XML 中配置存储实例,如以下示例所示:MessageStoreMessageGroupStoreJdbcMessageStore
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>您可以指定 而不是 .JdbcTemplateDataSource
以下示例显示了其他一些可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource" lob-handler="lobHandler" table-prefix="MY_INT_"/>在前面的示例中,我们指定了一个用于将消息作为大型对象(这对于 Oracle 通常是必需的)进行处理的,并为存储生成的查询中的表名指定了一个前缀。 表名前缀缺省为 。LobHandlerINT_
支持消息通道
如果您打算使用 JDBC 支持消息通道,我们建议您使用该实现。 它仅与消息通道结合使用。JdbcChannelMessageStore
支持的数据库
使用特定于数据库的 SQL 查询从数据库中检索消息。 因此,必须在 上设置该属性。 这将为您指定的特定数据库提供 SQL 查询。 Spring 集成为以下关系数据库提供支持:JdbcChannelMessageStoreChannelMessageStoreQueryProviderJdbcChannelMessageStorechannelMessageStoreQueryProvider
- PostgreSQL
- 恒青铁银行
- MySQL
- 神谕
- 德比
- H2
- SqlServer
- Sybase
- DB2
如果未列出您的数据库,则可以扩展类并提供您自己的自定义查询。AbstractChannelMessageStoreQueryProvider
版本 4.0 将列添加到表中,以确保先进先出 (FIFO) 排队,即使消息存储在同一毫秒内也是如此。MESSAGE_SEQUENCE
自定义消息插入
从版本 5.0 开始,通过重载类,可以在 中提供用于消息插入的自定义实现。 您可以使用它来设置不同的列或更改表结构或序列化策略。 例如,您可以将其结构存储为 JSON 字符串,而不是默认序列化为 。ChannelMessageStorePreparedStatementSetterJdbcChannelMessageStorebyte[]
以下示例使用 的默认实现 来存储公共列,并重写将消息负载存储为 的行为:setValuesvarchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter { @Override public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId, String region, boolean priorityEnabled) throws SQLException { // Populate common columns super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled); // Store message payload as varchar preparedStatement.setString(6, requestMessage.getPayload().toString()); }}通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源:
- 数据库作为队列反模式。
如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中介绍。
并发轮询
轮询消息通道时,您可以选择配置与引用关联的 。PollerTaskExecutor
但请记住,如果使用 JDBC 支持的消息通道,并且计划轮询通道,从而轮询具有多个线程的消息存储,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且在使用多个线程时,性能可能无法按预期实现。 例如,Apache Derby在这方面是有问题的。
要获得更好的 JDBC 队列吞吐量并避免当不同的线程可能从队列中轮询相同的线程时出现问题,在使用不支持 MVCC 的数据库时,将属性设置为 to 非常重要。 以下示例演示如何执行此操作:MessageusingIdCacheJdbcChannelMessageStoretrue
<bean id="queryProvider" class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/><int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /> <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/></int:transaction-synchronization-factory><task:executor id="pool" pool-size="10" queue-capacity="10" rejection-policy="CALLER_RUNS" /><bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> <property name="region" value="TX_TIMEOUT"/> <property name="usingIdCache" value="true"/></bean><int:channel id="inputChannel"> <int:queue message-store="store"/></int:channel><int:bridge input-channel="inputChannel" output-channel="outputChannel"> <int:poller fixed-delay="500" receive-timeout="500" max-messages-per-poll="1" task-executor="pool"> <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="transactionManager" /> </int:poller></int:bridge><int:channel id="outputChannel" />优先频道
从版本 4.0 开始,实现并提供选项,使其用作实例的参考。 为此,该表具有一列来存储邮件头的值。 此外,新列使我们能够实现强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储多个消息也是如此。 使用 从数据库中轮询(选择)消息。JdbcChannelMessageStorePriorityCapableChannelMessageStorepriorityEnabledmessage-storepriority-queueINT_CHANNEL_MESSAGEMESSAGE_PRIORITYPRIORITYMESSAGE_SEQUENCEorder by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
我们不建议对优先级和非优先级队列通道使用相同的 bean,因为该选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。 但是,相同的表(甚至)可以用于这两种类型。 要配置该方案,可以从一个消息存储库 Bean 扩展另一个消息存储库 Bean,如以下示例所示:JdbcChannelMessageStorepriorityEnabledINT_CHANNEL_MESSAGEregionJdbcChannelMessageStore
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/></bean><int:channel id="queueChannel"> <int:queue message-store="channelStore"/></int:channel><bean id="priorityStore" parent="channelStore"> <property name="priorityEnabled" value="true"/></bean><int:channel id="priorityChannel"> <int:priority-queue message-store="priorityStore"/></int:channel>对邮件存储区进行分区
通常将 用作同一应用程序中一组应用程序或节点的全局存储。 为了提供一些防止名称冲突的保护并提供对数据库元数据配置的控制,消息存储库允许以两种方式对表进行分区。 一种方法是通过更改前缀来使用单独的表名(如前所述)。 另一种方法是指定用于在单个表中对数据进行分区的名称。 第二种方法的一个重要用例是,当 管理支持 Spring 集成消息通道的持久队列时。 持久通道的消息数据在通道名称的存储中键入。 因此,如果通道名称不是全局唯一的,则通道可以选取不适合它们的数据。 若要避免此危险,可以使用消息存储为具有相同逻辑名称的不同物理通道保持数据独立。JdbcMessageStoreregionMessageStoreregion
PostgreSQL:接收推送通知
PostgreSQL 提供了一个侦听和通知框架,用于在数据库表操作时接收推送通知。 Spring 集成利用这种机制(从版本 6.0 开始)允许在将新消息添加到 . 使用此功能时,必须定义一个数据库触发器,该触发器可以作为 Spring 集成的 JDBC 模块中包含的文件注释的一部分找到。JdbcChannelMessageStoreschema-postgresql.sql
推送通知通过类接收,该类允许其订阅者在任何给定和的新消息到达时接收回调。 即使消息附加到不同的 JVM 上,但附加到同一数据库,也会收到这些通知。 该实现使用协定从存储中提取消息,作为对来自上述通知的通知的反应。PostgresChannelMessageTableSubscriberregiongroupIdPostgresSubscribableChannelPostgresChannelMessageTableSubscriber.SubscriptionPostgresChannelMessageTableSubscriber
例如,可以按如下方式接收推送通知:some group
@Beanpublic JdbcChannelMessageStore messageStore(DataSource dataSource) { JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource); messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider()); return messageStore;}@Beanpublic PostgresChannelMessageTableSubscriber subscriber( @Value("${spring.datasource.url}") String url, @Value("${spring.datasource.username}") String username, @Value("${spring.datasource.password}") String password) { return new PostgresChannelMessageTableSubscriber(() -> DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));}@Beanpublic PostgresSubscribableChannel channel( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore) { return new PostgresSubscribableChannel(messageStore, "some group", subscriber);}任何活动在其活动生命周期期间都会占用独占 JDBC。 因此,重要的是此连接不是源自 池化 . 此类连接池通常希望在预定义的超时窗口内关闭已发布的连接。PostgresChannelMessageTableSubscriberConnectionDataSource
对于这种独占连接的需求,还建议 JVM 只运行一个可用于注册任意数量的订阅的订阅。PostgresChannelMessageTableSubscriber
存储过程
在某些情况下,普通的 JDBC 支持是不够的。 也许您处理的是旧式关系数据库架构,或者您有复杂的数据处理需求,但最终,您必须使用存储过程或存储函数。 从 Spring Integration 2.1 开始,我们提供了三个组件来执行存储过程或存储函数:
- 存储过程入站通道适配器
- 存储过程出站通道适配器
- 存储过程出站网关
支持的数据库
为了启用对存储过程和存储函数的调用,存储过程组件使用 org.springframework.jdbc.core.simple.SimpleJdbcCall 类。 因此,完全支持以下数据库执行存储过程:
- 阿帕奇德比
- DB2
- MySQL
- 微软SQL Server
- 神谕
- PostgreSQL
- Sybase
如果要改为执行存储函数,则完全支持以下数据库:
- MySQL
- 微软SQL Server
- 神谕
- PostgreSQL
即使您的特定数据库可能不完全受支持,只要您的 RDBMS 支持存储过程或存储函数,您还是可以非常成功地使用存储过程 Spring 集成组件。
事实上,一些提供的集成测试使用 H2 数据库。 尽管如此,彻底测试这些使用场景非常重要。
配置
存储过程组件提供完整的 XML 命名空间支持,并且配置组件与前面讨论的通用 JDBC 组件类似。
通用配置属性
所有存储过程组件共享某些配置参数:
- auto-startup:生命周期属性,指示是否应在应用程序上下文启动期间启动此组件。 默认为 . 自选。true
- data-source:对 的引用,用于访问数据库。 必填。javax.sql.DataSource
- id:标识基础 Spring Bean 定义,该定义是 或 的实例,具体取决于出站通道适配器的属性是引用 a 还是 。 自选。EventDrivenConsumerPollingConsumerchannelSubscribableChannelPollableChannel
- ignore-column-meta-data:对于完全支持的数据库,基础 SimpleJdbcCall 类可以从 JDBC 元数据中自动检索存储过程或存储函数的参数信息。但是,如果数据库不支持元数据查找,或者需要提供自定义参数定义,则可以将此标志设置为 。 默认为 . 自选。truefalse
- is-function:如果 ,则调用 SQL 函数。 在这种情况下,or 属性定义被调用函数的名称。 默认为 . 自选。truestored-procedure-namestored-procedure-name-expressionfalse
- stored-procedure-name:此属性指定存储过程的名称。 如果该属性设置为 ,则此属性将改为指定函数名称。 此属性或必须指定。is-functiontruestored-procedure-name-expression
- stored-procedure-name-expression:此属性使用 SpEL 表达式指定存储过程的名称。 通过使用 SpEL,您可以访问完整的消息(如果可用),包括其标头和有效负载。 可以使用此属性在运行时调用不同的存储过程。 例如,可以提供要作为消息头执行的存储过程名称。 表达式必须解析为 .String如果该属性设置为 ,则此属性指定存储函数。 此属性或必须指定。is-functiontruestored-procedure-name
- jdbc-call-operations-cache-size:定义缓存实例的最大数量。 基本上,对于每个存储过程名称,都会创建一个新的 SimpleJdbcCallOperations 实例,作为回报,该实例将被缓存。SimpleJdbcCallOperations
Spring Integration 2.2 添加了属性和属性。stored-procedure-name-expressionjdbc-call-operations-cache-size
缺省高速缓存大小为 。 值 禁用缓存。 不允许使用负值。100
如果启用 JMX,则有关 的统计信息将作为 MBean 公开。 有关详细信息,请参阅 MBean 导出器。jdbc-call-operations-cache
- sql-parameter-source-factory:(不适用于存储过程入站通道适配器。 对 . 缺省情况下,传入有效负载的 Bean 属性通过使用 .SqlParameterSourceFactoryMessageBeanPropertySqlParameterSourceFactory这对于基本用例可能就足够了。 对于更复杂的选项,请考虑传入一个或多个值。 请参见定义参数源。 自选。ProcedureParameter
- use-payload-as-parameter-source:(不适用于存储过程入站通道适配器。 如果设置为 ,则 的有效负载将用作提供参数的源。 但是,如果设置为 ,则整个参数可用作参数的源。trueMessagefalseMessage如果未传入任何过程参数,则此属性默认为 。 这意味着,通过使用缺省值,有效负载的 Bean 属性将用作存储过程或存储函数的参数值的源。trueBeanPropertySqlParameterSourceFactory但是,如果传入过程参数,则此属性(默认情况下)的计算结果为 。 让我们提供 SpEL 表达式。 因此,访问整个. 该属性在基础 上设置。 自选。falseProcedureParameterMessageStoredProcExecutor
通用配置子元素
存储过程组件共享一组通用的子元素,可用于定义参数并将其传递给存储过程或存储函数。 以下元素可用:
- parameter
- returning-resultset
- sql-parameter-definition
- poller
- parameter:提供提供存储过程参数的机制。 参数可以是静态的,也可以是使用 SpEL 表达式提供的。
+ <1> 要传递到存储过程或存储函数中的参数的名称。 必填。 <2> 此属性指定值的类型。 如果未提供任何内容,则此属性默认为 。 仅当使用该属性时,才使用此属性。 自选。 <3> 参数的值。 必须提供此属性或属性。 自选。 <4> 您可以指定一个 SpEL 表达式来传递参数的值,而不是属性。 如果指定 ,则不允许使用该属性。 自选。java.lang.Stringvalueexpressionvalueexpressionvalue
自选。
- returning-resultset:存储过程可能会返回多个结果集。 通过设置一个或多个元素,可以指定将返回的每个元素转换为有意义的对象。 自选。returning-resultsetRowMappersResultSet
- sql-parameter-definition:如果使用完全受支持的数据库,则通常不必指定存储过程参数定义。 相反,这些参数可以从 JDBC 元数据自动派生。 但是,如果使用不完全受支持的数据库,则必须使用该元素显式设置这些参数。sql-parameter-definition您还可以选择关闭使用该属性对通过 JDBC 获取的参数元数据信息的任何处理。ignore-column-meta-data
指定 SQL 参数的名称。 必填。
指定 SQL 参数定义的方向。 默认值为 。 有效值为:、 和 。 如果过程返回结果集,请使用该元素。 自选。ININOUTINOUTreturning-resultset
用于此 SQL 参数定义的 SQL 类型。 转换为整数值,如 所定义。 或者,您也可以提供整数值。 如果未显式设置此属性,则默认为“VARCHAR”。 自选。java.sql.Types
SQL 参数的小数位数。 仅用于数字和十进制参数。 自选。
用户名类型,例如:、、 和命名数组类型。 此属性与该属性互斥。 自选。typeNameSTRUCTDISTINCTJAVA_OBJECTscale
对复杂类型的自定义值处理程序的引用。 SqlReturnType 的实现。 此属性与属性互斥,仅适用于 OUT 和 INOUT 参数。 自选。scale
- poller:如果此端点是 . 自选。PollingConsumer
定义参数源
参数源控制检索 Spring 集成消息属性并将其映射到相关存储过程输入参数的技术。
存储过程组件遵循某些规则。 缺省情况下,有效负载的 Bean 属性用作存储过程输入参数的源。 在这种情况下,使用 a。 这对于基本用例可能就足够了。 下一个示例说明了该默认行为。MessageBeanPropertySqlParameterSourceFactory
对于使用 to Work 对 Bean 属性的“自动”查找,必须以小写形式定义 Bean 属性。 这是因为在 (Java 方法是 ) 中,检索到的存储过程参数声明将转换为小写。 因此,如果具有驼峰大小写 Bean 属性(如 ),则查找将失败。 在这种情况下,请提供显式 .BeanPropertySqlParameterSourceFactoryorg.springframework.jdbc.core.metadata.CallMetaDataContextmatchInParameterValuesWithCallParameters()lastNameProcedureParameter
假设我们有一个有效负载,它由具有以下三个属性的简单 bean 组成:、 和 . 此外,我们有一个简单的存储过程,称为它接受三个输入参数:、 和 。 我们还使用完全支持的数据库。 在这种情况下,存储过程出站适配器的以下配置就足够了:idnamedescriptionINSERT_COFFEEidnamedescription
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource" channel="insertCoffeeProcedureRequestChannel" stored-procedure-name="INSERT_COFFEE"/>对于更复杂的选项,请考虑传入一个或多个值。ProcedureParameter
如果显式提供值,则默认情况下,将使用 用于参数处理,以启用 SpEL 表达式的全部功能。ProcedureParameterExpressionEvaluatingSqlParameterSourceFactory
如果需要更好地控制参数的检索方式,请考虑使用属性传入自定义实现 。SqlParameterSourceFactorysql-parameter-source-factory
存储过程入站通道适配器
下面的清单调用了对存储过程入站通道适配器重要的属性:
<int-jdbc:stored-proc-inbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" skip-undeclared-results="" return-value-required="false" <int:poller/> <int-jdbc:sql-parameter-definition name="" direction="IN" type="STRING" scale=""/> <int-jdbc:parameter name="" type="" value=""/> <int-jdbc:parameter name="" expression=""/> <int-jdbc:returning-resultset name="" row-mapper="" /></int-jdbc:stored-proc-inbound-channel-adapter>将轮询消息发送到的通道。 如果存储过程或函数不返回任何数据,则 的有效负载为 null。 必填。Message
如果此属性设置为 ,则会绕过存储过程调用中没有相应声明的所有结果。 例如,存储过程可以返回更新计数值,即使存储过程只声明了一个结果参数也是如此。 确切的行为取决于数据库实现。 该值在基础 . 该值默认为 。 自选。trueSqlOutParameterJdbcTemplatetrue
指示是否应包含此过程的返回值。 自春季集成 3.0 以来。 自选。
存储过程出站通道适配器
下面的清单调用了对存储过程出站通道适配器重要的属性:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" order="" sql-parameter-source-factory="" use-payload-as-parameter-source=""> <int:poller fixed-rate=""/> <int-jdbc:sql-parameter-definition name=""/> <int-jdbc:parameter name=""/></int-jdbc:stored-proc-outbound-channel-adapter>此终结点的接收消息通道。 必填。
指定此终结点作为订阅者连接到通道时的调用顺序。 当该通道使用调度策略时,这一点尤其重要。 当此终结点本身是具有队列的通道的轮询使用者时,它不起作用。 自选。failover
存储过程出站网关
下面的清单调用了对存储过程出站通道适配器重要的属性:
<int-jdbc:stored-proc-outbound-gateway request-channel="" stored-procedure-name="" data-source="" auto-startup="true" id="" ignore-column-meta-data="false" is-function="false" order="" reply-channel="" reply-timeout="" return-value-required="false" skip-undeclared-results="" sql-parameter-source-factory="" use-payload-as-parameter-source=""><int-jdbc:sql-parameter-definition name="" direction="IN" type="" scale="10"/><int-jdbc:sql-parameter-definition name=""/><int-jdbc:parameter name="" type="" value=""/><int-jdbc:parameter name="" expression=""/><int-jdbc:returning-resultset name="" row-mapper="" />此终结点的接收消息通道。 必填。
收到数据库响应后应将回复发送到的消息通道。 自选。
允许您指定此网关在引发异常之前等待回复消息成功发送的时间。 请记住,当发送到 时,调用发生在发送方的线程中。 因此,发送操作的失败可能是由下游的其他组件引起的。 默认情况下,网关无限期等待。 该值以毫秒为单位指定。 自选。DirectChannel
指示是否应包含此过程的返回值。 自选。
如果将该属性设置为 ,则会绕过存储过程调用中没有相应声明的所有结果。 例如,存储过程可能会返回更新计数值,即使存储过程只声明了一个结果参数。 确切的行为取决于数据库。 该值在基础 . 该值默认为 。 自选。skip-undeclared-resultstrueSqlOutParameterJdbcTemplatetrue
例子
本节包含两个调用 Apache Derby 存储过程的示例。 第一个过程调用返回 . 通过使用 ,数据被转换为域对象,然后成为 Spring 集成消息有效负载。ResultSetRowMapper
在第二个示例中,我们调用一个存储过程,该过程改为使用输出参数返回数据。
看看 Spring 集成示例项目。
该项目包含此处引用的 Apache Derby 示例,以及如何运行它的说明。 Spring 集成示例项目还提供了使用 Oracle 存储过程的示例。
在第一个示例中,我们调用一个名为的存储过程,该过程不定义任何输入参数,但返回 .FIND_ALL_COFFEE_BEVERAGESResultSet
在Apache Derby中,存储过程是用Java实现的。 以下清单显示了方法签名:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages) throws SQLException { ...}以下清单显示了相应的 SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';在 Spring 集成中,您现在可以通过使用例如 a 来调用此存储过程,如以下示例所示:stored-proc-outbound-gateway
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all" data-source="dataSource" request-channel="findAllProcedureRequestChannel" expect-single-result="true" stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES"><int-jdbc:returning-resultset name="coffeeBeverages" row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/></int-jdbc:stored-proc-outbound-gateway>在第二个示例中,我们调用具有一个输入参数的名为的存储过程。 它不返回 ,而是使用输出参数。 以下示例显示了方法签名:FIND_COFFEEResultSet
public static void findCoffee(int coffeeId, String[] coffeeDescription) throws SQLException { ...}以下清单显示了相应的 SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';在 Spring 集成中,您现在可以通过使用例如 a 来调用此存储过程,如以下示例所示:stored-proc-outbound-gateway
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee" data-source="dataSource" request-channel="findCoffeeProcedureRequestChannel" skip-undeclared-results="true" stored-procedure-name="FIND_COFFEE" expect-single-result="true"> <int-jdbc:parameter name="ID" expression="payload" /></int-jdbc:stored-proc-outbound-gateway>JDBC 锁定注册表
版本 4.3 引入了 . 某些组件(例如,聚合器和重新排序器)使用从实例获取的锁来确保一次只有一个线程操作一个组。 在单个组件中执行此功能。 您现在可以在这些组件上配置外部锁定注册表。 与共享 一起使用时,可以使用 跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作组。JdbcLockRegistryLockRegistryDefaultLockRegistryMessageGroupStoreJdbcLockRegistry
当一个本地线程释放一个锁时,另一个本地线程通常可以立即获取该锁。 如果锁由使用不同注册表实例的线程释放,则最多可能需要 100 毫秒才能获取锁。
它基于抽象,它有一个实现。 数据库模式脚本位于包中,该包针对特定的 RDBMS 供应商进行划分。 例如,以下清单显示了锁定表的 H2 DDL:JdbcLockRegistryLockRepositoryDefaultLockRepositoryorg.springframework.integration.jdbc
CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36), REGION VARCHAR(100), CLIENT_ID CHAR(36), CREATED_DATE TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION));可以根据目标数据库设计要求进行更改。 因此,必须在 Bean 定义上使用属性。INT_prefixDefaultLockRepository
有时,一个应用程序已移动到无法释放分布式锁并删除数据库中特定记录的状态。 为此,此类死锁可以在下一次锁定调用时由其他应用程序过期。 上的 (TTL) 选项是为此目的提供的。 您可能还希望为给定实例指定存储的锁。 如果是这样,则可以指定要与 关联的 作为构造函数参数。timeToLiveDefaultLockRepositoryCLIENT_IDDefaultLockRepositoryidDefaultLockRepository
从版本 5.1.8 开始,可以使用 - a 配置为在锁定记录插入/更新执行之间休眠。 默认情况下,它是毫秒,在某些环境中,非领导者经常污染与数据源的连接。JdbcLockRegistryidleBetweenTriesDuration100
从版本 5.4 开始,该接口已引入并添加到 中。 必须在锁定过程中调用该方法,以防锁定进程将比锁定的生存时间长。 因此,生存时间可以大大减少,部署可以快速重新夺回丢失的锁。RenewableLockRegistryJdbcLockRegistryrenewLock()
仅当锁由当前线程持有时,才能进行锁更新。
5.5.6 版本的字符串,支持通过 自动清理 JdbcLock 的缓存。 有关更多信息,请参阅其 JavaDocs。JdbcLockRegistryJdbcLockRegistry.locksJdbcLockRegistry.setCacheCapacity()
字符串 与版本 6.0 一起,可以提供 而不是依赖于应用程序上下文中的主 Bean。DefaultLockRepositoryPlatformTransactionManager
JDBC 元数据存储
版本 5.0 引入了 JDBC(请参阅元数据存储)实现。 可以使用 在应用程序重新启动期间维护元数据状态。 此实现可以与如下所示的适配器一起使用:MetadataStoreJdbcMetadataStoreMetadataStore
- 馈送入站通道适配器
- 文件入站通道适配器
- FTP 入站通道适配器
- SFTP 入站通道适配器
要将这些适配器配置为使用 ,请使用 的 Bean 名称声明 Spring Bean。 源入站通道适配器和源入站通道适配器都自动拾取并使用声明的 ,如以下示例所示:JdbcMetadataStoremetadataStoreJdbcMetadataStore
@Beanpublic MetadataStore metadataStore(DataSource dataSource) { return new JdbcMetadataStore(dataSource);}该软件包具有适用于多个 RDMBS 供应商的数据库架构脚本。 例如,以下清单显示了元数据表的 H2 DDL:org.springframework.integration.jdbc
CREATE TABLE INT_METADATA_STORE ( METADATA_KEY VARCHAR(255) NOT NULL, METADATA_VALUE VARCHAR(4000), REGION VARCHAR(100) NOT NULL, constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION));您可以更改前缀以匹配目标数据库设计要求。 您还可以配置为使用自定义前缀。INT_JdbcMetadataStore
实现 ,使其在多个应用程序实例之间可靠地共享,其中只有一个实例可以存储或修改键的值。 由于交易保证,所有这些操作都是原子的。JdbcMetadataStoreConcurrentMetadataStore
事务管理必须使用 。 入站通道适配器可以提供对轮询器配置中的引用。 与非事务性实现不同,使用 ,该条目仅在事务提交后才会出现在目标表中。 发生回滚时,不会向表中添加任何条目。JdbcMetadataStoreTransactionManagerMetadataStoreJdbcMetadataStoreINT_METADATA_STORE
从版本 5.0.7 开始,您可以为 RDBMS 供应商特定的选项配置对元数据存储条目的基于锁的查询。 默认情况下,如果目标数据库不支持行锁定功能,则可以使用空字符串对其进行配置。 请咨询供应商,了解表达式中用于在更新前锁定行的特定和可能的提示。JdbcMetadataStorelockHintFOR UPDATESELECT