跳到主要内容

编程模型的辅助工具

DeepSeek V3 中英对照 Ancillaries to the programming model

单个应用程序中的多个 Kafka Streams 处理器

Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。你可以拥有如下所示的应用程序。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
none

在这种情况下,binder 将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(下面会详细说明)。但是,如果应用程序中有多个处理器,你必须告诉 Spring Cloud Stream 需要激活哪些函数。以下是激活函数的方式。

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

如果你希望某些功能不要立即激活,可以从这个列表中移除它们。

当你在同一个应用程序中有一个 Kafka Streams 处理器和其他类型的 Function bean 时,这也是成立的,这些 bean 通过不同的 binder 进行处理(例如,基于常规 Kafka 消息通道 binder 的函数 bean)。

Kafka Streams 应用程序 ID

应用程序 ID 是 Kafka Streams 应用程序必须提供的属性。Spring Cloud Stream Kafka Streams 绑定器允许您通过多种方式配置此应用程序 ID。

如果应用程序中只有一个处理器,那么你可以使用以下属性在绑定器级别进行设置:

spring.cloud.stream.kafka.streams.binder.applicationId

为了方便起见,如果您只有一个处理器,您也可以使用 spring.application.name 作为委托应用程序 ID 的属性。

如果应用程序中有多个 Kafka Streams 处理器,那么你需要为每个处理器设置 application id。在函数式模型中,你可以将其作为属性附加到每个函数上。

例如,假设你有以下函数。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
none

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
none

然后你可以使用以下 binder 级别的属性为每个应用程序设置 application id。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法同样适用。然而,如果您使用的是函数模型,如上所述,在绑定器级别为每个函数进行设置会更为简便。

对于生产环境的部署,强烈建议通过配置显式指定应用程序 ID。如果您正在对应用程序进行自动扩展,这一点尤其重要,因为在这种情况下,您需要确保每个实例都使用相同的应用程序 ID 进行部署。

如果应用程序没有提供应用程序 ID,那么在这种情况下,绑定器会自动为您生成一个静态的应用程序 ID。这在开发场景中非常方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启时将保持不变。在功能模型的情况下,生成的应用程序 ID 将是函数 bean 名称后跟字面量 applicationID,例如,如果函数 bean 名称为 process,则生成的应用程序 ID 为 process-applicationID

设置 Application ID 的总结

  • 默认情况下,binder 会为每个函数方法自动生成应用 ID。

  • 如果你只有一个处理器,那么你可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果你有多个处理器,那么可以使用属性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 为每个函数设置应用 ID。

使用函数式风格覆盖绑定器生成的默认绑定名称

默认情况下,当使用函数式风格时,绑定器会使用上述讨论的策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0process-out-0 等。如果你想覆盖这些绑定名称,可以通过指定以下属性来实现。

spring.cloud.stream.function.bindings.<默认绑定名称>。默认绑定名称是由绑定器生成的原始绑定名称。

例如,假设你有这个函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
none

Binder 将会生成名称为 process-in-0process-in-1process-out-0 的绑定。现在,如果你想将它们更改为完全不同的名称,可能是更具领域特定性的绑定名称,那么你可以按照以下方式操作。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之后,您必须为这些新的绑定名称设置所有绑定级别的属性。

请记住,在上述函数式编程模型中,在大多数情况下遵循默认的绑定名称是有意义的。唯一可能仍然需要覆盖默认绑定的情况是,当你有大量的配置属性,并且希望将绑定映射到更符合领域友好的名称时。

设置引导服务器配置

在运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。如果您没有提供此信息,绑定器会期望您在默认的 localhost:9092 上运行代理。如果不是这种情况,那么您需要覆盖该设置。有几种方法可以做到这一点。

  • 使用 boot 属性 - spring.kafka.bootstrapServers

  • Binder 级别属性 - spring.cloud.stream.kafka.streams.binder.brokers

在讨论 binder 级别的属性时,无论你是使用通过常规 Kafka binder 提供的 broker 属性 spring.cloud.stream.kafka.binder.brokers,还是使用 Kafka Streams binder 特定的 broker 属性 spring.cloud.stream.kafka.streams.binder.brokers,都不会影响结果。Kafka Streams binder 会首先检查是否设置了 Kafka Streams binder 特定的 broker 属性(spring.cloud.stream.kafka.streams.binder.brokers),如果没有找到,则会查找 spring.cloud.stream.kafka.binder.brokers