仪器化
本文档的此部分内容面向希望在代码库中添加检测功能的用户。
在本节中,我们将看到一些常见的示例,展示如何重用现有的 Micrometer 和 Micrometer Tracing 处理程序和上下文类型来进行检测。
在决定自行对项目进行插桩之前,请务必再次确认该项目是否尚未被插桩!
为了更好地传达如何进行仪器化,我们需要区分两个概念:
- 
上下文传播 
- 
观测的创建 
上下文传播 - 我们通过线程或网络传播现有的上下文。我们使用 Micrometer Context Propagation 库来定义上下文并通过线程传播它。我们使用专用的 SenderContext 和 ReceiverContext 对象,以及 Micrometer Tracing 处理程序,来创建在线路上传播上下文的 Observations。
创建观测 - 我们希望将一个操作包装在观测中,以获取测量结果。我们需要知道之前是否存在一个父级观测,以维护观测之间的父子关系。
HTTP 通信的监测
在本节中,您可以找到如何对进行 HTTP 通信的库进行插装的方法。
HTTP 客户端通信的仪表化
HTTP 客户端检测的解释
┌─────────────────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] RequestReplySenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└──────────────┬──────────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
               │                     │                  │                    │                     │                        │                       │
               │        Wrap         │                  │                    │                     │                        │                       │
               │────────────────────>│                  │                    │                     │                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │       Create       │                     │                        │                       │
               │                     │                  │───────────────────>│                     │                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │     Create       │                    │                     │                        │                       │
               │────────────────────────────────────────────────────────────>│                     │                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │                    │       Create        │                        │                       │
               │                     │                  │                    │<────────────────────│                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │                    │                   onStart                    │                       │
               │                     │                  │                    │─────────────────────────────────────────────>│                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │                    │                     │      Wrap in Scope     │                       │
               │                     │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌──────────────┴──────────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] RequestReplySenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────────────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
- 
在 <3> ObservationRegistry中注册一个 <6> 处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingSenderTracingObservationHandler)
- 
创建一个 <1> RequestReplySenderContext,用于包装 <2> 载体(例如HttpRequest)- 
在其构造函数中解释如何丰富头部信息(例如 (key, value) → httpRequest.header(key, value))
- 
在 <1> RequestReplySenderContext上设置 <2> 载体
 
- 
- 
创建一个 <4> Observation,可选择使用 <5>ObservationConvention与发送者上下文- 在 <4> Observation启动时,将通过 <6>ObservationHandler进行传播(例如,载体将被丰富为适当的头部信息)
 
- 在 <4> 
- 
将要检测的 <7> 代码(例如发送 HTTP 请求)包装在作用域中(例如通过 observe或scoped方法)
HTTP 服务器通信的插装
HTTP 服务器端插桩解释
在计算机领域中,插桩(Instrumentation) 是指在代码中插入额外的监控或测量代码,以便收集运行时信息。对于 HTTP 服务器端插桩,通常是指在服务器端代码中插入监控逻辑,以捕获和分析 HTTP 请求和响应的相关数据。
主要目的
- 性能监控:测量请求处理时间、响应时间等性能指标。
- 错误跟踪:捕获并记录请求处理过程中发生的错误或异常。
- 流量分析:统计请求量、用户行为等数据,用于分析和优化。
- 安全审计:监控潜在的恶意请求或异常行为。
常见实现方式
- 中间件(Middleware):在 HTTP 请求处理链中插入中间件,用于捕获请求和响应数据。
- AOP(面向切面编程):通过 AOP 技术在关键方法前后插入监控逻辑。
- 日志记录:在关键代码路径中添加日志记录,用于后续分析。
示例代码
以下是一个简单的 Node.js Express 中间件示例,用于记录 HTTP 请求的耗时:
app.use((req, res, next) => {
  const start = Date.now();
  res.on('finish', () => {
    const duration = Date.now() - start;
    console.log(`${req.method} ${req.url} - ${duration}ms`);
  });
  next();
});
注意事项
- 性能开销:插桩可能会引入额外的性能开销,需权衡监控粒度与性能影响。
- 数据隐私:确保捕获的数据符合隐私保护要求,避免泄露敏感信息。
- 可配置性:提供灵活的配置选项,方便在生产环境中动态启用或禁用插桩功能。
通过 HTTP 服务器端插桩,可以更好地理解和优化服务器行为,为系统稳定性和性能提供有力支持。
┌───────────────────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] RequestReplyReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────┬───────────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
                │                      │                  │                    │                     │                        │                       │
                │         Wrap         │                  │                    │                     │                        │                       │
                │─────────────────────>│                  │                    │                     │                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │       Create       │                     │                        │                       │
                │                      │                  │───────────────────>│                     │                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │     Create       │                    │                     │                        │                       │
                │─────────────────────────────────────────────────────────────>│                     │                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │                    │       Create        │                        │                       │
                │                      │                  │                    │<────────────────────│                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │                    │                   onStart                    │                       │
                │                      │                  │                    │─────────────────────────────────────────────>│                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │                    │                     │      Wrap in Scope     │                       │
                │                      │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌───────────────┴───────────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] RequestReplyReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
