跳到主要内容

混合使用高级 DSL 和低级 Processor API

DeepSeek V3 中英对照 Mixing high level DSL and low level Processor API

Kafka Streams 提供了两种 API 变体。它提供了一个更高层次的 DSL(领域特定语言)API,允许你链式调用各种操作,这对于许多函数式程序员来说可能非常熟悉。Kafka Streams 还提供了一个低层次的 Processor API。尽管 Processor API 非常强大,并且能够在更低的层次上控制事物,但它的本质是命令式的。Spring Cloud Stream 的 Kafka Streams 绑定器允许你使用高级 DSL,或者混合使用 DSL 和 Processor API。混合使用这两种变体为你提供了许多选项,以控制应用程序中的各种用例。应用程序可以使用 transformprocess 方法 API 调用来访问 Processor API。

以下是如何在 Spring Cloud Stream 应用程序中使用 process API 将 DSL 和处理器 API 结合在一起的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}

@Override
public void process(Object key, String value) {
//business logic
}

@Override
public void close() {

});
}
none

这是一个使用 transform API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {

}

@Override
public void close() {

}

@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
none

process API 方法调用是一个终端操作,而 transform API 是非终端的,它会返回一个可能经过转换的 KStream,你可以使用 DSL 或处理器 API 继续进行进一步处理。