响应式 Kafka Binder 中的可观测性
本节介绍如何在响应式 Kafka 绑定器中启用基于 Micrometer 的可观测性。
生产者绑定
生产者绑定内置了可观测性支持。要启用它,请设置以下属性:
spring.cloud.stream.kafka.binder.enable-observation
当此属性设置为 true 时,你可以观察记录的发布。使用 StreamBridge 发布记录以及常规的 Supplier<?> bean 都可以被观察到。
消费者绑定
在消费端启用可观测性比在生产端更复杂。消费绑定有两个起点:
- 
一个通过生产者绑定发布数据的主题 
- 
一个在 Spring Cloud Stream 之外生成数据的主题 
在第一种情况下,应用程序理想情况下希望将可观测性标头传递到消费者入站。在第二种情况下,如果没有上游观测开始,它将启动一个新的观测。
示例:具有可观测性的函数
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
	return s -> s.flatMap(record -> {
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
		null,
		KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
		() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
		observationRegistry
		);
		return Mono.deferContextual(contextView -> Mono.just(record)
			.map(rec -> new String(rec.value()).toLowerCase())
			.map(rec -> MessageBuilder.withPayload(rec)
				.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
				.build()))
			.doOnTerminate(receiverObservation::stop)
			.doOnError(receiverObservation::error)
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}
在这个例子中:
- 
当接收到一条记录时,会创建一个观察。 
- 
如果存在上游观察,它将成为 KafkaRecordReceiverContext的一部分。
- 
创建了一个带有延迟上下文的 Mono。
- 
当调用 map操作时,上下文可以访问正确的观察。
- 
flatMap操作的结果作为Flux<Message<?>>发送回绑定。
- 
出站记录将具有来自输入绑定的相同的可观测性头信息。 
示例:带有可观测性的消费者
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
			observationRegistry).observe(() -> System.out.println(record)))
		.subscribe();
}
在这种情况下:
- 
由于没有输出绑定, doOnNext被用于Flux而不是flatMap。
- 
直接调用 observe会启动观察,并在完成后正确关闭它。