跳到主要内容

基于 Kafka Streams 绑定器和常规 Kafka 绑定器的多绑定器

DeepSeek V3 中英对照 Multi binders with Kafka Streams based binders and regular Kafka Binder

你可以有一个应用程序,其中同时包含基于常规 Kafka 绑定器的函数/消费者/供应商和基于 Kafka Streams 的处理器。但是,你不能在单个函数或消费者中混合使用它们。

这是一个示例,其中同一个应用程序中同时使用了基于 Binder 的组件。

@Bean
public Function<String, String> process() {
return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

return input -> input;
}
none

这是配置中的相关部分:

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
none

如果你有与上述相同的应用程序,但需要处理两个不同的 Kafka 集群,情况会变得更加复杂。例如,常规的 process 同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器则作用于 Kafka 集群 2。那么你必须使用 Spring Cloud Stream 提供的 多绑定器 功能。

在这种场景下,您的配置可能会发生以下变化。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
none

请注意上述配置。我们有两种类型的 binder,但总共有 3 个 binder,第一个是基于集群 1 (kafka1) 的常规 Kafka binder,第二个是基于集群 2 (kafka2) 的另一个 Kafka binder,最后一个是 kstream binder (kafka3)。应用程序中的第一个处理器从 kafka1 接收数据并发布到 kafka2,这两个 binder 都是基于常规 Kafka binder,但属于不同的集群。第二个处理器是一个 Kafka Streams 处理器,它从 kafka3 消费数据,kafka3kafka2 是同一个集群,但 binder 类型不同。

由于 Kafka Streams 系列绑定器中有三种不同的绑定类型可用 - kstreamktableglobalktable - 如果你的应用程序基于其中任何一种绑定器有多个绑定,则需要显式提供绑定类型。

例如,如果你有一个如下的处理器,

@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

...
}
none

然后,这需要在多绑定器场景中进行如下配置。请注意,只有在真正的多绑定器场景中才需要这样做,即在一个应用程序中有多个处理器处理多个集群。在这种情况下,需要显式地为绑定器提供绑定,以区分其他处理器的绑定器类型和集群。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.
none