- 
在 ObservationRegistry中注册一个处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingReceiverTracingObservationHandler)
- 
创建一个 <1> RequestReplyReceiverContext,它包装了一个 <2> 载体(例如HttpRequest)- 
在其构造函数中解释如何检索头值(例如 (carrier, key) → carrier.header(key))
- 
将 <2> 载体设置到 <1> RequestReplyReceiverContext上
 
- 
- 
创建一个 <4> Observation,可以选择使用 <5>ObservationConvention与接收者上下文- 在 <4> Observation开始时,将通过 <6>ObservationHandler进行传播(例如,载体将被适当的头信息丰富)
 
- 在 <4> 
- 
将 <6> 要检测的代码(例如处理 HTTP 请求)包装在作用域中(例如通过 observe或scoped方法)
HTTP 通信示例的插装
为了对基于 HTTP 的通信进行监控,我们需要分别在客户端和服务器端使用 RequestReplySenderContext 和 RequestReplyReceiverContext。
作为客户端的示例,我们使用一个处理器来通过添加一个 foo:bar 头来检测 HTTP 请求(如果你的类路径上有 Micrometer Tracing,你可以重用 PropagatingSenderTracingObservationHandler 和 PropagatingReceiverTracingObservationHandler 来在网络上传播跟踪上下文)。让我们考虑一个这样的处理器示例:
static class HeaderPropagatingHandler implements ObservationHandler<SenderContext<Object>> {
    @Override
    public void onStart(SenderContext<Object> context) {
        context.getSetter().set(context.getCarrier(), "foo", "bar");
    }
    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof SenderContext;
    }
}
考虑以下重复使用处理程序的 HTTP 客户端检测:
// This example can be combined with the idea of ObservationConvention to allow
// users to easily customize the key values. Please read the rest of the
// documentation on how to do it.
// In Micrometer Tracing we would have predefined
// PropagatingSenderTracingObservationHandler but for the sake of this demo we
// create our own handler that puts "foo":"bar" headers into the request
registry.observationConfig().observationHandler(new HeaderPropagatingHandler());
// We're using WireMock to stub the HTTP GET call to "/foo" with a response "OK"
stubFor(get("/foo").willReturn(ok().withBody("OK")));
// RequestReplySenderContext is a special type of context used for request-reply
// communication. It requires to define what the Request type is and how we can
// instrument it. It also needs to know what the Response type is
RequestReplySenderContext<HttpUriRequestBase, ClassicHttpResponse> context = new RequestReplySenderContext<>(
        (carrier, key, value) -> Objects.requireNonNull(carrier).addHeader(key, value));
