使用 Reactive Kafka Binder 的基础示例
在本节中,我们将展示一些基本的代码片段,用于使用响应式绑定器编写响应式 Kafka 应用程序,并详细介绍相关内容。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
你可以将上述的 uppercase
函数与基于消息通道的 Kafka binder(spring-cloud-stream-binder-kafka
)以及本节讨论的响应式 Kafka binder(spring-cloud-stream-binder-kafka-reactive
)一起使用。当你在常规的 Kafka binder 中使用这个函数时,尽管你在应用程序中使用了响应式类型(即在 uppercase
函数中),你只能在函数执行期间获得响应式流。在函数的执行上下文之外,由于底层的 binder 不是基于响应式堆栈的,因此没有响应式的好处。因此,尽管这看起来像是带来了一个完整的端到端响应式堆栈,但这个应用程序只是部分响应式的。
现在假设你在上述函数的应用中使用的是 Kafka 的响应式绑定器——spring-cloud-stream-binder-kafka-reactive
。这个绑定器实现将从顶端的消费到底端的发布,全程提供完整的响应式优势。这是因为底层的绑定器是构建在 Reactor Kafka 的核心 API 之上的。在消费者端,它使用了 KafkaReceiver,这是 Kafka 消费者的响应式实现。同样,在生产者端,它使用了 KafkaSender API,这是 Kafka 生产者的响应式实现。由于响应式 Kafka 绑定器的基础是建立在正确的响应式 Kafka API 之上的,应用程序可以充分利用响应式技术的优势。使用这个响应式 Kafka 绑定器时,诸如自动背压等响应式能力都是内置的。
从 4.0.2 版本开始,你可以通过提供一个或多个 ReceiverOptionsCustomizer
或 SenderOptionsCustomizer
的 bean 来自定义 ReceiverOptions
和 SenderOptions
。它们是 BiFunction
,接收绑定的名称和初始选项,并返回自定义后的选项。这些接口扩展了 Ordered
,因此当存在多个自定义器时,它们将按照所需的顺序应用。
默认情况下,binder 不会提交偏移量。从 4.0.2 版本开始,KafkaHeaders.ACKNOWLEDGMENT
头信息包含一个 ReceiverOffset
对象,允许你通过调用其 acknowledge()
或 commit()
方法来提交偏移量。
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多信息,请参考 reactor-kafka
文档和 javadocs。
此外,从 4.0.3 版本开始,Kafka 消费者属性 reactiveAtmostOnce
可以设置为 true
,绑定器将在处理每次轮询返回的记录之前自动提交偏移量。同样,从 4.0.3 版本开始,您可以将消费者属性 reactiveAutoCommit
设置为 true
,绑定器将在处理每次轮询返回的记录之后自动提交偏移量。在这些情况下,确认头信息不会存在。
4.0.2 版本也提供了 reactiveAutoCommit
,但其实现不正确,其行为类似于 reactiveAtMostOnce
。
以下是如何使用 reaciveAutoCommit
的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,在使用自动提交时,reactor-kafka
会返回一个 Flux<Flux<ConsumerRecord<?, ?>>>
。由于 Spring 无法访问内部 Flux 的内容,应用程序必须处理原生的 ConsumerRecord
;没有对内容应用消息转换或转换服务。这需要使用原生解码(通过在配置中指定适当类型的 Deserializer
)来返回所需类型的记录键/值。