跳到主要内容

使用 Reactive Kafka Binder 的基础示例

DeepSeek V3 中英对照 Basic Example using the Reactive Kafka Binder

在本节中,我们将展示一些基本的代码片段,用于使用响应式绑定器编写响应式 Kafka 应用程序,并详细介绍相关内容。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
java

你可以将上述的 uppercase 函数与基于消息通道的 Kafka binder(spring-cloud-stream-binder-kafka)以及本节讨论的响应式 Kafka binder(spring-cloud-stream-binder-kafka-reactive)一起使用。当你在常规的 Kafka binder 中使用这个函数时,尽管你在应用程序中使用了响应式类型(即在 uppercase 函数中),你只能在函数执行期间获得响应式流。在函数的执行上下文之外,由于底层的 binder 不是基于响应式堆栈的,因此没有响应式的好处。因此,尽管这看起来像是带来了一个完整的端到端响应式堆栈,但这个应用程序只是部分响应式的。

现在假设你在上述函数的应用中使用的是 Kafka 的响应式绑定器——spring-cloud-stream-binder-kafka-reactive。这个绑定器实现将从顶端的消费到底端的发布,全程提供完整的响应式优势。这是因为底层的绑定器是构建在 Reactor Kafka 的核心 API 之上的。在消费者端,它使用了 KafkaReceiver,这是 Kafka 消费者的响应式实现。同样,在生产者端,它使用了 KafkaSender API,这是 Kafka 生产者的响应式实现。由于响应式 Kafka 绑定器的基础是建立在正确的响应式 Kafka API 之上的,应用程序可以充分利用响应式技术的优势。使用这个响应式 Kafka 绑定器时,诸如自动背压等响应式能力都是内置的。

从 4.0.2 版本开始,你可以通过提供一个或多个 ReceiverOptionsCustomizerSenderOptionsCustomizer 的 bean 来自定义 ReceiverOptionsSenderOptions。它们是 BiFunction,接收绑定的名称和初始选项,并返回自定义后的选项。这些接口扩展了 Ordered,因此当存在多个自定义器时,它们将按照所需的顺序应用。

important

默认情况下,binder 不会提交偏移量。从 4.0.2 版本开始,KafkaHeaders.ACKNOWLEDGMENT 头信息包含一个 ReceiverOffset 对象,允许你通过调用其 acknowledge()commit() 方法来提交偏移量。

@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
java

有关更多信息,请参考 reactor-kafka 文档和 javadocs。

此外,从 4.0.3 版本开始,Kafka 消费者属性 reactiveAtmostOnce 可以设置为 true,绑定器将在处理每次轮询返回的记录之前自动提交偏移量。同样,从 4.0.3 版本开始,您可以将消费者属性 reactiveAutoCommit 设置为 true,绑定器将在处理每次轮询返回的记录之后自动提交偏移量。在这些情况下,确认头信息不会存在。

important

4.0.2 版本也提供了 reactiveAutoCommit,但其实现不正确,其行为类似于 reactiveAtMostOnce

以下是如何使用 reaciveAutoCommit 的示例。

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
java

请注意,在使用自动提交时,reactor-kafka 会返回一个 Flux<Flux<ConsumerRecord<?, ?>>>。由于 Spring 无法访问内部 Flux 的内容,应用程序必须处理原生的 ConsumerRecord;没有对内容应用消息转换或转换服务。这需要使用原生解码(通过在配置中指定适当类型的 Deserializer)来返回所需类型的记录键/值。