// We're instrumenting the Apache HTTPClient
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
    // The HttpGet is our carrier (we can mutate it to instrument the headers)
    HttpGet httpget = new HttpGet(info.getHttpBaseUrl() + "/foo");
    // We must set the carrier BEFORE we run <Observation#start>
    context.setCarrier(httpget);
    // You can set the remote service address to provide more debugging
    // information
    context.setRemoteServiceAddress(info.getHttpBaseUrl());
    // Examples of setting key values from the request
    Observation observation = Observation.createNotStarted("http.client.requests", () -> context, registry)
        .contextualName("HTTP " + httpget.getMethod())
        .lowCardinalityKeyValue("http.url", info.getHttpBaseUrl() + "/{name}")
        .highCardinalityKeyValue("http.full-url", httpget.getRequestUri());
    observation.observeChecked(() -> {
        String response = httpclient.execute(httpget, classicHttpResponse -> {
            // We should set the response before we stop the observation
            context.setResponse(classicHttpResponse);
            // Example of setting key values from the response
            observation.highCardinalityKeyValue("http.content.length",
                    String.valueOf(classicHttpResponse.getEntity().getContentLength()));
            return EntityUtils.toString(classicHttpResponse.getEntity());
        });
        then(response).isEqualTo("OK");
    });
}
// We want to be sure that we have successfully enriched the HTTP headers
verify(getRequestedFor(urlEqualTo("/foo")).withHeader("foo", equalTo("bar")));
作为一个服务器端的示例,我们使用一个处理器,通过添加一个低基数的键 foo 并使其值为 HTTP 请求中匹配的路径来对 Observation 进行仪表化。以下是一个此类处理器的示例:
static class HeaderReadingHandler implements ObservationHandler<ReceiverContext<Context>> {
    @Override
    public void onStart(ReceiverContext<Context> context) {
        String fooHeader = context.getGetter().get(context.getCarrier(), "foo");
        // We're setting the value of the <foo> header as a low cardinality key value
        context.addLowCardinalityKeyValue(KeyValue.of("foo", fooHeader));
    }
    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof ReceiverContext;
    }
}
考虑以下重用处理程序的 HTTP 服务器端检测:
// This example can be combined with the idea of ObservationConvention to allow
// users to easily customize the key values. Please read the rest of the
// documentation on how to do it.
// In Micrometer Tracing we would have predefined
// PropagatingReceiverTracingObservationHandler but for the sake of this demo we
// create our own handler that will reuse the <foo> header from the request as a
// low cardinality key value
registry.observationConfig().observationHandler(new HeaderReadingHandler());
try (Javalin javalin = Javalin.create().before("/hello/{name}", ctx -> {
    // We're creating the special RequestReplyReceiverContext that will reuse the
    // information from the HTTP headers
    RequestReplyReceiverContext<Context, Context> receiverContext = new RequestReplyReceiverContext<>(
            Context::header);
    // Remember to set the carrier!!!
    receiverContext.setCarrier(ctx);
    String remoteServiceAddress = ctx.scheme() + "://" + ctx.host();
    receiverContext.setRemoteServiceAddress(remoteServiceAddress);
    // We're starting an Observation with the context
    Observation observation = Observation
        .createNotStarted("http.server.requests", () -> receiverContext, registry)
        .contextualName("HTTP " + ctx.method() + " " + ctx.matchedPath())
        .lowCardinalityKeyValue("http.url", remoteServiceAddress + ctx.matchedPath())
        .highCardinalityKeyValue("http.full-url", remoteServiceAddress + ctx.path())
        .lowCardinalityKeyValue("http.method", ctx.method().name())
        .start();
    // Let's be consistent and always set the Observation related objects under
    // the same key
    ctx.attribute(ObservationThreadLocalAccessor.KEY, observation);
}).get("/hello/{name}", ctx -> {
    // We need to be thread-safe - we're not using ThreadLocals, we're retrieving
    // information from the attributes
    Observation observation = ctx.attribute(ObservationThreadLocalAccessor.KEY);
    observation.scoped(() -> {
        // If we need thread locals (e.g. MDC entries) we can use <scoped()>
        log.info("We're using scoped - Observation in thread local here [" + registry.getCurrentObservation()
                + "]");
        then(registry.getCurrentObservation()).isNotNull();
    });
    // We're returning body
    ctx.result("Hello World [" + observation.getContext().getLowCardinalityKeyValue("foo").getValue() + "]");
}).after("/hello/{name}", ctx -> {
    // After sending the response we want to stop the Observation
    Observation observation = ctx.attribute(ObservationThreadLocalAccessor.KEY);
    observation.stop();
}).start(0)) {
    // We're sending an HTTP request with a <foo:bar> header. We're expecting that
    // it will be reused in the response
    String response = sendRequestToHelloEndpointWithHeader(javalin.port(), "foo", "bar");
    // The response must contain the value from the header
    then(response).isEqualTo("Hello World [bar]");
}
消息通信的仪表化
在本节中,您可以了解如何检测那些采用“发射后不管”(fire-and-forget)通信方式的库。
消息生产者端的仪表化
消息生产者端监控说明
┌─────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] SenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└────────┬────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
         │               │                  │                    │                     │                        │                       │
         │     Wrap      │                  │                    │                     │                        │                       │
         │──────────────>│                  │                    │                     │                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │       Create       │                     │                        │                       │
         │               │                  │───────────────────>│                     │                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │        Create    │                    │                     │                        │                       │
         │──────────────────────────────────────────────────────>│                     │                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │                    │       Create        │                        │                       │
         │               │                  │                    │<────────────────────│                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │                    │                   onStart                    │                       │
         │               │                  │                    │─────────────────────────────────────────────>│                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │                    │                     │      Wrap in Scope     │                       │
         │               │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌────────┴────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] SenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
