跳到主要内容

可观测性

DeepSeek V3 中英对照 Observability

Spring 通过 Micrometer 提供了对可观测性的支持,Micrometer 定义了一个 Observation 概念,使得应用程序中能够同时支持指标(Metrics)和追踪(Traces)

Spring Cloud Stream 在 Spring Cloud Function 的层面上集成了这种支持,通过提供多个抽象之一——ObservationFunctionAroundWrapper,它包装了函数以开箱即用地处理观测。

所需依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
xml

以及可用的追踪桥接器之一。例如 Zipkin Brave

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
xml

命令式函数

命令式函数被封装在观察包装器 ObservationFunctionAroundWrapper 中,该包装器提供了必要的基础设施来处理与观察注册表的交互。这种交互发生在每次函数调用时,这实际上意味着观察被附加到函数的每次调用上(即每条消息对应一次观察)。换句话说,对于命令式函数,如果前面提到的所需依赖项存在,可观察性将自动生效。

响应式函数

响应式函数本质上与命令式函数不同,因此它们不会被 ObservationFunctionAroundWrapper 包裹。

命令式函数 是一种消息处理函数,每次有消息时由框架调用,类似于典型的事件处理程序,其中对于 N 条消息,将会有 N 次调用此类函数。这使我们能够包装此类函数,以使用额外的功能来装饰它,例如错误处理重试,当然还有可观测性

Reactive function 是初始化函数。它的职责是将用户提供的流处理代码(Flux)与绑定器提供的源流和目标流连接起来。它仅在应用程序启动时调用一次。一旦流代码与源/目标流连接,我们就无法看到或控制实际的流处理过程。这一切都由响应式 API 掌控。Reactive function 还引入了一个额外的变量。考虑到该函数使您能够看到整个流链(而不仅仅是单个事件),那么默认的观察单位应该是什么?流链中的单个项目?一系列项目?如果经过一段时间后没有消息怎么办?等等……我们想要强调的是,使用响应式函数时,我们不能做出任何假设。(有关响应式函数与命令式函数之间差异的更多信息,请参阅 Reactive Functions)。

所以,就像处理重试错误处理一样,你需要手动处理观察。

幸运的是,你可以通过使用响应式 API 的 tap 操作来轻松实现这一点,同时提供一个 ObservationRegistry 实例。这样的片段定义了一个观察单元,它可以是 Flux 中的单个项目、一个范围,或者你希望在流中观察的任何其他内容。

@SpringBootApplication
public class DemoStreamApplication {

Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);

public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
SpringApplication.run(DemoStreamApplication.class, args);
}

@Bean
public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
return flux -> flux.flatMap(item -> {
return Mono.just(item)
.map(value -> value.toUpperCase())
.doOnNext(v -> logger.info(v))
.tap(Micrometer.observation(registry));
});
}
}
java

上面的例子模拟了将 Observation 附加到单个消息处理(即命令式函数)上,因为在这种情况下,观察的单元从 Mono.just(..) 开始,最后一个操作将 ObservationRegistry 附加到订阅者上。

如果订阅者已经附加了一个观察(observation),它将用于在 tap 的上游链/段中创建一个子观察(child Observation)。然而,正如我们之前提到的,默认情况下,框架不会将任何观察附加到你返回的流链上。