跳到主要内容

手动启动 Kafka Streams 处理器

DeepSeek V3 中英对照 Manually starting Kafka Streams processors

Spring Cloud Stream Kafka Streams 绑定器在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean 之上提供了一个名为 StreamsBuilderFactoryManager 的抽象。这个管理器 API 用于在基于绑定器的应用程序中控制每个处理器的多个 StreamsBuilderFactoryBean。因此,当使用绑定器时,如果你想手动控制应用程序中各个 StreamsBuilderFactoryBean 对象的自动启动,你需要使用 StreamsBuilderFactoryManager。你可以使用属性 spring.kafka.streams.auto-startup 并将其设置为 false 来关闭处理器的自动启动。然后,在应用程序中,你可以使用如下代码通过 StreamsBuilderFactoryManager 来启动处理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
none

当你希望应用程序在主线程中启动,并让 Kafka Streams 处理器单独启动时,这个功能非常方便。例如,当你有一个需要恢复的大型状态存储时,如果处理器像默认情况那样正常启动,这可能会阻塞应用程序的启动。如果你使用了某种活跃性探测机制(例如在 Kubernetes 上),它可能会认为应用程序已停止并尝试重启。为了解决这个问题,你可以将 spring.kafka.streams.auto-startup 设置为 false,并按照上述方法操作。

请记住,在使用 Spring Cloud Stream binder 时,您并不是直接处理来自 Spring for Apache Kafka 的 StreamsBuilderFactoryBean,而是处理 StreamsBuilderFactoryManager,因为 StreamsBuilderFactoryBean 对象是由 binder 内部管理的。