StreamsBuilderFactoryBean 配置器
通常需要自定义创建 KafkaStreams 对象的 StreamsBuilderFactoryBean。基于 Spring Kafka 提供的底层支持,binder 允许你自定义 StreamsBuilderFactoryBean。你可以使用 StreamsBuilderFactoryBeanConfigurer 来自定义 StreamsBuilderFactoryBean 本身。然后,一旦你通过这个配置器获得了对 StreamsBuilderFactoryBean 的访问权限,你就可以使用 KafkaStreamsCustomizer 来自定义相应的 KafkaStreams。这两个自定义器都是 Spring for Apache Kafka 项目的一部分。
以下是使用 StreamsBuilderFactoryBeanConfigurer 的示例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}
以上内容展示了你可以对 StreamsBuilderFactoryBean 进行自定义的一些操作。你基本上可以调用 StreamsBuilderFactoryBean 中任何可用的修改操作来自定义它。这个自定义器将在工厂 bean 启动之前由绑定器调用。
一旦你获得了对 StreamsBuilderFactoryBean 的访问权限,你也可以自定义底层的 KafkaStreams 对象。以下是一个实现此操作的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                });
            }
        });
    };
}
KafkaStreamsCustomizer 将在底层的 KafkaStreams 启动之前由 StreamsBuilderFactoryBeabn 调用。
在整个应用程序中只能有一个 StreamsBuilderFactoryBeanConfigurer。那么我们如何处理多个 Kafka Streams 处理器,因为每个处理器都由单独的 StreamsBuilderFactoryBean 对象支持?在这种情况下,如果这些处理器需要不同的自定义配置,那么应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                    });
                }
            });
        }
    };
使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储
如上所述,binder 并未提供一种一流的方式来将全局状态存储注册为一项功能。为此,你需要通过 StreamsBuilderFactoryBeanConfigurer 使用自定义器。以下是具体实现方法。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {
        }
    };
}
对 StreamsBuilder 的任何自定义都必须通过 KafkaStreamsInfrastructureCustomizer 来完成,如上所示。如果调用 StreamsBuilderFactoryBean#getObject() 来获取 StreamsBuilder 对象,可能无法正常工作,因为该 bean 可能正在初始化,从而导致一些循环依赖问题。
如果你有多个处理器,你需要通过使用应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean 对象,将全局状态存储附加到正确的 StreamsBuilder 上,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理器
在错误处理部分,我们指出绑定器并未提供一种一流的方式来处理生产异常。尽管如此,您仍然可以使用 StreamsBuilderFactoryBean 自定义器来注册生产异常处理器。请参阅下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}
再次强调,如果你有多个处理器,你可能需要针对正确的 StreamsBuilderFactoryBean 进行适当的设置。你也可以使用配置属性来添加此类生产异常处理器(更多信息请参见下文),但如果你选择采用编程方式,这也是一个选项。