跳到主要内容

Apache Kafka 指标

DeepSeek V3 中英对照 Kafka Apache Kafka Metrics

Apache Kafka 是一个开源分布式事件流平台,被成千上万的公司用于高性能数据管道、流分析、数据集成和关键任务应用。

在下面你可以找到一个如何使用 Micrometer 监控 Apache Kafka 的示例。

在消费者端设置 Apache Kafka 的监控工具。

// Setting up and binding instrumentation
Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs, new StringDeserializer(),
new StringDeserializer());

KafkaClientMetrics consumerKafkaMetrics = new KafkaClientMetrics(consumer);
consumerKafkaMetrics.bindTo(registry);
java

在生产者端设置 Apache Kafka 的监控。

// Setting up and binding instrumentation
Properties producerConfigs = new Properties();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
Producer<String, String> producer = new KafkaProducer<>(producerConfigs, new StringSerializer(),
new StringSerializer());

KafkaClientMetrics producerKafkaMetrics = new KafkaClientMetrics(producer);
producerKafkaMetrics.bindTo(registry);
java

使用 Kafka Streams 设置 Apache Kafka 的监控。

// Setting up, binding instrumentation and usage example
try (KafkaStreams kafkaStreams = createStreams()) {
metrics = new KafkaStreamsMetrics(kafkaStreams);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);
assertThat(registry.getMeters()).isNotEmpty()
.extracting(meter -> meter.getId().getName())
.allMatch(s -> s.startsWith(METRIC_NAME_PREFIX));
}
java