- 
在 <3> ObservationRegistry中注册一个 <6> 处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingSenderTracingObservationHandler)
- 
创建一个 <1> SenderContext,用于包装一个 <2> 载体(例如AmqpMessage)- 
在其构造函数中解释如何丰富头信息(例如 (key, value) → amqpMessage.header(key, value))
- 
在 <1> SenderContext上设置 <2> 载体
 
- 
- 
创建一个 <4> Observation,可选择使用 <5>ObservationConvention与发送者上下文- 在 <4> Observation开始时,将通过 <6>ObservationHandler进行传播(例如,载体将被适当的头信息丰富)
 
- 在 <4> 
- 
将 <7> 要检测的代码(例如发送 AMQP 消息)包装在作用域中(例如通过 observe或scoped方法)
消息消费者端通信的仪表化
消息消费端监控说明
┌───────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] ReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────┬─────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
          │                │                  │                    │                     │                        │                       │
          │      Wrap      │                  │                    │                     │                        │                       │
          │───────────────>│                  │                    │                     │                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │       Create       │                     │                        │                       │
          │                │                  │───────────────────>│                     │                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │        Create    │                    │                     │                        │                       │
          │───────────────────────────────────────────────────────>│                     │                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │                    │       Create        │                        │                       │
          │                │                  │                    │<────────────────────│                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │                    │                   onStart                    │                       │
          │                │                  │                    │─────────────────────────────────────────────>│                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │                    │                     │      Wrap in Scope     │                       │
          │                │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌─────────┴─────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] ReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
- 
在 <3> ObservationRegistry中注册一个 <6> 处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingReceiverTracingObservationHandler)
- 
创建一个 <1> ReceiverContext,它包装了一个 <2> 载体(例如AmqpMessage)- 
在其构造函数中解释如何检索标头值(例如 (carrier, key) → carrier.header(key))
- 
在 <1> ReceiverContext上设置 <2> 载体
 
- 
- 
创建一个 <4> Observation,可选择使用 <6>ObservationConvention与接收者上下文- 在 <4> Observation开始时,传播将通过 <6>ObservationHandler进行(例如,载体将被适当的标头丰富)
 
- 在 <4> 
- 
将 <7> 要检测的代码(例如 AMQP 消息的处理)包装在作用域中(例如通过 observe或scoped方法)- 对于某些库(例如 RabbitMQ),您可能无法控制用户的代码,并且可能需要用户允许框架启动消费者端 Observation 并打开其作用域(将值放入线程本地),并要求用户在代码中手动关闭作用域并停止 Observation!
 
