跳到主要内容

手动选择性启动 Kafka Streams 处理器

DeepSeek V3 中英对照 Manually starting Kafka Streams processors selectively

虽然上述方法会通过 StreamsBuilderFactoryManager 无条件地将自动启动设置为 false 应用于应用程序中的所有 Kafka Streams 处理器,但通常更希望仅对个别选择的 Kafka Streams 处理器不自动启动。例如,假设您的应用程序中有三个不同的函数(处理器),而您不希望其中一个处理器作为应用程序启动的一部分启动。以下是这种情况的示例。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}
none

在上述场景中,如果将 spring.kafka.streams.auto-startup 设置为 false,那么在应用程序启动期间,所有的处理器都不会自动启动。在这种情况下,你必须按照上述方式通过调用底层 StreamsBuilderFactoryManagerstart() 方法来以编程方式启动它们。然而,如果我们有一个用例需要选择性地仅禁用某一个处理器,那么你需要在为该处理器的单个绑定上设置 auto-startup。假设我们不希望 process3 函数自动启动。这是一个具有两个输入绑定的 BiFunction - process3-in-0process3-in-1。为了避免此处理器自动启动,你可以选择这些输入绑定中的任意一个并在其上设置 auto-startup。选择哪个绑定并不重要;如果你愿意,你可以在两者上都设置 auto-startupfalse,但其中一个就足够了。因为它们共享同一个工厂 bean,你不需要在两个绑定上都设置 autoStartupfalse,但为了清晰起见,这样做可能是有意义的。

以下是您可以用于禁用此处理器自动启动的 Spring Cloud Stream 属性。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
none

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
none

然后,你可以手动启动处理器,使用 REST 端点或使用如下所示的 BindingsEndpoint API。为此,你需要确保在类路径上有 Spring Boot actuator 依赖。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
none

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
none

有关此机制的更多详细信息,请参阅参考文档中的此部分

备注

当按照本节所述通过禁用 auto-startup 来控制绑定时,请注意这仅适用于消费者绑定。换句话说,如果您使用生产者绑定 process3-out-0,则在禁用处理器的自动启动方面没有任何效果,尽管此生产者绑定与消费者绑定使用相同的 StreamsBuilderFactoryBean