跳到主要内容

自定义 Kafka Binder 健康指示器

DeepSeek V3 中英对照 Custom Kafka Binder Health Indicator

覆盖默认的 Kafka Binder 健康指示器

当 Spring Boot Actuator 在类路径上时,Kafka binder 会激活一个默认的健康指示器。这个健康指示器会检查 binder 的健康状态以及与 Kafka broker 的任何通信问题。如果应用程序希望禁用这个默认的健康检查实现并包含一个自定义实现,那么它可以为 KafkaBinderHealth 接口提供一个实现。KafkaBinderHealth 是一个标记接口,它继承自 HealthIndicator。在自定义实现中,必须为 health() 方法提供一个实现。自定义实现必须作为 bean 存在于应用程序配置中。当 binder 发现自定义实现时,它将使用该实现而不是默认实现。以下是一个应用程序中自定义实现 bean 的示例。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
none

自定义 Kafka Binder 健康检查示例

以下是编写自定义 Kafka 绑定器 HealthIndicator 的伪代码。在这个示例中,我们尝试通过首先检查集群连接性,然后再检查与主题相关的问题,来覆盖绑定器提供的 Kafka HealthIndicator

首先,我们需要创建一个自定义的 KafkaBinderHealth 接口实现。

public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
@Value("${spring.cloud.bus.destination}")
private String topic;
private final AdminClient client;

public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
// More about configuring Kafka
// https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
this.client = AdminClient.create(admin.getConfigurationProperties());
}

@Override
public Health health() {
if (!checkBrokersConnection()) {
logger.error("Error when connect brokers");
return Health.down().withDetail("BrokersConnectionError", "Error message").build();
}
if (!checkTopicConnection()) {
logger.error("Error when trying to connect with specific topic");
return Health.down().withDetail("TopicError", "Error message with topic name").build();
}
return Health.up().build();
}

public boolean checkBrokersConnection() {
// Your implementation
}

public boolean checkTopicConnection() {
// Your implementation
}
}
none

然后我们需要为自定义实现创建一个 bean。

@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
return new KafkaBinderHealthImplementation(admin);
}
}
none