跳到主要内容

基于事件类型的 Kafka Streams 应用程序路由

DeepSeek V3 中英对照 Event type based routing in Kafka Streams applications

在基于常规消息通道的绑定器中可用的路由功能在 Kafka Streams 绑定器中不受支持。然而,Kafka Streams 绑定器仍然通过入站记录上的事件类型记录头提供路由功能。

要启用基于事件类型的路由,应用程序必须提供以下属性。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.

这可以是一个逗号分隔的值。

例如,假设我们有这个函数:

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
none

我们还假设,只有当传入的记录具有 foobar 事件类型时,才希望执行此函数中的业务逻辑。可以使用绑定上的 eventTypes 属性来表达这一点,如下所示。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

现在,当应用程序运行时,绑定器会检查每个传入记录的 event_type 头,并查看其值是否设置为 foobar。如果未找到其中任何一个值,则函数执行将被跳过。

默认情况下,binder 期望记录头键为 event_type,但可以根据每个绑定进行更改。例如,如果我们想将此绑定的头键更改为 my_event 而不是默认值,可以按如下方式进行更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event

在使用 Kafka Streams binder 的事件路由功能时,它使用字节数组 Serde 来反序列化所有传入的记录。如果记录头与事件类型匹配,则仅使用实际的 Serde 通过配置或推断的 Serde 进行正确的反序列化。如果在绑定上设置了反序列化异常处理程序,这会导致问题,因为预期的反序列化仅在堆栈下方发生,从而导致意外的错误。为了解决这个问题,你可以在绑定上设置以下属性,以强制 binder 使用配置或推断的 Serde,而不是字节数组 Serde

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

这样,应用程序在使用事件路由功能时,可以立即检测到反序列化问题,并采取适当的处理决策。