跳到主要内容

编程模型

DeepSeek V3 中英对照 Programming Model

在使用 Kafka Streams binder 提供的编程模型时,可以选择使用高级的 Streams DSL,也可以混合使用高级和低级的 Processor-API。当混合使用高级和低级 API 时,通常是通过在 KStream 上调用 transformprocess API 方法来实现的。

函数式风格

从 Spring Cloud Stream 3.0.0 开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中提供的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为 java.util.function.Functionjava.util.function.Consumer 类型的 lambda 表达式。

让我们来看一个非常基础的例子。

@SpringBootApplication
public class SimpleConsumerApplication {

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {

return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
none

虽然简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个消费者应用程序,没有出站绑定,只有一个入站绑定。应用程序消费数据,并简单地将来自 KStream 键和值的信息记录到标准输出中。应用程序包含 SpringBootApplication 注解和一个标记为 Bean 的方法。这个 bean 方法的类型是 java.util.function.Consumer,并用 KStream 进行参数化。然后在实现中,我们返回一个 Consumer 对象,它本质上是一个 lambda 表达式。在 lambda 表达式中,提供了处理数据的代码。

在这个应用程序中,有一个类型为 KStream 的单一输入绑定。绑定器为应用程序创建了这个绑定,其名称为 process-in-0,即函数 bean 名称后跟一个连字符(-),然后是字面量 in,接着是另一个连字符和参数的序号位置。你可以使用这个绑定名称来设置其他属性,例如目的地。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

备注

如果绑定上未设置目标属性,则会创建一个与绑定同名的主题(如果应用程序具有足够的权限),或者该主题预计已经存在。

一旦构建为一个 uber-jar(例如 kstream-consumer-app.jar),你可以像下面这样运行上述示例。

如果应用程序选择使用 Spring 的 Component 注解来定义功能 bean,绑定器也支持这种模式。上述功能 bean 可以重写为如下形式。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
none
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
none

这是另一个示例,其中是一个完整的处理器,包含输入和输出绑定。这是一个经典的单词计数示例,应用程序从一个主题接收数据,然后在滚动时间窗口中计算每个单词的出现次数。

@SpringBootApplication
public class WordCountProcessorApplication {

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}

public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
none

这里同样是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,这里的 bean 方法类型为 java.util.function.FunctionFunction 的第一个泛型类型用于输入 KStream,第二个用于输出。在方法体中,提供了一个类型为 Function 的 lambda 表达式,并在其中实现了实际的业务逻辑。与之前讨论的基于 Consumer 的应用程序类似,这里的输入绑定默认命名为 process-in-0。对于输出,绑定名称也会自动设置为 process-out-0

一旦构建为 uber-jar(例如,wordcount-processor.jar),你可以像下面这样运行上面的示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
none

该应用程序将从 Kafka 主题 words 中消费消息,并将计算后的结果发布到输出主题 counts 中。

Spring Cloud Stream 将确保来自入站和出站主题的消息自动绑定为 KStream 对象。作为开发者,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。Kafka Streams 基础设施所需的 Kafka Streams 特定配置由框架自动处理。

我们上面看到的两个例子都有一个单一的 KStream 输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果您想将多个主题复用到一个 KStream 绑定中,您可以在下面提供以逗号分隔的 Kafka 主题作为目的地。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果您希望根据正则表达式匹配主题,也可以提供主题模式作为目的地。

spring.cloud.stream.bindings.process-in-0.destination=input.*

多输入绑定

许多复杂的 Kafka Streams 应用程序通常通过多个绑定从一个以上的主题中消费数据。例如,一个主题作为 Kstream 消费,另一个作为 KTableGlobalKTable 消费。应用程序可能有多种原因希望以表类型接收数据。考虑一个用例,其中底层主题通过数据库的变更数据捕获(CDC)机制填充,或者应用程序只关心最新更新以进行下游处理。如果应用程序指定数据需要绑定为 KTableGlobalKTable,那么 Kafka Streams 绑定器将正确地将目标绑定到 KTableGlobalKTable,并使它们可供应用程序操作。我们将探讨 Kafka Streams 绑定器中处理多个输入绑定的几种不同场景。

Kafka Streams Binder 中的 BiFunction

这是一个示例,其中有两个输入和一个输出。在这种情况下,应用程序可以利用 java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
none

再次强调,基本主题与前几个示例相同,但这里我们有两个输入。Java 的 BiFunction 支持用于将输入绑定到期望的目标位置。绑定器为输入生成的默认绑定名称分别为 process-in-0process-in-1。默认的输出绑定是 process-out-0。在此示例中,BiFunction 的第一个参数被绑定为第一个输入的 KStream,第二个参数被绑定为第二个输入的 KTable

Kafka Streams Binder 中的 BiConsumer

如果有两个输入但没有输出,在这种情况下我们可以使用 java.util.function.BiConsumer,如下所示。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
none

超越两个输入

如果你有两个以上的输入怎么办?在某些情况下,你可能需要两个以上的输入。在这种情况下,binder 允许你链式调用部分函数。在函数式编程术语中,这种技术通常称为柯里化(currying)。随着 Java 8 引入的函数式编程支持,Java 现在允许你编写柯里化函数。Spring Cloud Stream Kafka Streams binder 可以利用这一特性来实现多个输入绑定。

让我们来看一个例子。

@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
none

让我们看看上面介绍的绑定模型的细节。在这个模型中,我们有 3 个部分应用的函数在输入方向上。我们称它们为 f(x)f(y)f(z)。如果我们将这些函数展开为真正的数学函数,它们看起来会像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>。变量 x 代表 KStream<Long, Order>,变量 y 代表 GlobalKTable<Long, Customer>,变量 z 代表 GlobalKTable<Long, Product>。第一个函数 f(x) 具有应用程序的第一个输入绑定(KStream<Long, Order>),其输出是函数 f(y)。函数 f(y) 具有应用程序的第二个输入绑定(GlobalKTable<Long, Customer>),其输出是另一个函数 f(z)。函数 f(z) 的输入是应用程序的第三个输入(GlobalKTable<Long, Product>),其输出是 KStream<Long, EnrichedOrder>,这是应用程序的最终输出绑定。这三个部分函数的输入分别是 KStreamGlobalKTableGlobalKTable,它们可以在方法体中作为 lambda 表达式的一部分用于实现业务逻辑。

输入绑定分别命名为 enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。输出绑定命名为 enrichOrder-out-0

使用柯里化函数,你实际上可以有任意数量的输入。然而,请记住,如果输入数量过多,并且像上面在 Java 中那样对它们进行部分应用函数,可能会导致代码难以阅读。因此,如果你的 Kafka Streams 应用程序需要超过合理数量的输入绑定,并且你想使用这种函数式模型,那么你可能需要重新考虑你的设计,并适当地分解应用程序。

输出绑定

Kafka Streams binder 允许将 KStreamKTable 类型作为输出绑定。在幕后,binder 使用 KStreamto 方法将结果记录发送到输出主题。如果应用程序在函数中提供 KTable 作为输出,binder 仍然通过委托给 KStreamto 方法使用这种技术。

例如,以下两个函数都可以工作:

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
none

多输出绑定

Kafka Streams 允许将出站数据写入多个主题。这一特性在 Kafka Streams 中被称为分支(branching)。当使用多个输出绑定时,你需要提供一个 KStream 数组(KStream[])作为出站返回类型。

这是一个示例:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();

return stringKStreamMap.values().toArray(new KStream[0]);
};
}
none