消息通信示例的插装
为了对基于消息的通信进行监控,我们需要在生产者端和消费者端分别使用 SenderContext 和 ReceiverContext。
在本节中,我们将为 Apache Kafka 创建一个简单的监控工具。
作为生产者端的一个示例,我们将使用一个处理程序,通过添加 foo:bar 标头来对消息进行标记(如果你的类路径上有 Micrometer Tracing,你可以重用 PropagatingSenderTracingObservationHandler 和 PropagatingReceiverTracingObservationHandler 来在网络上传播追踪上下文)。考虑以下 KafkaSenderContext 的示例:
static class KafkaSenderContext extends SenderContext<ProducerRecord<String, String>> {
    public KafkaSenderContext(ProducerRecord<String, String> producerRecord) {
        // We describe how the carrier will be mutated (we mutate headers)
        super((carrier, key, value) -> carrier.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
        setCarrier(producerRecord);
    }
}
考虑以下上述处理程序的示例:
static class HeaderPropagatingHandler implements ObservationHandler<KafkaSenderContext> {
    @Override
    public void onStart(KafkaSenderContext context) {
        context.getSetter().set(context.getCarrier(), "foo", "bar");
        context.addLowCardinalityKeyValue(KeyValue.of("sent", "true"));
    }
    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof KafkaSenderContext;
    }
}
考虑以下代码,它是 Kafka 的 ProducerInterceptor:
public class ProducerInterceptorConfig implements ProducerInterceptor<String, String> {
    private ObservationRegistry observationRegistry;
    private Observation observation;
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // This code will be called before the message gets sent. We create
        // a context and pass it to an Observation. Upon start, the handler will be called
        // and the ProducerRecord will be mutated
        KafkaSenderContext context = new KafkaSenderContext(record);
        this.observation = Observation.start("kafka.send", () -> context, observationRegistry);
        // We return the mutated carrier
        return context.getCarrier();
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Once the message got sent (with or without an exception) we attach an exception
        // and stop the observation
        this.observation.error(exception);
        this.observation.stop();
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // We retrieve the ObservationRegistry from the configuration
        this.observationRegistry = (ObservationRegistry) configs.get(ObservationRegistry.class.getName());
    }
}
考虑以下生产者端检测代码,该代码重用了处理程序:
TestObservationRegistry registry = TestObservationRegistry.create();
// In Micrometer Tracing we would have predefined
// PropagatingSenderTracingObservationHandler but for the sake of this demo we
// create our own handler that puts "foo":"bar" headers into the request and will
// set the low cardinality key "sent" to "true".
registry.observationConfig().observationHandler(new HeaderPropagatingHandler());
// Producer side...
Properties producerConfigs = new Properties();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
producerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
producerConfigs.put(ObservationRegistry.class.getName(), registry);
producerConfigs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.micrometer.docs.observation.messaging.ProducerInterceptorConfig");
Producer<String, String> producer = new KafkaProducer<>(producerConfigs, new StringSerializer(),
        new StringSerializer());
// Producer sends a message
producer.send(new ProducerRecord<>(topic, "foo"));
producer.flush();
作为消费者端的一个示例,我们使用一个处理程序,通过添加 foo 低基数键来为 Observation 添加工具,该键的值是消息中匹配的路径。考虑以下 KafkaReceiverContext 的示例:
static class KafkaReceiverContext extends ReceiverContext<ConsumerRecords<String, String>> {
    public KafkaReceiverContext(ConsumerRecords<String, String> consumerRecord) {
        // We describe how to read entries from the carrier (we read headers)
        super((carrier, key) -> {
            // This is a very naive approach that takes the first ConsumerRecord
            Header header = carrier.iterator().next().headers().lastHeader(key);
            if (header != null) {
                return new String(header.value());
            }
            return null;
        });
        setCarrier(consumerRecord);
    }
}
考虑以下上述处理程序的示例。
static class HeaderReadingHandler implements ObservationHandler<KafkaReceiverContext> {
    @Override
    public void onStart(KafkaReceiverContext context) {
        String fooHeader = context.getGetter().get(context.getCarrier(), "foo");
        // We're setting the value of the <foo> header as a low cardinality key value
        context.addLowCardinalityKeyValue(KeyValue.of("received foo header", fooHeader));
    }
    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof KafkaReceiverContext;
    }
}
考虑以下代码,它是 Kafka 的 ConsumerInterceptor:
public class ConsumerInterceptorConfig implements ConsumerInterceptor<String, String> {
    private ObservationRegistry observationRegistry;
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // We're creating the receiver context
        KafkaReceiverContext context = new KafkaReceiverContext(records);
        // Then, we're just starting and stopping the observation on the consumer side
        Observation.start("kafka.receive", () -> context, observationRegistry).stop();
        // We could put the Observation in scope so that the users can propagate it
        // further on
        return context.getCarrier();
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // We retrieve the ObservationRegistry from the configuration
        this.observationRegistry = (ObservationRegistry) configs.get(ObservationRegistry.class.getName());
    }
}
考虑以下重复使用处理程序的消费者端插桩代码:
TestObservationRegistry registry = TestObservationRegistry.create();
// Consumer side...
// In Micrometer Tracing we would have predefined
// PropagatingReceiverTracingObservationHandler but for the sake of this demo we
// create our own handler that takes the "foo" header's value and sets it as a low
// cardinality key "received foo header"
registry.observationConfig().observationHandler(new HeaderReadingHandler());
Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
consumerConfigs.put(ObservationRegistry.class.getName(), registry);
consumerConfigs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.micrometer.docs.observation.messaging.ConsumerInterceptorConfig");
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs, new StringDeserializer(),
        new StringDeserializer());
