发布者确认
有两种机制可以获取发布消息的结果;在每种情况下,连接工厂必须将 publisherConfirmType 设置为 ConfirmType.CORRELATED。其中一种“传统”机制是将 confirmAckChannel 设置为一个消息通道的 bean 名称,你可以从该通道异步获取确认信息;否定确认(negative acks)会被发送到错误通道(如果启用)——请参阅错误通道。
在 3.1 版本中新增的首选机制是使用关联数据头,并通过其 Future<Confirm> 属性等待结果。这在批量监听器中特别有用,因为您可以在等待结果之前发送多条消息。要使用此技术,请将 useConfirmHeader 属性设置为 true。以下是一个使用此技术的简单应用程序示例:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {
	private static final Logger log = LoggerFactory.getLogger(Application.class);
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	@Autowired
	private StreamBridge bridge;
	@Bean
	Consumer<List<String>> input() {
		return list -> {
			List<MyCorrelationData> results = new ArrayList<>();
			list.forEach(str -> {
				log.info("Received: " + str);
				MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
				results.add(corr);
				this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
						.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
						.build());
			});
			results.forEach(correlation -> {
				try {
					Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
					log.info(confirm + " for " + correlation.getPayload());
					if (correlation.getReturnedMessage() != null) {
						log.error("Message for " + correlation.getPayload() + " was returned ");
						// throw some exception to invoke binder retry/error handling
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new IllegalStateException(e);
				}
				catch (ExecutionException | TimeoutException e) {
					throw new IllegalStateException(e);
				}
			});
		};
	}
	@Bean
	public ApplicationRunner runner(BatchingRabbitTemplate template) {
		return args -> IntStream.range(0, 10).forEach(i ->
				template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
	}
	@Bean
	public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
		BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
		return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
	}
}
class MyCorrelationData extends CorrelationData {
	private final String payload;
	MyCorrelationData(String id, String payload) {
		super(id);
		this.payload = payload;
	}
	public String getPayload() {
		return this.payload;
	}
}
如你所见,我们发送每条消息后,都会等待发布结果。如果消息无法路由,则在 future 完成之前,返回的消息会填充到关联数据中。
important
关联数据必须提供一个唯一的 id,以便框架能够执行关联。
你不能同时设置 useConfirmHeader 和 confirmAckChannel,但当 useConfirmHeader 为 true 时,你仍然可以在错误通道中接收到返回的消息,不过使用关联头(correlation header)会更加方便。