编程模型保持不变,但输出的参数化类型为 KStream[]。对于上述函数,默认的输出绑定名称分别为 process-out-0process-out-1process-out-2。绑定器生成三个输出绑定的原因是它检测到返回的 KStream 数组长度为三。请注意,在此示例中,我们提供了 noDefaultBranch();如果我们使用了 defaultBranch(),则需要一个额外的输出绑定,本质上将返回一个长度为四的 KStream 数组。

Kafka Streams 基于函数式编程风格的总结

总的来说,下表展示了在函数式编程范式中可以使用的各种选项。

输入数量输出数量使用的组件
10java.util.function.Consumer
20java.util.function.BiConsumer
11..njava.util.function.Function
21..njava.util.function.BiFunction
>= 30..n使用柯里化函数
  • 如果此表中有多个输出,类型将简单地变为 KStream[]

Kafka Streams 绑定器中的函数组合

Kafka Streams 绑定器支持线性拓扑的最小形式的功能组合。使用 Java 功能 API 支持,你可以编写多个函数,然后使用 andThen 方法自行组合它们。例如,假设你有以下两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
none

即使没有绑定器中的函数组合支持,你也可以像下面这样组合这两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
none

然后你可以提供形式为 spring.cloud.function.definition=foo;bar;composed 的定义。在 binder 的函数组合支持中,你不需要编写这个显式函数组合的第三个函数。

你可以简单地这样做:

spring.cloud.function.definition=foo|bar
none

你甚至可以这样做:

spring.cloud.function.definition=foo|bar;foo;bar
none

在这个例子中,组合函数的默认绑定名称变为 foobar-in-0foobar-out-0

Kafka Streams bincer 中函数式组合的局限性

当你有一个 java.util.function.Function bean 时,它可以与另一个函数或多个函数组合。同样的函数 bean 也可以与 java.util.function.Consumer 组合。在这种情况下,consumer 是组合的最后一个组件。一个函数可以与多个函数组合,然后以一个 java.util.function.Consumer bean 结束。

在组合类型为 java.util.function.BiFunction 的 bean 时,BiFunction 必须在定义中作为第一个函数。组合的实体必须是 java.util.function.Functionjava.util.function.Consumer 类型。换句话说,你不能将一个 BiFunction bean 与另一个 BiFunction 组合在一起。

你不能使用 BiConsumer 类型或 Consumer 作为第一个组件的定义进行组合。除非这是定义中的最后一个组件,否则你也不能与输出为数组(例如用于分支的 KStream[])的函数进行组合。

在函数定义中,BiFunction 的第一个 Function 也可以使用柯里化形式。例如,以下形式是可能的。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
none

函数定义可以是 curriedFoo|bar。在幕后,绑定器将为这个柯里化函数创建两个输入绑定,并根据定义中的最终函数创建一个输出绑定。在这种情况下,默认的输入绑定将是 curriedFoobar-in-0curriedFoobar-in-1。对于这个例子,默认的输出绑定变为 curriedFoobar-out-0

关于在函数组合中使用 KTable 作为输出的特别说明

假设你有以下两个函数。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
none

你可以将它们组合为 foo|bar,但请记住,第二个函数(在这种情况下是 bar)必须以 KTable 作为输入,因为第一个函数(foo)的输出是 KTable