// Consumer scubscribes to the topic
consumer.subscribe(Collections.singletonList(topic));
// Consumer polls for a message
consumer.poll(Duration.ofMillis(1000));
让我们看一下在发送和接收消息之后的断言。我们应该有 2 个观察结果,1 个在消费者端,1 个在生产者端,每个观察结果都应该有适当的键值,如处理程序中所示。
assertThat(registry).hasObservationWithNameEqualTo("kafka.send")
    .that()
    .hasBeenStarted()
    .hasBeenStopped()
    .hasLowCardinalityKeyValue("sent", "true");
assertThat(registry).hasObservationWithNameEqualTo("kafka.receive")
    .that()
    .hasBeenStarted()
    .hasBeenStopped()
    .hasLowCardinalityKeyValue("received foo header", "bar");
线程切换组件的插装
我们可能想要围绕通过 Executor 提交的 Runnable 或 Callable 创建一个 Observation。为了实现这一点,我们需要知道在父线程中是否存在一个 Observation,新线程应该继续这个 Observation,或者是否需要为其创建一个子 Observation。
考虑以下示例:
// Example of an Executor Service
ExecutorService executor = Executors.newCachedThreadPool();
// This snippet shows an example of how to wrap in an observation code that would
// be executed in a separate thread
// Let's assume that we have a parent observation
Observation parent = Observation.createNotStarted("parent", registry);
// Observation is put in scope via the <observe()> method
Future<Boolean> child = parent.observe(() -> {
    // [Thread 1] Current Observation is the same as <parent>
    then(registry.getCurrentObservation()).isSameAs(parent);
    // [Thread 1] We're wrapping the executor in a Context Propagating version.
    // <ContextExecutorService> comes from Context Propagation library
    return ContextExecutorService.wrap(executor).submit(() -> {
        // [Thread 2] Current Observation is same as <parent> - context got
        // propagated
        then(registry.getCurrentObservation()).isSameAs(parent);
        // Wraps the code that should be run in a separate thread in an
        // observation
        return Observation.createNotStarted("child", registry).observe(this::yourCodeToMeasure);
    });
});
响应式库的插桩
在本节中,我们将讨论如何在 Observations 中包装 Reactive 库,以及如何使用 Reactor Context 安全地在线程之间传播 Observations。
对于 Reactor 3.5.3 及之后的版本
在 Reactor 3.5.3 版本中(通过这个 PR),添加了一个启用自动上下文传播的选项。要使用此功能,请确保至少使用以下项目的最低版本:
要使用该功能,请调用新的 Reactor 的 Hook 方法(例如,在你的 public static void main 方法中),如下所示:
Hooks.enableAutomaticContextPropagation();
这会自动包装 Reactor 的内部机制,以便在操作符、线程等之间传播上下文。不需要使用 tap 和 handle 或上下文传播 API。
考虑以下示例:
// This snippet shows an example of how to use the new Hook API with Reactor
Hooks.enableAutomaticContextPropagation();
// Starting from Micrometer 1.10.8 you need to set your registry on this singleton
// instance of OTLA
ObservationThreadLocalAccessor.getInstance().setObservationRegistry(registry);
// Let's assume that we have a parent observation
Observation parent = Observation.start("parent", registry);
// Now we put it in thread local
parent.scoped(() -> {
    // Example of propagating whatever there was in thread local
    Integer block = Mono.just(1).publishOn(Schedulers.boundedElastic()).doOnNext(integer -> {
        log.info("Context Propagation happens - the <parent> observation gets propagated ["
                + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isSameAs(parent);
    })
        .flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
        .transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
            log.info("Context Propagation happens - the <parent> observation gets propagated ["
                    + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isSameAs(parent);
        }))
        // Let's assume that we're modifying the context
        .contextWrite(context -> context.put("foo", "bar"))
        // Since we are NOT part of the Reactive Chain (e.g. this is not a
        // WebFlux application)
        // you MUST call <contextCapture> to capture all ThreadLocal values
        // and store them in a Reactor Context.
        // ----------------------
        // If you were part of the
        // Reactive Chain (e.g. returning Mono from endpoint)
        // there is NO NEED to call <contextCapture>. If you need to propagate
        // your e.g. Observation
        // to the Publisher you just created (e.g. Mono or Flux) please
        // consider adding it
        // to the Reactor Context directly instead of opening an Observation
        // scope and calling <contextCapture> (see example below).
        .contextCapture()
        .block();
    // We're still using <parent> as current observation
    then(registry.getCurrentObservation()).isSameAs(parent);
    then(block).isEqualTo(2);
    // Now, we want to create a child observation for a Reactor stream and put it
    // to Reactor Context
    // Automatically its parent will be <parent> observation since <parent> is in
    // Thread Local
    Observation child = Observation.start("child", registry);
    block = Mono.just(1).publishOn(Schedulers.boundedElastic()).doOnNext(integer -> {
        log.info(
                "Context Propagation happens - the <child> observation from Reactor Context takes precedence over thread local <parent> observation ["
                        + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isSameAs(child);
    })
        .flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
        .transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
            log.info(
                    "Context Propagation happens - the <child> observation from Reactor Context takes precedence over thread local <parent> observation ["
                            + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isSameAs(child);
        }))
        // Remember to stop the child Observation!
        .doFinally(signalType -> child.stop())
        // When using Reactor we ALWAYS search for
        // ObservationThreadLocalAccessor.KEY entry in the Reactor Context to
        // search for an Observation. You DON'T have to use <contextCapture>
        // because
        // you have manually provided the ThreadLocalAccessor key
        .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, child))
        .block();
    // We're back to having <parent> as current observation
    then(registry.getCurrentObservation()).isSameAs(parent);
    then(block).isEqualTo(2);
});
// There should be no remaining observation
then(registry.getCurrentObservation()).isNull();
// We need to stop the parent
parent.stop();
如果该方法的性能不理想,请检查禁用钩子并显式使用 handle 或 tap 操作符是否能提高性能。
Reactor 3.5.3 之前
通过 Reactor 在 Flux 中传播元素的推荐方式不是通过 ThreadLocal 实例,而是通过 Reactor Context。然而,Reactor 提供了两个操作符:tap() 和 handle()。如果 Micrometer Context Propagation 库在类路径中,这两个操作符会为你设置线程本地值。
考虑以下示例:
// This snippet shows an example of how to wrap code that is using Reactor
// Let's assume that we have a parent observation
Observation parent = Observation.start("parent", registry);
// We want to create a child observation for a Reactor stream
Observation child = Observation.start("child", registry)
    // There's no thread local entry, so we will pass parent observation
    // manually. If we put the Observation in scope we could then call
    // <.contextCapture()> method from Reactor to capture all thread locals
    // and store them in Reactor Context.
    .parentObservation(parent);
