跳到主要内容

重试关键业务逻辑

DeepSeek V3 中英对照 Retrying critical business logic

在某些场景中,您可能希望重试对应用程序至关重要的业务逻辑部分。这可能包括对关系型数据库的外部调用,或者从 Kafka Streams 处理器调用 REST 端点。这些调用可能由于各种原因失败,例如网络问题或远程服务不可用。通常情况下,如果您能再次尝试这些操作,这些失败可能会自行解决。默认情况下,Kafka Streams binder 会为所有输入绑定创建 RetryTemplate bean。

如果函数具有以下签名,

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
none

并且使用默认的绑定名称时,RetryTemplate 将会注册为 process-in-0-RetryTemplate。这是遵循绑定名称(process-in-0)后跟字面量 -RetryTemplate 的约定。在多个输入绑定的情况下,每个绑定都会有一个单独的 RetryTemplate bean。如果应用程序中存在自定义的 RetryTemplate bean,并通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName 提供,那么它将优先于任何输入绑定级别的重试模板配置属性。

一旦从绑定中注入的 RetryTemplate 被注入到应用程序中,它就可以用于重试应用程序中的任何关键部分。以下是一个示例:

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}

@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}

@Override
public void close() {
}
});
}
none

或者你可以使用一个自定义的 RetryTemplate,如下所示。

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();

RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);

retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);

return retryTemplate;
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {

return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}

@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});

}

@Override
public void close() {
}
});
}
}
none

请注意,当重试次数用尽时,默认情况下会抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,可以向 execute 方法添加一个 RecoveryCallback。以下是一个示例。

retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
none

有关 RetryTemplate、重试策略、退避策略等的更多信息,请参考 Spring Retry 项目。