跳到主要内容

状态存储

DeepSeek V3 中英对照 State Store

当使用 Kafka Streams 的高级 DSL 并调用适当的操作时,Kafka Streams 会自动创建状态存储(state store),这些操作会触发状态存储的创建。

如果你想将传入的 KTable 绑定物化为一个命名的状态存储(state store),那么你可以使用以下策略来实现。

假设你有以下函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
none

然后通过设置以下属性,传入的 KTable 数据将被物化到命名的状态存储中。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
none

你可以在应用程序中将自定义状态存储定义为 bean,这些 bean 会被绑定器检测并添加到 Kafka Streams 构建器中。特别是在使用处理器 API 时,你需要手动注册一个状态存储。为此,你可以在应用程序中将 StateStore 创建为一个 bean。以下是定义此类 bean 的示例。

@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
none

这些状态存储可以被应用程序直接访问。

在引导过程中,上述的 beans 将由 binder 处理并传递给 Streams 构建器对象。

访问状态存储:

Processor<Object, Product>() {

WindowStore<Object, String> state;

@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
none

在注册全局状态存储时,这种方法将不起作用。要注册全局状态存储,请参阅下面关于自定义 StreamsBuilderFactoryBean 的部分。