StreamsBuilderFactoryBean 配置器
通常需要自定义创建 KafkaStreams
对象的 StreamsBuilderFactoryBean
。基于 Spring Kafka 提供的底层支持,binder 允许你自定义 StreamsBuilderFactoryBean
。你可以使用 StreamsBuilderFactoryBeanConfigurer
来自定义 StreamsBuilderFactoryBean
本身。然后,一旦你通过这个配置器获得了对 StreamsBuilderFactoryBean
的访问权限,你就可以使用 KafkaStreamsCustomizer
来自定义相应的 KafkaStreams
。这两个自定义器都是 Spring for Apache Kafka 项目的一部分。
以下是使用 StreamsBuilderFactoryBeanConfigurer
的示例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上内容展示了你可以对 StreamsBuilderFactoryBean
进行自定义的一些操作。你基本上可以调用 StreamsBuilderFactoryBean
中任何可用的修改操作来自定义它。这个自定义器将在工厂 bean 启动之前由绑定器调用。
一旦你获得了对 StreamsBuilderFactoryBean
的访问权限,你也可以自定义底层的 KafkaStreams
对象。以下是一个实现此操作的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将在底层的 KafkaStreams
启动之前由 StreamsBuilderFactoryBeabn
调用。
在整个应用程序中只能有一个 StreamsBuilderFactoryBeanConfigurer
。那么我们如何处理多个 Kafka Streams 处理器,因为每个处理器都由单独的 StreamsBuilderFactoryBean
对象支持?在这种情况下,如果这些处理器需要不同的自定义配置,那么应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储
如上所述,binder 并未提供一种一流的方式来将全局状态存储注册为一项功能。为此,你需要通过 StreamsBuilderFactoryBeanConfigurer
使用自定义器。以下是具体实现方法。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
对 StreamsBuilder
的任何自定义都必须通过 KafkaStreamsInfrastructureCustomizer
来完成,如上所示。如果调用 StreamsBuilderFactoryBean#getObject()
来获取 StreamsBuilder
对象,可能无法正常工作,因为该 bean 可能正在初始化,从而导致一些循环依赖问题。
如果你有多个处理器,你需要通过使用应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean
对象,将全局状态存储附加到正确的 StreamsBuilder
上,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理器
在错误处理部分,我们指出绑定器并未提供一种一流的方式来处理生产异常。尽管如此,您仍然可以使用 StreamsBuilderFactoryBean
自定义器来注册生产异常处理器。请参阅下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次强调,如果你有多个处理器,你可能需要针对正确的 StreamsBuilderFactoryBean
进行适当的设置。你也可以使用配置属性来添加此类生产异常处理器(更多信息请参见下文),但如果你选择采用编程方式,这也是一个选项。