Integer block = Mono.just(1)
    // Example of not propagating context by default
    .doOnNext(integer -> {
        log.info(
                "No context propagation happens by default in Reactor - there will be no Observation in thread local here ["
                        + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isNull();
    })
    // Example of having entries in thread local for <tap()> operator
    .tap(() -> new DefaultSignalListener<Integer>() {
        @Override
        public void doFirst() throws Throwable {
            log.info("We're using tap() -> there will be Observation in thread local here ["
                    + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isNotNull();
        }
    })
    .flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
    // Example of retrieving ThreadLocal entries via ReactorContext
    .transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
        try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(contextView)) {
            log.info(
                    "We're retrieving thread locals from Reactor Context - there will be Observation in thread local here ["
                            + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isNotNull();
        }
    }))
    // Example of having entries in thread local for <handle()> operator
    .handle((BiConsumer<Integer, SynchronousSink<Integer>>) (integer, synchronousSink) -> {
        log.info("We're using handle() -> There will be Observation in thread local here ["
                + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isNotNull();
        synchronousSink.next(integer);
    })
    // Remember to stop the child Observation!
    .doFinally(signalType -> child.stop())
    // When using Reactor we ALWAYS search for
    // ObservationThreadLocalAccessor.KEY entry in the Reactor Context to
    // search for an Observation
    .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, child))
    // If there were ThreadLocal entries that are using Micrometer Context
    // Propagation they would be caught here. All implementations of
    // <ThreadLocalAccessor> will store their thread local entries under their
    // keys in Reactor Context
    .contextCapture()
    .block();
// We didn't have any observations in thread local
then(registry.getCurrentObservation()).isNull();
// We need to stop the parent
parent.stop();
then(block).isEqualTo(2);