跳到主要内容

接收批量消息

DeepSeek V3 中英对照 Receiving Batched Messages

使用 RabbitMQ 绑定器时,消费者绑定处理两种类型的批次:

生产者创建的批次

通常情况下,如果生产者绑定设置了 batch-enabled=true(参见 Rabbit 生产者属性),或者消息是由 BatchingRabbitTemplate 创建的,批处理的元素将作为对监听器方法的单独调用返回。从版本 3.0 开始,如果 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true,任何此类批处理都可以作为 List<?> 呈现给监听器方法。

消费者端批处理

从 3.1 版本开始,消费者可以被配置为将多个入站消息组装成一个批次,并以转换后的有效载荷的 List<?> 形式呈现给应用程序。以下是一个简单的应用程序,展示了如何使用此技术:

spring.cloud.stream.bindings.input-in-0.group=someGroup

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
properties
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);

// ...

});
};
}

@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}

public static class Thing {

private String field;

public Thing() {
}

public Thing(String field) {
this.field = field;
}

public String getField() {
return this.field;
}

public void setField(String field) {
this.field = field;
}

@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}

}

}
java
Received 2
Thing [field=value1]
Thing [field=value2]
none

一个批次中的消息数量由 batch-sizereceive-timeout 属性指定;如果在 receive-timeout 超时后没有新的消息,则会传递一个“短”批次。

important

消费者端批处理仅支持 container-type=simple(默认值)。

如果你想检查消费者端批量消息的头部信息,你应该消费 Message<List<?>>;头部信息是一个 List<Map<String, Object>>,存储在 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS 头部中,每个有效负载元素的头部信息位于相应的索引中。再次,这里是一个简单的示例:

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));

// ...

}
};
}

@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}

public static class Thing {

private String field;

public Thing() {
}

public Thing(String field) {
this.field = field;
}

public String getfield() {
return this.field;
}

public void setfield(String field) {
this.field = field;
}

@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}

}

}
java
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2
none