编程模型的辅助工具
单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。你可以拥有如下所示的应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,binder 将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(下面会详细说明)。但是,如果应用程序中有多个处理器,你必须告诉 Spring Cloud Stream 需要激活哪些函数。以下是激活函数的方式。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果你希望某些功能不要立即激活,可以从这个列表中移除它们。
当你在同一个应用程序中有一个 Kafka Streams 处理器和其他类型的 Function
bean 时,这也是成立的,这些 bean 通过不同的 binder 进行处理(例如,基于常规 Kafka 消息通道 binder 的函数 bean)。
Kafka Streams 应用程序 ID
应用程序 ID 是 Kafka Streams 应用程序必须提供的属性。Spring Cloud Stream Kafka Streams 绑定器允许您通过多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器,那么你可以使用以下属性在绑定器级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId
。
为了方便起见,如果您只有一个处理器,您也可以使用 spring.application.name
作为委托应用程序 ID 的属性。
如果应用程序中有多个 Kafka Streams 处理器,那么你需要为每个处理器设置 application id
。在函数式模型中,你可以将其作为属性附加到每个函数上。
例如,假设你有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后你可以使用以下 binder 级别的属性为每个应用程序设置 application id。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法同样适用。然而,如果您使用的是函数模型,如上所述,在绑定器级别为每个函数进行设置会更为简便。
对于生产环境的部署,强烈建议通过配置显式指定应用程序 ID。如果您正在对应用程序进行自动扩展,这一点尤其重要,因为在这种情况下,您需要确保每个实例都使用相同的应用程序 ID 进行部署。
如果应用程序没有提供应用程序 ID,那么在这种情况下,绑定器会自动为您生成一个静态的应用程序 ID。这在开发场景中非常方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启时将保持不变。在功能模型的情况下,生成的应用程序 ID 将是函数 bean 名称后跟字面量 applicationID
,例如,如果函数 bean 名称为 process
,则生成的应用程序 ID 为 process-applicationID
。
设置 Application ID 的总结
-
默认情况下,binder 会为每个函数方法自动生成应用 ID。
-
如果你只有一个处理器,那么你可以使用
spring.kafka.streams.applicationId
、spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果你有多个处理器,那么可以使用属性
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
为每个函数设置应用 ID。
使用函数式风格覆盖绑定器生成的默认绑定名称
默认情况下,当使用函数式风格时,绑定器会使用上述讨论的策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0
、process-out-0
等。如果你想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<默认绑定名称>
。默认绑定名称是由绑定器生成的原始绑定名称。
例如,假设你有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将会生成名称为 process-in-0
、process-in-1
和 process-out-0
的绑定。现在,如果你想将它们更改为完全不同的名称,可能是更具领域特定性的绑定名称,那么你可以按照以下方式操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须为这些新的绑定名称设置所有绑定级别的属性。
请记住,在上述函数式编程模型中,在大多数情况下遵循默认的绑定名称是有意义的。唯一可能仍然需要覆盖默认绑定的情况是,当你有大量的配置属性,并且希望将绑定映射到更符合领域友好的名称时。
设置引导服务器配置
在运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。如果您没有提供此信息,绑定器会期望您在默认的 localhost:9092
上运行代理。如果不是这种情况,那么您需要覆盖该设置。有几种方法可以做到这一点。
-
使用
boot
属性 -spring.kafka.bootstrapServers
-
Binder 级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
在讨论 binder 级别的属性时,无论你是使用通过常规 Kafka binder 提供的 broker 属性 spring.cloud.stream.kafka.binder.brokers
,还是使用 Kafka Streams binder 特定的 broker 属性 spring.cloud.stream.kafka.streams.binder.brokers
,都不会影响结果。Kafka Streams binder 会首先检查是否设置了 Kafka Streams binder 特定的 broker 属性(spring.cloud.stream.kafka.streams.binder.brokers
),如果没有找到,则会查找 spring.cloud.stream.kafka.binder.brokers
。