混合使用高级 DSL 和低级 Processor API
Kafka Streams 提供了两种 API 变体。它提供了一个更高层次的 DSL(领域特定语言)API,允许你链式调用各种操作,这对于许多函数式程序员来说可能非常熟悉。Kafka Streams 还提供了一个低层次的 Processor API。尽管 Processor API 非常强大,并且能够在更低的层次上控制事物,但它的本质是命令式的。Spring Cloud Stream 的 Kafka Streams 绑定器允许你使用高级 DSL,或者混合使用 DSL 和 Processor API。混合使用这两种变体为你提供了许多选项,以控制应用程序中的各种用例。应用程序可以使用 transform
或 process
方法 API 调用来访问 Processor API。
以下是如何在 Spring Cloud Stream 应用程序中使用 process
API 将 DSL 和处理器 API 结合在一起的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这是一个使用 transform
API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API 方法调用是一个终端操作,而 transform
API 是非终端的,它会返回一个可能经过转换的 KStream
,你可以使用 DSL 或处理器 API 继续进行进一步处理。