跳到主要内容

记录序列化与反序列化

DeepSeek V3 中英对照 Record serialization and deserialization

Kafka Streams binder 允许你以两种方式对记录进行序列化和反序列化。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换能力。让我们来看一些细节。

入站反序列化

键始终使用原生 Serdes 进行反序列化。

对于值(values)的反序列化,默认情况下,入站时的反序列化是由 Kafka 原生执行的。请注意,这是 Kafka Streams binder 默认行为的一个重大变化,在之前的版本中,反序列化是由框架完成的。

Kafka Streams binder 会通过查看 java.util.function.Function|Consumer 的类型签名来推断匹配的 Serde 类型。以下是它匹配 Serdes 的顺序。

  • 如果应用程序提供了一个类型为 Serde 的 bean,并且返回类型是用传入键或值类型的实际类型参数化的,那么它将使用该 Serde 进行入站反序列化。例如,如果应用程序中有以下内容,绑定器会检测到 KStream 的传入值类型与一个在 Serde bean 上参数化的类型匹配。它将使用该 Serde 进行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
none
  • 接下来,它会查看类型,并判断它们是否是 Kafka Streams 暴露的类型之一。如果是,则使用这些类型。以下是绑定器会尝试从 Kafka Streams 中匹配的 Serde 类型。

    Integer, Long, Short, Double, Float, byte[], UUID 和 String。
  • 如果 Kafka Streams 提供的 Serdes 都不匹配这些类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设这些类型是 JSON 友好的。如果你有多个值对象作为输入,这将非常有用,因为绑定器会在内部将它们推断为正确的 Java 类型。不过在回退到 JsonSerde 之前,绑定器会检查 Kafka Streams 配置中设置的默认 Serde,以查看它是否可以与传入的 KStream 类型匹配。

如果以上策略都无法奏效,那么应用程序必须通过配置提供 Serde。这可以通过两种方式进行配置——绑定或默认。

首先,绑定器会检查在绑定级别是否提供了 Serde。例如,如果你有以下处理器,

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
none

然后,你可以使用以下方式提供一个绑定级别的 Serde

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
none
备注

如果你为每个输入绑定提供 Serde,如上所述,那么它将具有更高的优先级,绑定器将不会进行任何 Serde 推断。

如果你希望使用默认的键/值 Serdes 进行入站反序列化,你可以在 binder 级别进行配置。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
none

如果你不希望使用 Kafka 提供的原生解码功能,可以依赖 Spring Cloud Stream 提供的消息转换特性。由于原生解码是默认启用的,为了让 Spring Cloud Stream 反序列化入站的值对象,你需要显式地禁用原生解码。

例如,如果你有与上述相同的 BiFunction 处理器,那么你需要为每个输入单独禁用原生解码,如 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false。否则,那些你没有禁用的输入仍将应用原生解码。

默认情况下,Spring Cloud Stream 会使用 application/json 作为内容类型,并使用适当的 JSON 消息转换器。你可以通过使用以下属性和适当的 MessageConverter bean 来使用自定义的消息转换器。

spring.cloud.stream.bindings.process-in-0.contentType
none

出站序列化

出站序列化基本上遵循与入站反序列化相同的规则。与入站反序列化一样,与之前版本的 Spring Cloud Stream 相比,一个主要变化是出站的序列化由 Kafka 原生处理。在绑定器的 3.0 版本之前,这是由框架本身完成的。

在出站时,Kafka 总是使用由绑定器推断出的匹配的 Serde 来序列化键。如果无法推断出键的类型,则需要通过配置来指定。

值的序列化与反序列化(serdes)采用与入站反序列化相同的规则进行推断。首先,它会检查出站类型是否来自应用程序中提供的 bean。如果不是,它会检查是否与 Kafka 暴露的 Serde 匹配,例如 IntegerLongShortDoubleFloatbyte[]UUIDString。如果这些都不匹配,则会回退到 Spring Kafka 项目提供的 JsonSerde,但在此之前会先查看默认的 Serde 配置,看是否有匹配项。需要注意的是,所有这些操作对应用程序是透明的。如果以上方法都不奏效,则用户需要通过配置提供要使用的 Serde

假设你正在使用与上述相同的 BiFunction 处理器。那么你可以按如下方式配置出站键/值 Serdes。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
none

如果 Serde 推断失败,并且没有提供绑定级别的 Serdes,那么绑定器将回退到 JsonSerde,但会查看默认的 Serdes 以寻找匹配项。

默认的序列化/反序列化器(serdes)按照上述反序列化部分描述的方式进行配置。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果你的应用程序使用了分支功能并且有多个输出绑定,那么这些绑定需要为每个绑定进行配置。再次强调,如果绑定器能够推断出 Serde 类型,你就不需要做这个配置。

如果你不希望使用 Kafka 提供的原生编码,而是希望使用框架提供的消息转换功能,那么你需要显式禁用原生编码,因为原生编码是默认启用的。例如,如果你有一个与上述相同的 BiFunction 处理器,那么你需要设置 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false。在分支的情况下,你需要为每个输出单独禁用原生编码。否则,对于那些你没有禁用的输出,原生编码仍然会被应用。

当 Spring Cloud Stream 进行转换时,默认情况下,它将使用 application/json 作为内容类型,并使用适当的 JSON 消息转换器。你可以通过使用以下属性和相应的 MessageConverter bean 来自定义消息转换器。

spring.cloud.stream.bindings.process-out-0.contentType
none

当禁用原生编码/解码时,binder 将不会像原生 Serdes 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用程序时,保持默认的序列化/反序列化选项,并坚持使用 Kafka Streams 提供的原生序列化/反序列化机制。唯一必须使用框架提供的消息转换能力的场景是当你的上游生产者使用特定的序列化策略时。在这种情况下,你希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖默认的 Serde 机制时,应用程序必须确保 binder 能够正确地将入站和出站映射到适当的 Serde,否则可能会导致失败。

值得一提的是,上述数据序列化/反序列化方法仅适用于处理器的边缘,即入站和出站。你的业务逻辑可能仍然需要调用 Kafka Streams API,这些 API 明确需要 Serde 对象。这些仍然是应用程序的责任,必须由开发者相应地处理。