记录序列化与反序列化
Kafka Streams binder 允许你以两种方式对记录进行序列化和反序列化。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换能力。让我们来看一些细节。
入站反序列化
键始终使用原生 Serdes 进行反序列化。
对于值(values)的反序列化,默认情况下,入站时的反序列化是由 Kafka 原生执行的。请注意,这是 Kafka Streams binder 默认行为的一个重大变化,在之前的版本中,反序列化是由框架完成的。
Kafka Streams binder 会通过查看 java.util.function.Function|Consumer
的类型签名来推断匹配的 Serde
类型。以下是它匹配 Serdes 的顺序。
- 如果应用程序提供了一个类型为
Serde
的 bean,并且返回类型是用传入键或值类型的实际类型参数化的,那么它将使用该Serde
进行入站反序列化。例如,如果应用程序中有以下内容,绑定器会检测到KStream
的传入值类型与一个在Serde
bean 上参数化的类型匹配。它将使用该Serde
进行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它会查看类型,并判断它们是否是 Kafka Streams 暴露的类型之一。如果是,则使用这些类型。以下是绑定器会尝试从 Kafka Streams 中匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID 和 String。
-
如果 Kafka Streams 提供的 Serdes 都不匹配这些类型,那么它将使用 Spring Kafka 提供的
JsonSerde
。在这种情况下,绑定器假设这些类型是 JSON 友好的。如果你有多个值对象作为输入,这将非常有用,因为绑定器会在内部将它们推断为正确的 Java 类型。不过在回退到JsonSerde
之前,绑定器会检查 Kafka Streams 配置中设置的默认Serde
,以查看它是否可以与传入的 KStream 类型匹配。
如果以上策略都无法奏效,那么应用程序必须通过配置提供 Serde
。这可以通过两种方式进行配置——绑定或默认。
首先,绑定器会检查在绑定级别是否提供了 Serde
。例如,如果你有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,你可以使用以下方式提供一个绑定级别的 Serde
:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果你为每个输入绑定提供 Serde
,如上所述,那么它将具有更高的优先级,绑定器将不会进行任何 Serde
推断。
如果你希望使用默认的键/值 Serdes 进行入站反序列化,你可以在 binder 级别进行配置。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不希望使用 Kafka 提供的原生解码功能,可以依赖 Spring Cloud Stream 提供的消息转换特性。由于原生解码是默认启用的,为了让 Spring Cloud Stream 反序列化入站的值对象,你需要显式地禁用原生解码。
例如,如果你有与上述相同的 BiFunction
处理器,那么你需要为每个输入单独禁用原生解码,如 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
。否则,那些你没有禁用的输入仍将应用原生解码。
默认情况下,Spring Cloud Stream 会使用 application/json
作为内容类型,并使用适当的 JSON 消息转换器。你可以通过使用以下属性和适当的 MessageConverter
bean 来使用自定义的消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化基本上遵循与入站反序列化相同的规则。与入站反序列化一样,与之前版本的 Spring Cloud Stream 相比,一个主要变化是出站的序列化由 Kafka 原生处理。在绑定器的 3.0 版本之前,这是由框架本身完成的。
在出站时,Kafka 总是使用由绑定器推断出的匹配的 Serde
来序列化键。如果无法推断出键的类型,则需要通过配置来指定。
值的序列化与反序列化(serdes)采用与入站反序列化相同的规则进行推断。首先,它会检查出站类型是否来自应用程序中提供的 bean。如果不是,它会检查是否与 Kafka 暴露的 Serde
匹配,例如 Integer
、Long
、Short
、Double
、Float
、byte[]
、UUID
和 String
。如果这些都不匹配,则会回退到 Spring Kafka 项目提供的 JsonSerde
,但在此之前会先查看默认的 Serde
配置,看是否有匹配项。需要注意的是,所有这些操作对应用程序是透明的。如果以上方法都不奏效,则用户需要通过配置提供要使用的 Serde
。
假设你正在使用与上述相同的 BiFunction
处理器。那么你可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且没有提供绑定级别的 Serdes,那么绑定器将回退到 JsonSerde
,但会查看默认的 Serdes 以寻找匹配项。
默认的序列化/反序列化器(serdes)按照上述反序列化部分描述的方式进行配置。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你的应用程序使用了分支功能并且有多个输出绑定,那么这些绑定需要为每个绑定进行配置。再次强调,如果绑定器能够推断出 Serde
类型,你就不需要做这个配置。
如果你不希望使用 Kafka 提供的原生编码,而是希望使用框架提供的消息转换功能,那么你需要显式禁用原生编码,因为原生编码是默认启用的。例如,如果你有一个与上述相同的 BiFunction
处理器,那么你需要设置 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
。在分支的情况下,你需要为每个输出单独禁用原生编码。否则,对于那些你没有禁用的输出,原生编码仍然会被应用。
当 Spring Cloud Stream 进行转换时,默认情况下,它将使用 application/json
作为内容类型,并使用适当的 JSON 消息转换器。你可以通过使用以下属性和相应的 MessageConverter
bean 来自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用原生编码/解码时,binder 将不会像原生 Serdes 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用程序时,保持默认的序列化/反序列化选项,并坚持使用 Kafka Streams 提供的原生序列化/反序列化机制。唯一必须使用框架提供的消息转换能力的场景是当你的上游生产者使用特定的序列化策略时。在这种情况下,你希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖默认的 Serde
机制时,应用程序必须确保 binder 能够正确地将入站和出站映射到适当的 Serde
,否则可能会导致失败。
值得一提的是,上述数据序列化/反序列化方法仅适用于处理器的边缘,即入站和出站。你的业务逻辑可能仍然需要调用 Kafka Streams API,这些 API 明确需要 Serde
对象。这些仍然是应用程序的责任,必须由开发者相应地处理。