Spring Integration for Apache Kafka 基于 Spring for Apache Kafka 项目。
您需要将此依赖项包含在项目中:
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>6.0.0</version></dependency>它提供以下组件:
- 出站通道适配器
- 消息驱动的通道适配器
- 入站通道适配器
- 出站网关
- 入站网关
- 由 Apache Kafka 主题支持的频道
出站通道适配器
出站通道适配器用于将消息从 Spring 集成通道发布到 Apache Kafka 主题。 通道在应用程序上下文中定义,然后连接到向 Apache Kafka 发送消息的应用程序。 发送方应用程序可以使用 Spring Integration 消息发布到 Apache Kafka,这些消息由出站通道适配器在内部转换为 Kafka 记录,如下所示:
- Spring 集成消息的有效负载用于填充 Kafka 记录的有效负载。
- 默认情况下,Spring 集成消息的标头用于填充 Kafka 记录的键。kafka_messageKey
您可以自定义目标主题和分区,以便分别通过 和 标头发布消息。kafka_topickafka_partitionId
此外,还提供了通过在出站消息上应用 SpEL 表达式来提取密钥、目标主题和目标分区的功能。 为此,它支持三对互斥的属性:<int-kafka:outbound-channel-adapter>
- topic和topic-expression
- message-key和message-key-expression
- partition-id和partition-id-expression
这些允许您分别指定 、 和 作为适配器上的静态值,或者在运行时根据请求消息动态评估它们的值。topicmessage-keypartition-id
接口(由 提供)包含用于交互的常量 头。 和默认标头现在需要前缀。 从使用旧标头的早期版本迁移时,需要在 . 或者,您可以使用 或 . 如果使用常量值,还可以使用 和 在适配器上配置这些常量值。KafkaHeadersspring-kafkamessageKeytopickafka_message-key-expression="headers['messageKey']"topic-expression="headers['topic']"<int-kafka:outbound-channel-adapter>KafkaHeaders<header-enricher>MessageBuildertopicmessage-key
注意 : 如果适配器配置了主题或消息键(使用常量或表达式),则使用这些主题或消息键并忽略相应的标头。 如果您希望标头覆盖配置,则需要在表达式中对其进行配置,如下所示:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"适配器需要一个 ,而这又需要一个适当配置的 。KafkaTemplateKafkaProducerFactory
如果提供了 () 并且收到故障(同步或异步),则会向通道发送 。 有效负载是具有 、 () 和属性的。 您可以通过设置属性来覆盖 。send-failure-channelsendFailureChannelsend()ErrorMessageKafkaSendFailureExceptionfailedMessagerecordProducerRecordcauseDefaultErrorMessageStrategyerror-message-strategy
如果提供了 (),则在成功发送后发送有效负载类型的消息。send-success-channelsendSuccessChannelorg.apache.kafka.clients.producer.RecordMetadata
如果应用程序使用事务,并且使用相同的通道适配器在事务由侦听器容器启动的位置发布消息,以及在没有现有事务的情况下发布,则必须在 上配置 以覆盖容器或事务管理器使用的前缀。 容器启动的事务(创建者工厂或事务管理器属性)使用的前缀在所有应用程序实例上必须相同。 用于仅生产者事务的前缀在所有应用程序实例上必须是唯一的。transactionIdPrefixKafkaTemplate
您可以配置 必须解析为布尔值。 如果您使用的是 和 Kafka 生产者属性,则在发送多条消息后刷新可能很有用;表达式的计算结果应为 在最后一条消息上,并且将立即发送不完整的批处理。 默认情况下,表达式在标头 () 中查找值。 如果值为,则会发生刷新,如果值为或标头不存在,则不会发生刷新。flushExpressionlinger.msbatch.sizeBoolean.TRUEBooleanKafkaIntegrationHeaders.FLUSHkafka_flushtruefalse
默认值已从 10 秒更改为 Kafka 生产者属性,以便将超时后的实际 Kafka 错误传播到应用程序,而不是此框架生成的超时。 为了保持一致性,这已经进行了更改,因为您可能会遇到意外的行为(Spring 可能会使发送超时,而实际上它最终会成功)。 重要说明:默认情况下,该超时为 120 秒,因此您可能希望减少它以获得更及时的故障。KafkaProducerMessageHandler.sendTimeoutExpressiondelivery.timeout.ms+ 5000
配置
以下示例演示如何为 Apache Kafka 配置出站通道适配器:
@Beanpublic ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));}@Beanpublic IntegrationFlow sendToKafkaFlow() { return f -> f .<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null) .publishSubscribeChannel(c -> c .subscribe(sf -> sf.handle( kafkaMessageHandler(producerFactory(), TEST_TOPIC1) .timestampExpression("T(Long).valueOf('1487694048633')"), e -> e.id("kafkaProducer1"))) .subscribe(sf -> sf.handle( kafkaMessageHandler(producerFactory(), TEST_TOPIC2) .timestamp(m -> 1487694048644L), e -> e.id("kafkaProducer2"))) );}@Beanpublic DefaultKafkaHeaderMapper mapper() { return new DefaultKafkaHeaderMapper();}private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler( ProducerFactory<Integer, String> producerFactory, String topic) { return Kafka .outboundChannelAdapter(producerFactory) .messageKey(m -> m .getHeaders() .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)) .headerMapper(mapper()) .partitionId(m -> 10) .topicExpression("headers[kafka_topic] ?: '" + topic + "'") .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));}消息驱动的通道适配器
() 使用 或 。KafkaMessageDrivenChannelAdapter<int-kafka:message-driven-channel-adapter>spring-kafkaKafkaMessageListenerContainerConcurrentListenerContainer
此外,该属性可用。 它可以接受 或 的值(默认值:)。 对于模式,每个消息负载都从单个 . 对于 mode,有效负载是从使用者轮询返回的所有实例转换的对象列表。 与批处理一样,、、 和标头也是列表,其位置对应于有效负载中的位置。moderecordbatchrecordrecordConsumerRecordbatchConsumerRecord@KafkaListenerKafkaHeaders.RECEIVED_KEYKafkaHeaders.RECEIVED_PARTITIONKafkaHeaders.RECEIVED_TOPICKafkaHeaders.OFFSET
收到的消息填充了某些标头。 有关更多信息,请参阅 KafkaHeaders 类。
对象(在标头中)不是线程安全的。 只能在适配器内调用侦听器的线程上调用其方法。 如果将消息传递给另一个线程,则不得调用其方法。Consumerkafka_consumer
提供 时,将根据其重试策略重试传递失败。 如果还提供了 ,则在重试次数用尽后,将使用默认值作为恢复回调。 还可以使用 指定在这种情况下要执行的其他操作,或将其设置为将最终异常抛出到侦听器容器,以便在那里处理它。retry-templateerror-channelErrorMessageSendingRecovererrecovery-callbacknull
生成 (用于 或 ) 时,可以通过设置属性来自定义错误消息。 默认情况下,使用 a 来提供对转换后的消息以及原始 .ErrorMessageerror-channelrecovery-callbackerror-message-strategyRawRecordHeaderErrorMessageStrategyConsumerRecord
这种形式的重试是阻塞的,如果所有轮询记录的总重试延迟可能超过使用者属性,则可能导致重新平衡。 相反,请考虑向侦听器容器添加 ,并配置 .{@code `max.poll.interval.msDefaultErrorHandlerKafkaErrorSendingMessageRecoverer
配置
以下示例演示如何配置消息驱动的通道适配器:
@Beanpublic IntegrationFlow topic1ListenerFromKafkaFlow() { return IntegrationFlow .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1) .configureListenerContainer(c -> c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL) .id("topic1ListenerContainer")) .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(), new RawRecordHeaderErrorMessageStrategy())) .retryTemplate(new RetryTemplate()) .filterInRetry(true)) .filter(Message.class, m -> m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101, f -> f.throwExceptionOnRejection(true)) .<String, String>transform(String::toUpperCase) .channel(c -> c.queue("listeningFromKafkaResults1")) .get();}您还可以使用用于注释的容器工厂来创建用于其他目的的实例。 有关示例,请参阅 Spring for Apache Kafka 文档。@KafkaListenerConcurrentMessageListenerContainer
使用 Java DSL,容器不必配置为 ,因为 DSL 将容器注册为 Bean。 以下示例演示如何执行此操作:@Bean
@Beanpublic IntegrationFlow topic2ListenerFromKafkaFlow() { return IntegrationFlow .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2), KafkaMessageDrivenChannelAdapter.ListenerMode.record) .id("topic2Adapter")) ... get();}请注意,在本例中,适配器被赋予一个 ()。 容器在应用程序上下文中注册,名称为 . 如果适配器没有属性,则容器的 Bean 名称是容器的完全限定类名加上 ,其中每个容器递增。idtopic2Adaptertopic2Adapter.containerid#nn
入站通道适配器
提供可轮询的通道适配器实现。KafkaMessageSource
配置
@Beanpublic IntegrationFlow flow(ConsumerFactory<String, String> cf) { return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, "myTopic") .groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000))) .handle(System.out::println) .get();}有关可用属性,请参阅 javadocs。
默认情况下,必须在使用者工厂中显式设置,否则如果使用者工厂是 . 可以将属性设置为 以替代此行为。max.poll.recordsDefaultKafkaConsumerFactoryallowMultiFetchtrue
您必须在内部轮询使用者以避免重新平衡。 如果设置为 ,则必须在 中处理所有检索到的记录,然后再次轮询。max.poll.interval.msallowMultiFetchtruemax.poll.interval.ms
此适配器发出的消息包含一个标头,其中包含上一次轮询中剩余的记录计数。kafka_remainingRecords
出站网关
出站网关用于请求/回复操作。 它与大多数 Spring 集成网关的不同之处在于,发送线程不会在网关中阻塞,并且回复在回复侦听器容器线程上处理。 如果代码调用同步消息网关后面的网关,那么用户线程将在那里阻塞,直到收到回复(或发生超时)。
在为回复容器分配其主题和分区之前,网关不接受请求。 建议在模板的回复容器属性中添加 a,并在将消息发送到网关之前等待调用。ConsumerRebalanceListeneronPartitionsAssigned
默认值为 Kafka 生产者属性,以便将超时后的实际 Kafka 错误传播到应用程序,而不是此框架生成的超时。 为了保持一致性,这已经发生了变化,因为您可能会遇到意外的行为(Spring 可能会使 超时,而实际上它最终会成功)。 重要说明:默认情况下,该超时为 120 秒,因此您可能希望减少它以获得更及时的故障。KafkaProducerMessageHandlersendTimeoutExpressiondelivery.timeout.ms+ 5000send()
配置
以下示例演示如何配置网关:
@Beanpublic IntegrationFlow outboundGateFlow( ReplyingKafkaTemplate<String, String, String> kafkaTemplate) { return IntegrationFlow.from("kafkaRequests") .handle(Kafka.outboundGateway(kafkaTemplate)) .channel("kafkaReplies") .get();}有关可用属性,请参阅 javadocs。
请注意,使用的是与出站通道适配器相同的类,唯一的区别是传入构造函数的类是 . 有关更多信息,请参阅 Spring for Apache Kafka 文档。KafkaTemplateReplyingKafkaTemplate
出站主题、分区、键等的确定方式与出站适配器相同。 回复主题确定如下:
还可以指定标头以确定要用于答复的特定分区。 同样,这是根据模板的回复容器的订阅进行验证的。KafkaHeaders.REPLY_PARTITION
或者,您也可以使用类似于以下 Bean 的配置:
@Beanpublic IntegrationFlow outboundGateFlow() { return IntegrationFlow.from("kafkaRequests") .handle(Kafka.outboundGateway(producerFactory(), replyContainer()) .configureKafkaTemplate(t -> t.replyTimeout(30_000))) .channel("kafkaReplies") .get();}入站网关
入站网关用于请求/回复操作。
配置
以下示例演示如何配置入站网关:
@Beanpublic IntegrationFlow serverGateway( ConcurrentMessageListenerContainer<Integer, String> container, KafkaTemplate<Integer, String> replyTemplate) { return IntegrationFlow .from(Kafka.inboundGateway(container, replyTemplate) .replyTimeout(30_000)) .<String, String>transform(String::toUpperCase) .get();}有关可用属性,请参阅 javadocs。
提供 时,将根据其重试策略重试传递失败。 如果还提供了 ,则在重试次数用尽后,将使用默认值作为恢复回调。 还可以使用 指定在这种情况下要执行的其他操作,或将其设置为将最终异常抛出到侦听器容器,以便在那里处理它。RetryTemplateerror-channelErrorMessageSendingRecovererrecovery-callbacknull
生成 (用于 或 ) 时,可以通过设置属性来自定义错误消息。 默认情况下,使用 a 来提供对转换后的消息以及原始 .ErrorMessageerror-channelrecovery-callbackerror-message-strategyRawRecordHeaderErrorMessageStrategyConsumerRecord
这种形式的重试是阻塞的,如果所有轮询记录的总重试延迟可能超过使用者属性,则可能导致重新平衡。 相反,请考虑向侦听器容器添加 ,并配置 .{@code `max.poll.interval.msDefaultErrorHandlerKafkaErrorSendingMessageRecoverer
以下示例演示如何使用 Java DSL 配置简单的大写转换器:
或者,可以使用类似于以下内容的代码配置大写转换器:
@Beanpublic IntegrationFlow serverGateway() { return IntegrationFlow .from(Kafka.inboundGateway(consumerFactory(), containerProperties(), producerFactory()) .replyTimeout(30_000)) .<String, String>transform(String::toUpperCase) .get();}您还可以使用用于注释的容器工厂来创建用于其他目的的实例。 有关示例,请参阅 Spring for Apache Kafka 文档和消息驱动的通道适配器。@KafkaListenerConcurrentMessageListenerContainer
由 Apache Kafka 主题支持的频道
Spring Integration 具有由 Apache Kafka 主题支持的持久性实现。MessageChannel
每个通道都需要 a 用于发送端,以及侦听器容器工厂(用于可订阅通道)或 a 用于可轮询通道。KafkaTemplateKafkaMessageSource
Java DSL 配置
@Beanpublic IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) { return IntegrationFlow.from(...) ... .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1")) ... .get();}@Beanpublic IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) { return IntegrationFlow.from(...) ... .publishSubscribeChannel(pubSub(template, containerFactory), pubsub -> pubsub .subscribe(subflow -> ...) .subscribe(subflow -> ...)) .get();}@Beanpublic BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) { return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2") .groupId("group2") .get();}@Beanpublic IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template, KafkaMessageSource<Integer, String> source) { return IntegrationFlow.from(...) ... .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3")) .handle(..., e -> e.poller(...)) ... .get();}消息转换
提供了 A。 有关更多信息,请参阅 Spring for Apache Kafka 文档。StringJsonMessageConverter
将此转换器与消息驱动的通道适配器一起使用时,可以指定要将传入有效负载转换为的类型。 这是通过在适配器上设置属性(属性)来实现的。 下面的示例演示如何在 XML 配置中执行此操作:payload-typepayloadType
<int-kafka:message-driven-channel-adapter id="kafkaListener" listener-container="container1" auto-startup="false" phase="100" send-timeout="5000" channel="nullChannel" message-converter="messageConverter" payload-type="com.example.Thing" error-channel="errorChannel" /><bean id="messageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>以下示例显示如何在 Java 配置中设置适配器的属性( 属性):payload-typepayloadType
@Beanpublic KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) { KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); kafkaMessageDrivenChannelAdapter.setMessageConverter(converter()); kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class); return kafkaMessageDrivenChannelAdapter;}空有效负载和日志压缩“逻辑删除”记录
Spring 消息传递对象不能有有效负载。 当您使用 Apache Kafka 的端点时,有效负载(也称为逻辑删除记录)由 类型的有效负载表示。 有关更多信息,请参阅 Spring for Apache Kafka 文档。Message<?>nullnullKafkaNull
Spring 集成端点的 POJO 方法可以使用 true 值而不是 。 为此,请使用 标记参数。 以下示例演示如何执行此操作:nullKafkaNull@Payload(required = false)
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key, @Payload(required = false) Customer customer) { // customer is null if a tombstone record ...}从KStream
您可以使用 从 调用集成流:MessagingTransformerKStream
@Beanpublic KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder, MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) { KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1); stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase) ... .transform(() -> transformer) .to(streamingTopic2); stream.print(Printed.toSysOut()); return stream;}@Bean@DependsOn("flow")public MessagingTransformer<byte[], byte[], String> transformer( MessagingFunction function) { MessagingMessageConverter converter = new MessagingMessageConverter(); converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); return new MessagingTransformer<>(function, converter);}@Beanpublic IntegrationFlow flow() { return IntegrationFlow.from(MessagingFunction.class) ... .get();}当集成流以接口开头时,创建的代理具有流 Bean 的名称,并附加“.gateway”,以便如果需要,可以将此 Bean 名称用作 a。@Qualifier
读/处理/写入方案的性能注意事项
许多应用程序从一个主题使用,执行一些处理并写入另一个主题。 在大多数情况下,如果失败,应用程序将希望引发异常,以便可以重试传入请求和/或发送到死信主题。 基础消息侦听器容器以及适当配置的错误处理程序支持此功能。 但是,为了支持这一点,我们需要阻止侦听器线程,直到写入操作成功(或失败),以便可以将任何异常抛到容器中。 使用单个记录时,这是通过在出站适配器上设置属性来实现的。 但是,在使用批处理时,使用 会导致性能显著下降,因为应用程序将在发送下一条消息之前等待每次发送的结果。 您还可以执行多个发送,然后等待这些发送的结果。 这是通过向消息处理程序添加 来实现的。 要启用该功能,请添加到出站消息中;然后,这可用于将 与特定已发送消息相关联。 下面是如何使用此功能的示例:writesyncsyncfuturesChannelKafkaIntegrationHeaders.FUTURE_TOKENFuture
@SpringBootApplicationpublic class FuturesChannelApplication { public static void main(String[] args) { SpringApplication.run(FuturesChannelApplication.class, args); } @Bean IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) { return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.batch, "inTopic")) .handle(handler) .get(); } @Bean IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) { return IntegrationFlow.from(Gate.class) .enrichHeaders(h -> h .header(KafkaHeaders.TOPIC, "outTopic") .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]")) .handle(Kafka.outboundChannelAdapter(kafkaTemplate) .futuresChannel("futures")) .get(); } @Bean PollableChannel futures() { return new QueueChannel(); }}@Component@DependsOn("outbound")class Handler { @Autowired Gate gate; @Autowired PollableChannel futures; public void handle(List<String> input) throws Exception { System.out.println(input); input.forEach(str -> this.gate.send(str.toUpperCase())); for (int i = 0; i < input.size(); i++) { Message<?> future = this.futures.receive(10000); ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS); } }}interface Gate { void send(String out);}