错误处理
Apache Kafka Streams 提供了原生处理反序列化错误异常的能力。有关此支持的详细信息,请参阅此链接。开箱即用,Apache Kafka Streams 提供了两种反序列化异常处理器 - LogAndContinueExceptionHandler
和 LogAndFailExceptionHandler
。顾名思义,前者会记录错误并继续处理下一条记录,而后者会记录错误并终止处理。LogAndFailExceptionHandler
是默认的反序列化异常处理器。
处理绑定器中的反序列化异常
Kafka Streams binder 允许使用以下属性来指定上述的反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两种反序列化异常处理器外,binder 还提供了第三种处理器,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理器的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
当上述属性设置后,反序列化错误中的所有记录将自动发送到 DLQ 主题。
你可以设置 DLQ 消息发布到的主题名称,如下所示。
你可以为 DlqDestinationResolver
提供一个实现,它是一个函数式接口。DlqDestinationResolver
接收 ConsumerRecord
和异常作为输入,然后允许指定一个主题名称作为输出。通过访问 Kafka 的 ConsumerRecord
,可以在 BiFunction
的实现中对头记录进行内省。
以下是提供 DlqDestinationResolver
实现的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为 DlqDestinationResolver
提供实现时,需要记住的一个重要事项是,binder 中的 provisioner 不会为应用程序自动创建主题。这是因为 binder 无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果使用此策略提供 DLQ 名称,应用程序有责任确保这些主题事先已创建。
如果应用程序中存在 DlqDestinationResolver
作为 bean,它将具有更高的优先级。如果你不想采用这种方法,而是希望通过配置提供静态的 DLQ 名称,可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此选项,则错误记录将被发送到主题 custom-dlq
。如果应用程序未使用上述任一策略,则将创建一个名为 error.<input-topic-name>.<application-id>
的 DLQ 主题。例如,如果你的绑定目标主题是 inputTopic
,且应用程序 ID 是 process-applicationId
,那么默认的 DLQ 主题将是 error.inputTopic.process-applicationId
。如果你打算启用 DLQ,始终建议为每个输入绑定显式创建一个 DLQ 主题。
每个输入消费者绑定的 DLQ
属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着如果同一个应用程序中有多个函数,此属性将应用于所有函数。然而,如果你在一个处理器中有多个处理器或多个输入绑定,那么你可以使用绑定器为每个输入消费者绑定提供的更细粒度的 DLQ 控制。
如果你有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
如果你只想在第一个输入绑定上启用 DLQ,并在第二个绑定上启用 skipAndContinue
,那么你可以在消费者中按如下方式配置。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
通过这种方式设置反序列化异常处理程序的优先级高于在绑定器级别设置。
DLQ 分区
默认情况下,记录会使用与原始记录相同的分区发布到死信主题。这意味着死信主题的分区数量必须至少与原始记录的分区数量相同。
要更改此行为,请将一个 DlqPartitionFunction
实现作为 @Bean
添加到应用程序上下文中。只能存在一个这样的 bean。该函数会接收到消费者组(在大多数情况下与应用程序 ID 相同)、失败的 ConsumerRecord
和异常。例如,如果你总是希望路由到分区 0,可以使用以下代码:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将消费者绑定的 dlqPartitions
属性设置为 1(且绑定器的 minPartitionCount
等于 1
),则无需提供 DlqPartitionFunction
;框架将始终使用分区 0。如果将消费者绑定的 dlqPartitions
属性设置为大于 1
的值(或绑定器的 minPartitionCount
大于 1
),则必须提供一个 DlqPartitionFunction
bean,即使分区数与原始主题的分区数相同。
在使用 Kafka Streams 绑定器的异常处理功能时,需要注意以下几点。
-
属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着如果同一个应用程序中有多个函数,该属性将应用于所有函数。 -
反序列化的异常处理与原生反序列化和框架提供的消息转换保持一致。
处理 Binder 中的生产异常
与上述反序列化异常处理程序的支持不同,绑定器并不提供处理生产异常的一流机制。然而,您仍然可以通过使用 StreamsBuilderFactoryBean
自定义器来配置生产异常处理程序,您可以在下面的后续部分中找到更多详细信息。
运行时错误处理
在处理来自应用程序代码(即业务逻辑执行)的错误时,通常由应用程序自行处理。因为 Kafka Streams 绑定器无法干预应用程序代码。然而,为了让应用程序处理起来更加方便,绑定器提供了一个便捷的 RecordRecoverableProcessor
,通过它,你可以指定如何处理应用程序级别的错误。
考虑以下代码。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果你在上面的 map
调用中的业务代码抛出异常,你有责任处理该错误。这就是 RecordRecoverableProcessor
派上用场的地方。默认情况下,RecordRecoverableProcessor
会简单地记录错误并让应用程序继续运行。假设你想将失败的记录发布到 DLT(Dead Letter Topic),而不是在应用程序内部处理它。在这种情况下,你必须使用一个名为 DltAwareProcessor
的自定义 RecordRecoverableProcessor
实现。以下是你可以如何做到这一点。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始 map
调用中的业务逻辑代码现在已被移至 KStream#process
方法调用中,该方法接收一个 ProcessorSupplier
。然后,我们传入自定义的 DltAwareProcessor
,它能够发布到 DLT(Dead Letter Topic)。上面的 DltAwareProcessor
构造函数接收三个参数——一个 Function
,它接收输入记录并将业务逻辑操作作为 Function
主体的一部分,DLT 主题,以及最后的 DltPublishingContext
。当 Function
的 lambda 表达式抛出异常时,DltAwareProcessor
会将输入记录发送到 DLT。DltPublishingContext
为 DltAwareProcessor
提供了必要的发布基础设施 bean。DltPublishingContext
由 binder 自动配置,因此你可以直接将其注入到应用程序中。
如果你不希望绑定器将失败的记录发布到 DLT(Dead Letter Topic),那么你必须直接使用 RecordRecoverableProcessor
,而不是 DltAwareProcessor
。你可以提供一个自定义的恢复器作为 BiConsumer
,它接受输入的 Record
和异常作为参数。假设一个场景,你不想将记录发送到 DLT,而是简单地记录日志并继续处理。下面是一个示例,展示如何实现这一点。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在这种情况下,当记录失败时,RecordRecoverableProcessor
会使用用户提供的恢复器,该恢复器是一个 BiConsumer
,它接受失败的记录和抛出的异常作为参数。
在 DltAwareProcessor 中处理记录键
当使用 DltAwareProcessor
将失败记录发送到 DLT 时,如果你想将记录键发送到 DLT 主题,那么你需要在 DLT 绑定上设置适当的序列化器。这是因为,DltAwareProcessor
使用 StreamBridge
,而 StreamBridge
使用常规的 Kafka 绑定器(基于消息通道),默认情况下它使用 ByteArraySerializer
来序列化键。对于记录值,Spring Cloud Stream 会将有效负载转换为适当的 byte[]
;然而,对于键则不是这样,因为它只是将接收到的键头信息原样传递。如果你提供了一个非字节数组的键,那么这可能会导致类转换异常,为了避免这种情况,你需要在 DLT 绑定上设置一个序列化器,如下所示。
假设 DLT 目标为 hello-dlt-1
,并且记录键的数据类型为 String。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer