出站分区支持
一个 Kafka Streams 处理器通常会将处理后的输出发送到一个出站 Kafka 主题中。如果出站主题是分区的,并且处理器需要将输出数据发送到特定的分区中,应用程序需要提供一个类型为 StreamPartitioner
的 bean。有关更多详细信息,请参阅 StreamPartitioner。让我们来看一些示例。
这是我们之前多次看到的同一处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
这是输出绑定的目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题 outputTopic
有 4 个分区,如果你不提供分区策略,Kafka Streams 将使用默认的分区策略,这可能不是你想要的,具体取决于特定的用例。假设你想将任何匹配 spring
的键发送到分区 0,cloud
发送到分区 1,stream
发送到分区 2,其他所有内容发送到分区 3。这就是你在应用程序中需要做的。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,然而,你可以访问记录中的键/值、主题名称以及总的分区数量。因此,如果需要,你可以实现复杂的分区策略。
你还需要提供这个 bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。