跳到主要内容

重试与死信处理

DeepSeek V3 中英对照 Retry and Dead Letter Processing

默认情况下,当你在消费者绑定中配置重试(例如 maxAttemts)和 enableDlq 时,这些功能在 binder 内执行,监听器容器或 Kafka 消费者不参与其中。

在某些情况下,将这种功能转移到监听器容器中是更可取的,例如:

  • 重试和延迟的总和将超过消费者的 max.poll.interval.ms 属性,可能导致分区重新平衡。

  • 您希望将死信发布到不同的 Kafka 集群。

  • 您希望为错误处理程序添加重试监听器。

  • …​

要将此功能从绑定器移动到容器中进行配置,需要定义一个类型为 ListenerContainerWithDlqAndRetryCustomizer@Bean。该接口包含以下方法:

/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);

/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
java

目标解析器和 BackOff 是根据绑定属性(如果已配置)创建的。KafkaTemplate 使用 spring.kafka…​. 属性中的配置。然后,您可以使用这些配置来创建自定义错误处理程序和死信发布器;例如:

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {

@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {

if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}

@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}

};
}
java

现在,只需要单个重试延迟大于消费者的 max.poll.interval.ms 属性即可。

当使用多个绑定器时,ListenerContainerWithDlqAndRetryCustomizer bean 会被 DefaultBinderFactory 覆盖。为了使该 bean 生效,你需要使用 BinderCustomizer 来设置容器定制器(参见 [binder-customizer]):

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
java