跳到主要内容

响应式 Kafka Binder 中的可观测性

DeepSeek V3 中英对照 Observability in Reactive Kafka Binder

本节介绍如何在响应式 Kafka 绑定器中启用基于 Micrometer 的可观测性。

生产者绑定

生产者绑定内置了可观测性支持。要启用它,请设置以下属性:

spring.cloud.stream.kafka.binder.enable-observation
none

当此属性设置为 true 时,你可以观察记录的发布。使用 StreamBridge 发布记录以及常规的 Supplier<?> bean 都可以被观察到。

消费者绑定

在消费端启用可观测性比在生产端更复杂。消费绑定有两个起点:

  1. 一个通过生产者绑定发布数据的主题

  2. 一个在 Spring Cloud Stream 之外生成数据的主题

在第一种情况下,应用程序理想情况下希望将可观测性标头传递到消费者入站。在第二种情况下,如果没有上游观测开始,它将启动一个新的观测。

示例:具有可观测性的函数

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);

return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
none

在这个例子中:

  1. 当接收到一条记录时,会创建一个观察。

  2. 如果存在上游观察,它将成为 KafkaRecordReceiverContext 的一部分。

  3. 创建了一个带有延迟上下文的 Mono

  4. 当调用 map 操作时,上下文可以访问正确的观察。

  5. flatMap 操作的结果作为 Flux<Message<?>> 发送回绑定。

  6. 出站记录将具有来自输入绑定的相同的可观测性头信息。

示例:带有可观测性的消费者

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
none

在这种情况下:

  1. 由于没有输出绑定,doOnNext 被用于 Flux 而不是 flatMap

  2. 直接调用 observe 会启动观察,并在完成后正确关闭它。