Kafka Binder 监听器容器自定义器
Spring Cloud Stream 通过使用自定义器为消息监听器容器提供了强大的定制选项。本节涵盖了适用于 Kafka 的自定义器接口:ListenerContainerCustomizer、其 Kafka 特定的扩展 KafkaListenerContainerCustomizer,以及专门的 ListenerContainerWithDlqAndRetryCustomizer。
ListenerContainerCustomizer
ListenerContainerCustomizer 是 Spring Cloud Stream 中的一个通用接口,它允许对消息监听容器进行自定义。
目的
当您需要修改监听器容器的行为时,请使用此自定义工具。
用法
要使用 ListenerContainerCustomizer,请在您的配置中创建一个实现此接口的 bean:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}
ListenerContainerCustomizer 接口定义了以下方法:
void configure(C container, String destinationName, String group);
- 
container: 要自定义的消息监听器容器。
- 
destinationName: 目标(主题)的名称。
- 
group: 消费者组 ID。
KafkaListenerContainerCustomizer
KafkaListenerContainerCustomizer 接口扩展了 ListenerContainerCustomizer,用于修改监听器容器的行为,并提供对特定绑定的扩展 Kafka 消费者属性的访问。
目的
在自定义监听器容器时,如果需要访问绑定特定的扩展 Kafka 消费者属性,请使用此自定义器。
用法
要使用 KafkaListenerContainerCustomizer,请在你的配置中创建一个实现此接口的 bean:
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return (container, destinationName, group, properties) -> {
        // Customize the Kafka container here
    };
}
KafkaListenerContainerCustomizer 接口添加了以下方法:
default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}
此方法通过添加一个额外的参数来扩展基础的 configure 方法:
- extendedConsumerProperties: 扩展的消费者属性,包括 Kafka 特定的属性。
ListenerContainerWithDlqAndRetryCustomizer
ListenerContainerWithDlqAndRetryCustomizer 接口为涉及死信队列(DLQ)和重试机制的场景提供了额外的自定义选项。
目的
在需要微调 DLQ 行为或为 Kafka 消费者实现自定义重试逻辑时,请使用此自定义器。
用法
要使用 ListenerContainerWithDlqAndRetryCustomizer,请在你的配置中创建一个实现此接口的 bean:
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Access the container here with access to the extended consumer binding properties.
    };
}
ListenerContainerWithDlqAndRetryCustomizer 接口定义了以下方法:
void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
- 
container: 要自定义的 Kafka 监听器容器。
- 
destinationName: 目标(主题)的名称。
- 
group: 消费者组 ID。
- 
dlqDestinationResolver: 用于解析失败记录的 DLQ 目标的函数。
- 
backOff: 重试的回退策略。
- 
extendedConsumerProperties: 扩展的消费者属性,包括 Kafka 特定的属性。
摘要
- 
如果启用了 DLQ,则使用 ListenerContainerWithDlqAndRetryCustomizer。
- 
对于没有 DLQ 的 Kafka 特定定制,使用 KafkaListenerContainerCustomizer。
- 
基础 ListenerContainerCustomizer用于通用定制。
这种分层方法允许在 Spring Cloud Stream 应用程序中对 Kafka 监听器容器进行灵活且具体的自定义。