死信主题处理
启用 DLQ
要启用 DLQ(死信队列),基于 Kafka 绑定器的应用程序必须通过属性 spring.cloud.stream.bindings.<binding-name>.group
提供一个消费者组。匿名消费者组(即应用程序未显式提供组的场景)无法启用 DLQ 功能。
当一个应用程序想要将错误的记录发送到 DLQ(死信队列)主题时,该应用程序必须启用 DLQ 功能,因为默认情况下此功能是未启用的。要启用 DLQ,必须将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq
设置为 true。
当启用 DLQ 时,如果在处理过程中发生错误,并且根据 spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts
属性重试次数耗尽后,该记录将被发送到 DLQ 主题。
默认情况下,max-attempts
属性设置为三。当 max-attempts
属性大于 1
,并且启用了 DLQ(死信队列)时,你会看到重试会遵循 max-attempts
属性。当未启用 DLQ 时(这是默认情况),max-attempts
属性不会对重试的处理方式产生任何影响。在这种情况下,重试将回退到 Spring for Apache Kafka 的容器默认值,即 10
次重试。如果应用程序希望在禁用 DLQ 时完全禁用重试,将 max-attempts
属性设置为 1
将不起作用。要在此情况下完全禁用重试,你需要提供一个 ListenerContainerCustomizer
,然后使用适当的 Backoff
设置。以下是一个示例。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, destinationName, group) -> {
var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
container.setCommonErrorHandler(commonErrorHandler);
};
}
这样,默认的容器行为将被禁用,并且不会尝试重试。如上所述,当启用 DLQ 时,绑定器设置将具有优先权。
处理死信主题中的记录
由于框架无法预见用户希望如何处理死信消息,因此它没有提供任何标准机制来处理这些消息。如果死信的原因是暂时的,您可能希望将这些消息重新路由回原始主题。然而,如果问题是永久性的,这可能会导致无限循环。本主题中的 Spring Boot 示例应用程序展示了如何将这些消息重新路由回原始主题,但在尝试三次后会将它们移动到“停车场”主题。该应用程序是另一个从死信主题读取的 spring-cloud-stream
应用程序。当 5 秒内没有收到消息时,它会退出。
示例假设原始目的地是 so8400out
,消费者组是 so8400
。
有几种策略需要考虑:
-
考虑仅在主应用程序未运行时执行重路由。否则,瞬态错误的重试次数会很快耗尽。
-
或者,使用两阶段方法:使用此应用程序将流量路由到第三个主题,再使用另一个应用程序从那里路由回主主题。
以下代码清单展示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}