跳到主要内容

使用 RabbitMQ Binder 进行分区

DeepSeek V3 中英对照 Partitioning with the RabbitMQ Binder

RabbitMQ 本身不支持分区。

有时,将数据发送到特定分区是有利的——例如,当您希望严格排序消息处理时,特定客户的所有消息都应发送到同一分区。

RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换器来提供分区功能。

以下 Java 和 YAML 示例展示了如何配置生产者:

@SpringBootApplication
public class RabbitPartitionProducerApplication {

private static final Random RANDOM = new Random(System.currentTimeMillis());

private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};

public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}

@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}

}
java
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
yaml
备注

前面示例中的配置使用了默认的分区策略(key.hashCode() % partitionCount)。根据键值的不同,这可能提供也可能不提供合适的平衡算法。您可以通过使用 partitionSelectorExpressionpartitionSelectorClass 属性来覆盖此默认行为。

required-groups 属性仅在需要在部署生产者时配置消费者队列时才需要。否则,发送到分区的任何消息都会丢失,直到部署相应的消费者为止。

以下配置用于提供一个主题交换器:

部分交换

以下队列绑定到该交换器:

部分队列

以下绑定将队列与交换机关联:

部分绑定

以下 Java 和 YAML 示例延续了之前的示例,展示了如何配置消费者:

@SpringBootApplication
public class RabbitPartitionConsumerApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}

@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}

}
java
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
yaml
important

RabbitMessageChannelBinder 不支持动态扩展。每个分区必须至少有一个消费者。消费者的 instanceIndex 用于指示消费哪个分区。像 Cloud Foundry 这样的平台只能有一个具有 instanceIndex 的实例。