仪器化
本文档的此部分内容面向希望在代码库中添加检测功能的用户。
在本节中,我们将看到一些常见的示例,展示如何重用现有的 Micrometer 和 Micrometer Tracing 处理程序和上下文类型来进行检测。
在决定自行对项目进行插桩之前,请务必再次确认该项目是否尚未被插桩!
为了更好地传达如何进行仪器化,我们需要区分两个概念:
-
上下文传播
-
观测的创建
上下文传播 - 我们通过线程或网络传播现有的上下文。我们使用 Micrometer Context Propagation 库来定义上下文并通过线程传播它。我们使用专用的 SenderContext
和 ReceiverContext
对象,以及 Micrometer Tracing 处理程序,来创建在线路上传播上下文的 Observations。
创建观测 - 我们希望将一个操作包装在观测中,以获取测量结果。我们需要知道之前是否存在一个父级观测,以维护观测之间的父子关系。
HTTP 通信的监测
在本节中,您可以找到如何对进行 HTTP 通信的库进行插装的方法。
HTTP 客户端通信的仪表化
HTTP 客户端检测的解释
┌─────────────────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] RequestReplySenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└──────────────┬──────────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
│ │ │ │ │ │ │
│ Wrap │ │ │ │ │ │
│────────────────────>│ │ │ │ │ │
│ │ │ │ │ │ │
│ │ │ Create │ │ │ │
│ │ │───────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ Create │ │ │ │ │
│────────────────────────────────────────────────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ │ │ Create │ │ │
│ │ │ │<────────────────────│ │ │
│ │ │ │ │ │ │
│ │ │ │ onStart │ │
│ │ │ │─────────────────────────────────────────────>│ │
│ │ │ │ │ │ │
│ │ │ │ │ Wrap in Scope │ │
│ │ │ │─────────────────────────────────────────────────────────────────────>│
┌──────────────┴──────────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] RequestReplySenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────────────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
-
在 <3>
ObservationRegistry
中注册一个 <6> 处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingSenderTracingObservationHandler
) -
创建一个 <1>
RequestReplySenderContext
,用于包装 <2> 载体(例如HttpRequest
)-
在其构造函数中解释如何丰富头部信息(例如
(key, value) → httpRequest.header(key, value)
) -
在 <1>
RequestReplySenderContext
上设置 <2> 载体
-
-
创建一个 <4>
Observation
,可选择使用 <5>ObservationConvention
与发送者上下文- 在 <4>
Observation
启动时,将通过 <6>ObservationHandler
进行传播(例如,载体将被丰富为适当的头部信息)
- 在 <4>
-
将要检测的 <7> 代码(例如发送 HTTP 请求)包装在作用域中(例如通过
observe
或scoped
方法)
HTTP 服务器通信的插装
HTTP 服务器端插桩解释
在计算机领域中,插桩(Instrumentation) 是指在代码中插入额外的监控或测量代码,以便收集运行时信息。对于 HTTP 服务器端插桩,通常是指在服务器端代码中插入监控逻辑,以捕获和分析 HTTP 请求和响应的相关数据。
主要目的
- 性能监控:测量请求处理时间、响应时间等性能指标。
- 错误跟踪:捕获并记录请求处理过程中发生的错误或异常。
- 流量分析:统计请求量、用户行为等数据,用于分析和优化。
- 安全审计:监控潜在的恶意请求或异常行为。
常见实现方式
- 中间件(Middleware):在 HTTP 请求处理链中插入中间件,用于捕获请求和响应数据。
- AOP(面向切面编程):通过 AOP 技术在关键方法前后插入监控逻辑。
- 日志记录:在关键代码路径中添加日志记录,用于后续分析。
示例代码
以下是一个简单的 Node.js Express 中间件示例,用于记录 HTTP 请求的耗时:
app.use((req, res, next) => {
const start = Date.now();
res.on('finish', () => {
const duration = Date.now() - start;
console.log(`${req.method} ${req.url} - ${duration}ms`);
});
next();
});
注意事项
- 性能开销:插桩可能会引入额外的性能开销,需权衡监控粒度与性能影响。
- 数据隐私:确保捕获的数据符合隐私保护要求,避免泄露敏感信息。
- 可配置性:提供灵活的配置选项,方便在生产环境中动态启用或禁用插桩功能。
通过 HTTP 服务器端插桩,可以更好地理解和优化服务器行为,为系统稳定性和性能提供有力支持。
┌───────────────────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] RequestReplyReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────┬───────────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
│ │ │ │ │ │ │
│ Wrap │ │ │ │ │ │
│─────────────────────>│ │ │ │ │ │
│ │ │ │ │ │ │
│ │ │ Create │ │ │ │
│ │ │───────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ Create │ │ │ │ │
│─────────────────────────────────────────────────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ │ │ Create │ │ │
│ │ │ │<────────────────────│ │ │
│ │ │ │ │ │ │
│ │ │ │ onStart │ │
│ │ │ │─────────────────────────────────────────────>│ │
│ │ │ │ │ │ │
│ │ │ │ │ Wrap in Scope │ │
│ │ │ │─────────────────────────────────────────────────────────────────────>│
┌───────────────┴───────────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] RequestReplyReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
-
在
ObservationRegistry
中注册一个处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingReceiverTracingObservationHandler
) -
创建一个 <1>
RequestReplyReceiverContext
,它包装了一个 <2> 载体(例如HttpRequest
)-
在其构造函数中解释如何检索头值(例如
(carrier, key) → carrier.header(key)
) -
将 <2> 载体设置到 <1>
RequestReplyReceiverContext
上
-
-
创建一个 <4>
Observation
,可以选择使用 <5>ObservationConvention
与接收者上下文- 在 <4>
Observation
开始时,将通过 <6>ObservationHandler
进行传播(例如,载体将被适当的头信息丰富)
- 在 <4>
-
将 <6> 要检测的代码(例如处理 HTTP 请求)包装在作用域中(例如通过
observe
或scoped
方法)
HTTP 通信示例的插装
为了对基于 HTTP 的通信进行监控,我们需要分别在客户端和服务器端使用 RequestReplySenderContext
和 RequestReplyReceiverContext
。
作为客户端的示例,我们使用一个处理器来通过添加一个 foo:bar
头来检测 HTTP 请求(如果你的类路径上有 Micrometer Tracing,你可以重用 PropagatingSenderTracingObservationHandler
和 PropagatingReceiverTracingObservationHandler
来在网络上传播跟踪上下文)。让我们考虑一个这样的处理器示例:
static class HeaderPropagatingHandler implements ObservationHandler<SenderContext<Object>> {
@Override
public void onStart(SenderContext<Object> context) {
context.getSetter().set(context.getCarrier(), "foo", "bar");
}
@Override
public boolean supportsContext(Observation.Context context) {
return context instanceof SenderContext;
}
}
考虑以下重复使用处理程序的 HTTP 客户端检测:
// This example can be combined with the idea of ObservationConvention to allow
// users to easily customize the key values. Please read the rest of the
// documentation on how to do it.
// In Micrometer Tracing we would have predefined
// PropagatingSenderTracingObservationHandler but for the sake of this demo we
// create our own handler that puts "foo":"bar" headers into the request
registry.observationConfig().observationHandler(new HeaderPropagatingHandler());
// We're using WireMock to stub the HTTP GET call to "/foo" with a response "OK"
stubFor(get("/foo").willReturn(ok().withBody("OK")));
// RequestReplySenderContext is a special type of context used for request-reply
// communication. It requires to define what the Request type is and how we can
// instrument it. It also needs to know what the Response type is
RequestReplySenderContext<HttpUriRequestBase, ClassicHttpResponse> context = new RequestReplySenderContext<>(
(carrier, key, value) -> Objects.requireNonNull(carrier).addHeader(key, value));
// We're instrumenting the Apache HTTPClient
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
// The HttpGet is our carrier (we can mutate it to instrument the headers)
HttpGet httpget = new HttpGet(info.getHttpBaseUrl() + "/foo");
// We must set the carrier BEFORE we run <Observation#start>
context.setCarrier(httpget);
// You can set the remote service address to provide more debugging
// information
context.setRemoteServiceAddress(info.getHttpBaseUrl());
// Examples of setting key values from the request
Observation observation = Observation.createNotStarted("http.client.requests", () -> context, registry)
.contextualName("HTTP " + httpget.getMethod())
.lowCardinalityKeyValue("http.url", info.getHttpBaseUrl() + "/{name}")
.highCardinalityKeyValue("http.full-url", httpget.getRequestUri());
observation.observeChecked(() -> {
String response = httpclient.execute(httpget, classicHttpResponse -> {
// We should set the response before we stop the observation
context.setResponse(classicHttpResponse);
// Example of setting key values from the response
observation.highCardinalityKeyValue("http.content.length",
String.valueOf(classicHttpResponse.getEntity().getContentLength()));
return EntityUtils.toString(classicHttpResponse.getEntity());
});
then(response).isEqualTo("OK");
});
}
// We want to be sure that we have successfully enriched the HTTP headers
verify(getRequestedFor(urlEqualTo("/foo")).withHeader("foo", equalTo("bar")));
作为一个服务器端的示例,我们使用一个处理器,通过添加一个低基数的键 foo
并使其值为 HTTP 请求中匹配的路径来对 Observation 进行仪表化。以下是一个此类处理器的示例:
static class HeaderReadingHandler implements ObservationHandler<ReceiverContext<Context>> {
@Override
public void onStart(ReceiverContext<Context> context) {
String fooHeader = context.getGetter().get(context.getCarrier(), "foo");
// We're setting the value of the <foo> header as a low cardinality key value
context.addLowCardinalityKeyValue(KeyValue.of("foo", fooHeader));
}
@Override
public boolean supportsContext(Observation.Context context) {
return context instanceof ReceiverContext;
}
}
考虑以下重用处理程序的 HTTP 服务器端检测:
// This example can be combined with the idea of ObservationConvention to allow
// users to easily customize the key values. Please read the rest of the
// documentation on how to do it.
// In Micrometer Tracing we would have predefined
// PropagatingReceiverTracingObservationHandler but for the sake of this demo we
// create our own handler that will reuse the <foo> header from the request as a
// low cardinality key value
registry.observationConfig().observationHandler(new HeaderReadingHandler());
try (Javalin javalin = Javalin.create().before("/hello/{name}", ctx -> {
// We're creating the special RequestReplyReceiverContext that will reuse the
// information from the HTTP headers
RequestReplyReceiverContext<Context, Context> receiverContext = new RequestReplyReceiverContext<>(
Context::header);
// Remember to set the carrier!!!
receiverContext.setCarrier(ctx);
String remoteServiceAddress = ctx.scheme() + "://" + ctx.host();
receiverContext.setRemoteServiceAddress(remoteServiceAddress);
// We're starting an Observation with the context
Observation observation = Observation
.createNotStarted("http.server.requests", () -> receiverContext, registry)
.contextualName("HTTP " + ctx.method() + " " + ctx.matchedPath())
.lowCardinalityKeyValue("http.url", remoteServiceAddress + ctx.matchedPath())
.highCardinalityKeyValue("http.full-url", remoteServiceAddress + ctx.path())
.lowCardinalityKeyValue("http.method", ctx.method().name())
.start();
// Let's be consistent and always set the Observation related objects under
// the same key
ctx.attribute(ObservationThreadLocalAccessor.KEY, observation);
}).get("/hello/{name}", ctx -> {
// We need to be thread-safe - we're not using ThreadLocals, we're retrieving
// information from the attributes
Observation observation = ctx.attribute(ObservationThreadLocalAccessor.KEY);
observation.scoped(() -> {
// If we need thread locals (e.g. MDC entries) we can use <scoped()>
log.info("We're using scoped - Observation in thread local here [" + registry.getCurrentObservation()
+ "]");
then(registry.getCurrentObservation()).isNotNull();
});
// We're returning body
ctx.result("Hello World [" + observation.getContext().getLowCardinalityKeyValue("foo").getValue() + "]");
}).after("/hello/{name}", ctx -> {
// After sending the response we want to stop the Observation
Observation observation = ctx.attribute(ObservationThreadLocalAccessor.KEY);
observation.stop();
}).start(0)) {
// We're sending an HTTP request with a <foo:bar> header. We're expecting that
// it will be reused in the response
String response = sendRequestToHelloEndpointWithHeader(javalin.port(), "foo", "bar");
// The response must contain the value from the header
then(response).isEqualTo("Hello World [bar]");
}
消息通信的仪表化
在本节中,您可以了解如何检测那些采用“发射后不管”(fire-and-forget)通信方式的库。
消息生产者端的仪表化
消息生产者端监控说明
┌─────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] SenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└────────┬────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
│ │ │ │ │ │ │
│ Wrap │ │ │ │ │ │
│──────────────>│ │ │ │ │ │
│ │ │ │ │ │ │
│ │ │ Create │ │ │ │
│ │ │───────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ Create │ │ │ │ │
│──────────────────────────────────────────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ │ │ Create │ │ │
│ │ │ │<────────────────────│ │ │
│ │ │ │ │ │ │
│ │ │ │ onStart │ │
│ │ │ │─────────────────────────────────────────────>│ │
│ │ │ │ │ │ │
│ │ │ │ │ Wrap in Scope │ │
│ │ │ │─────────────────────────────────────────────────────────────────────>│
┌────────┴────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] SenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
-
在 <3>
ObservationRegistry
中注册一个 <6> 处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingSenderTracingObservationHandler
) -
创建一个 <1>
SenderContext
,用于包装一个 <2> 载体(例如AmqpMessage
)-
在其构造函数中解释如何丰富头信息(例如
(key, value) → amqpMessage.header(key, value)
) -
在 <1>
SenderContext
上设置 <2> 载体
-
-
创建一个 <4>
Observation
,可选择使用 <5>ObservationConvention
与发送者上下文- 在 <4>
Observation
开始时,将通过 <6>ObservationHandler
进行传播(例如,载体将被适当的头信息丰富)
- 在 <4>
-
将 <7> 要检测的代码(例如发送 AMQP 消息)包装在作用域中(例如通过
observe
或scoped
方法)
消息消费者端通信的仪表化
消息消费端监控说明
┌───────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] ReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────┬─────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
│ │ │ │ │ │ │
│ Wrap │ │ │ │ │ │
│───────────────>│ │ │ │ │ │
│ │ │ │ │ │ │
│ │ │ Create │ │ │ │
│ │ │───────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ Create │ │ │ │ │
│───────────────────────────────────────────────────────>│ │ │ │
│ │ │ │ │ │ │
│ │ │ │ Create │ │ │
│ │ │ │<────────────────────│ │ │
│ │ │ │ │ │ │
│ │ │ │ onStart │ │
│ │ │ │─────────────────────────────────────────────>│ │
│ │ │ │ │ │ │
│ │ │ │ │ Wrap in Scope │ │
│ │ │ │─────────────────────────────────────────────────────────────────────>│
┌─────────┴─────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] ReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
-
在 <3>
ObservationRegistry
中注册一个 <6> 处理器,用于传播上下文(例如来自 Micrometer Tracing 的PropagatingReceiverTracingObservationHandler
) -
创建一个 <1>
ReceiverContext
,它包装了一个 <2> 载体(例如AmqpMessage
)-
在其构造函数中解释如何检索标头值(例如
(carrier, key) → carrier.header(key)
) -
在 <1>
ReceiverContext
上设置 <2> 载体
-
-
创建一个 <4>
Observation
,可选择使用 <6>ObservationConvention
与接收者上下文- 在 <4>
Observation
开始时,传播将通过 <6>ObservationHandler
进行(例如,载体将被适当的标头丰富)
- 在 <4>
-
将 <7> 要检测的代码(例如 AMQP 消息的处理)包装在作用域中(例如通过
observe
或scoped
方法)- 对于某些库(例如 RabbitMQ),您可能无法控制用户的代码,并且可能需要用户允许框架启动消费者端 Observation 并打开其作用域(将值放入线程本地),并要求用户在代码中手动关闭作用域并停止 Observation!
消息通信示例的插装
为了对基于消息的通信进行监控,我们需要在生产者端和消费者端分别使用 SenderContext
和 ReceiverContext
。
在本节中,我们将为 Apache Kafka 创建一个简单的监控工具。
作为生产者端的一个示例,我们将使用一个处理程序,通过添加 foo:bar
标头来对消息进行标记(如果你的类路径上有 Micrometer Tracing,你可以重用 PropagatingSenderTracingObservationHandler
和 PropagatingReceiverTracingObservationHandler
来在网络上传播追踪上下文)。考虑以下 KafkaSenderContext
的示例:
static class KafkaSenderContext extends SenderContext<ProducerRecord<String, String>> {
public KafkaSenderContext(ProducerRecord<String, String> producerRecord) {
// We describe how the carrier will be mutated (we mutate headers)
super((carrier, key, value) -> carrier.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
setCarrier(producerRecord);
}
}
考虑以下上述处理程序的示例:
static class HeaderPropagatingHandler implements ObservationHandler<KafkaSenderContext> {
@Override
public void onStart(KafkaSenderContext context) {
context.getSetter().set(context.getCarrier(), "foo", "bar");
context.addLowCardinalityKeyValue(KeyValue.of("sent", "true"));
}
@Override
public boolean supportsContext(Observation.Context context) {
return context instanceof KafkaSenderContext;
}
}
考虑以下代码,它是 Kafka 的 ProducerInterceptor
:
public class ProducerInterceptorConfig implements ProducerInterceptor<String, String> {
private ObservationRegistry observationRegistry;
private Observation observation;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// This code will be called before the message gets sent. We create
// a context and pass it to an Observation. Upon start, the handler will be called
// and the ProducerRecord will be mutated
KafkaSenderContext context = new KafkaSenderContext(record);
this.observation = Observation.start("kafka.send", () -> context, observationRegistry);
// We return the mutated carrier
return context.getCarrier();
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// Once the message got sent (with or without an exception) we attach an exception
// and stop the observation
this.observation.error(exception);
this.observation.stop();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
// We retrieve the ObservationRegistry from the configuration
this.observationRegistry = (ObservationRegistry) configs.get(ObservationRegistry.class.getName());
}
}
考虑以下生产者端检测代码,该代码重用了处理程序:
TestObservationRegistry registry = TestObservationRegistry.create();
// In Micrometer Tracing we would have predefined
// PropagatingSenderTracingObservationHandler but for the sake of this demo we
// create our own handler that puts "foo":"bar" headers into the request and will
// set the low cardinality key "sent" to "true".
registry.observationConfig().observationHandler(new HeaderPropagatingHandler());
// Producer side...
Properties producerConfigs = new Properties();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
producerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
producerConfigs.put(ObservationRegistry.class.getName(), registry);
producerConfigs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.micrometer.docs.observation.messaging.ProducerInterceptorConfig");
Producer<String, String> producer = new KafkaProducer<>(producerConfigs, new StringSerializer(),
new StringSerializer());
// Producer sends a message
producer.send(new ProducerRecord<>(topic, "foo"));
producer.flush();
作为消费者端的一个示例,我们使用一个处理程序,通过添加 foo
低基数键来为 Observation
添加工具,该键的值是消息中匹配的路径。考虑以下 KafkaReceiverContext
的示例:
static class KafkaReceiverContext extends ReceiverContext<ConsumerRecords<String, String>> {
public KafkaReceiverContext(ConsumerRecords<String, String> consumerRecord) {
// We describe how to read entries from the carrier (we read headers)
super((carrier, key) -> {
// This is a very naive approach that takes the first ConsumerRecord
Header header = carrier.iterator().next().headers().lastHeader(key);
if (header != null) {
return new String(header.value());
}
return null;
});
setCarrier(consumerRecord);
}
}
考虑以下上述处理程序的示例。
static class HeaderReadingHandler implements ObservationHandler<KafkaReceiverContext> {
@Override
public void onStart(KafkaReceiverContext context) {
String fooHeader = context.getGetter().get(context.getCarrier(), "foo");
// We're setting the value of the <foo> header as a low cardinality key value
context.addLowCardinalityKeyValue(KeyValue.of("received foo header", fooHeader));
}
@Override
public boolean supportsContext(Observation.Context context) {
return context instanceof KafkaReceiverContext;
}
}
考虑以下代码,它是 Kafka 的 ConsumerInterceptor
:
public class ConsumerInterceptorConfig implements ConsumerInterceptor<String, String> {
private ObservationRegistry observationRegistry;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// We're creating the receiver context
KafkaReceiverContext context = new KafkaReceiverContext(records);
// Then, we're just starting and stopping the observation on the consumer side
Observation.start("kafka.receive", () -> context, observationRegistry).stop();
// We could put the Observation in scope so that the users can propagate it
// further on
return context.getCarrier();
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
// We retrieve the ObservationRegistry from the configuration
this.observationRegistry = (ObservationRegistry) configs.get(ObservationRegistry.class.getName());
}
}
考虑以下重复使用处理程序的消费者端插桩代码:
TestObservationRegistry registry = TestObservationRegistry.create();
// Consumer side...
// In Micrometer Tracing we would have predefined
// PropagatingReceiverTracingObservationHandler but for the sake of this demo we
// create our own handler that takes the "foo" header's value and sets it as a low
// cardinality key "received foo header"
registry.observationConfig().observationHandler(new HeaderReadingHandler());
Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
consumerConfigs.put(ObservationRegistry.class.getName(), registry);
consumerConfigs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"io.micrometer.docs.observation.messaging.ConsumerInterceptorConfig");
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs, new StringDeserializer(),
new StringDeserializer());
// Consumer scubscribes to the topic
consumer.subscribe(Collections.singletonList(topic));
// Consumer polls for a message
consumer.poll(Duration.ofMillis(1000));
让我们看一下在发送和接收消息之后的断言。我们应该有 2 个观察结果,1 个在消费者端,1 个在生产者端,每个观察结果都应该有适当的键值,如处理程序中所示。
assertThat(registry).hasObservationWithNameEqualTo("kafka.send")
.that()
.hasBeenStarted()
.hasBeenStopped()
.hasLowCardinalityKeyValue("sent", "true");
assertThat(registry).hasObservationWithNameEqualTo("kafka.receive")
.that()
.hasBeenStarted()
.hasBeenStopped()
.hasLowCardinalityKeyValue("received foo header", "bar");
线程切换组件的插装
我们可能想要围绕通过 Executor
提交的 Runnable
或 Callable
创建一个 Observation
。为了实现这一点,我们需要知道在父线程中是否存在一个 Observation
,新线程应该继续这个 Observation
,或者是否需要为其创建一个子 Observation
。
考虑以下示例:
// Example of an Executor Service
ExecutorService executor = Executors.newCachedThreadPool();
// This snippet shows an example of how to wrap in an observation code that would
// be executed in a separate thread
// Let's assume that we have a parent observation
Observation parent = Observation.createNotStarted("parent", registry);
// Observation is put in scope via the <observe()> method
Future<Boolean> child = parent.observe(() -> {
// [Thread 1] Current Observation is the same as <parent>
then(registry.getCurrentObservation()).isSameAs(parent);
// [Thread 1] We're wrapping the executor in a Context Propagating version.
// <ContextExecutorService> comes from Context Propagation library
return ContextExecutorService.wrap(executor).submit(() -> {
// [Thread 2] Current Observation is same as <parent> - context got
// propagated
then(registry.getCurrentObservation()).isSameAs(parent);
// Wraps the code that should be run in a separate thread in an
// observation
return Observation.createNotStarted("child", registry).observe(this::yourCodeToMeasure);
});
});
响应式库的插桩
在本节中,我们将讨论如何在 Observations 中包装 Reactive 库,以及如何使用 Reactor Context 安全地在线程之间传播 Observations。
对于 Reactor 3.5.3 及之后的版本
在 Reactor 3.5.3 版本中(通过这个 PR),添加了一个启用自动上下文传播的选项。要使用此功能,请确保至少使用以下项目的最低版本:
要使用该功能,请调用新的 Reactor 的 Hook 方法(例如,在你的 public static void main
方法中),如下所示:
Hooks.enableAutomaticContextPropagation();
这会自动包装 Reactor 的内部机制,以便在操作符、线程等之间传播上下文。不需要使用 tap
和 handle
或上下文传播 API。
考虑以下示例:
// This snippet shows an example of how to use the new Hook API with Reactor
Hooks.enableAutomaticContextPropagation();
// Starting from Micrometer 1.10.8 you need to set your registry on this singleton
// instance of OTLA
ObservationThreadLocalAccessor.getInstance().setObservationRegistry(registry);
// Let's assume that we have a parent observation
Observation parent = Observation.start("parent", registry);
// Now we put it in thread local
parent.scoped(() -> {
// Example of propagating whatever there was in thread local
Integer block = Mono.just(1).publishOn(Schedulers.boundedElastic()).doOnNext(integer -> {
log.info("Context Propagation happens - the <parent> observation gets propagated ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isSameAs(parent);
})
.flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
.transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
log.info("Context Propagation happens - the <parent> observation gets propagated ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isSameAs(parent);
}))
// Let's assume that we're modifying the context
.contextWrite(context -> context.put("foo", "bar"))
// Since we are NOT part of the Reactive Chain (e.g. this is not a
// WebFlux application)
// you MUST call <contextCapture> to capture all ThreadLocal values
// and store them in a Reactor Context.
// ----------------------
// If you were part of the
// Reactive Chain (e.g. returning Mono from endpoint)
// there is NO NEED to call <contextCapture>. If you need to propagate
// your e.g. Observation
// to the Publisher you just created (e.g. Mono or Flux) please
// consider adding it
// to the Reactor Context directly instead of opening an Observation
// scope and calling <contextCapture> (see example below).
.contextCapture()
.block();
// We're still using <parent> as current observation
then(registry.getCurrentObservation()).isSameAs(parent);
then(block).isEqualTo(2);
// Now, we want to create a child observation for a Reactor stream and put it
// to Reactor Context
// Automatically its parent will be <parent> observation since <parent> is in
// Thread Local
Observation child = Observation.start("child", registry);
block = Mono.just(1).publishOn(Schedulers.boundedElastic()).doOnNext(integer -> {
log.info(
"Context Propagation happens - the <child> observation from Reactor Context takes precedence over thread local <parent> observation ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isSameAs(child);
})
.flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
.transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
log.info(
"Context Propagation happens - the <child> observation from Reactor Context takes precedence over thread local <parent> observation ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isSameAs(child);
}))
// Remember to stop the child Observation!
.doFinally(signalType -> child.stop())
// When using Reactor we ALWAYS search for
// ObservationThreadLocalAccessor.KEY entry in the Reactor Context to
// search for an Observation. You DON'T have to use <contextCapture>
// because
// you have manually provided the ThreadLocalAccessor key
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, child))
.block();
// We're back to having <parent> as current observation
then(registry.getCurrentObservation()).isSameAs(parent);
then(block).isEqualTo(2);
});
// There should be no remaining observation
then(registry.getCurrentObservation()).isNull();
// We need to stop the parent
parent.stop();
如果该方法的性能不理想,请检查禁用钩子并显式使用 handle
或 tap
操作符是否能提高性能。
Reactor 3.5.3 之前
通过 Reactor 在 Flux 中传播元素的推荐方式不是通过 ThreadLocal
实例,而是通过 Reactor Context。然而,Reactor 提供了两个操作符:tap()
和 handle()
。如果 Micrometer Context Propagation 库在类路径中,这两个操作符会为你设置线程本地值。
考虑以下示例:
// This snippet shows an example of how to wrap code that is using Reactor
// Let's assume that we have a parent observation
Observation parent = Observation.start("parent", registry);
// We want to create a child observation for a Reactor stream
Observation child = Observation.start("child", registry)
// There's no thread local entry, so we will pass parent observation
// manually. If we put the Observation in scope we could then call
// <.contextCapture()> method from Reactor to capture all thread locals
// and store them in Reactor Context.
.parentObservation(parent);
Integer block = Mono.just(1)
// Example of not propagating context by default
.doOnNext(integer -> {
log.info(
"No context propagation happens by default in Reactor - there will be no Observation in thread local here ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isNull();
})
// Example of having entries in thread local for <tap()> operator
.tap(() -> new DefaultSignalListener<Integer>() {
@Override
public void doFirst() throws Throwable {
log.info("We're using tap() -> there will be Observation in thread local here ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isNotNull();
}
})
.flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
// Example of retrieving ThreadLocal entries via ReactorContext
.transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(contextView)) {
log.info(
"We're retrieving thread locals from Reactor Context - there will be Observation in thread local here ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isNotNull();
}
}))
// Example of having entries in thread local for <handle()> operator
.handle((BiConsumer<Integer, SynchronousSink<Integer>>) (integer, synchronousSink) -> {
log.info("We're using handle() -> There will be Observation in thread local here ["
+ registry.getCurrentObservation() + "]");
then(registry.getCurrentObservation()).isNotNull();
synchronousSink.next(integer);
})
// Remember to stop the child Observation!
.doFinally(signalType -> child.stop())
// When using Reactor we ALWAYS search for
// ObservationThreadLocalAccessor.KEY entry in the Reactor Context to
// search for an Observation
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, child))
// If there were ThreadLocal entries that are using Micrometer Context
// Propagation they would be caught here. All implementations of
// <ThreadLocalAccessor> will store their thread local entries under their
// keys in Reactor Context
.contextCapture()
.block();
// We didn't have any observations in thread local
then(registry.getCurrentObservation()).isNull();
// We need to stop the parent
parent.stop();
then(block).isEqualTo(2);