跳到主要内容

初始消费者对 RabbitMQ 流插件的支持

DeepSeek V3 中英对照 Initial Consumer Support for the RabbitMQ Stream Plugin

现在提供了对 RabbitMQ Stream Plugin 的基本支持。要启用此功能,您必须将 spring-rabbit-stream jar 添加到类路径中——它必须与 spring-amqpspring-rabbit 的版本相同。

important

当您将 containerType 属性设置为 stream 时,上述描述的消费者属性不受支持;concurrency 仅支持超级流。每个绑定只能消费一个流队列。

要将绑定器配置为使用 containerType=stream,Spring Boot 会自动从应用程序属性中配置一个 Environment @Bean。你还可以选择性地添加一个自定义器来定制监听器容器。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
java

传递给自定义器的 name 参数是 destination + '.' + group + '.container'

name()(用于偏移量跟踪的目的)被设置为绑定 destination + '.' + group。可以使用上面展示的 ConsumerCustomizer 来更改它。如果你决定使用手动偏移量跟踪,Context 可以作为消息头使用:

int count;

@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
java

请参阅 RabbitMQ Stream Java Client 文档 以获取有关配置环境和消费者构建器的信息。

RabbitMQ 超级流的消费者支持

请参阅 Super Streams 了解有关超级流的信息。

使用超级流(super streams)可以在每个分区的超级流上实现自动扩展和缩减,每个分区只有一个活跃的消费者。

配置示例:

@Bean
public Consumer<Thing> input() {
...
}
java
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
properties

框架将创建一个名为 super 的超级流,其中包含 9 个分区。最多可以部署此应用程序的 3 个实例。