RabbitMQ 流插件的初始生产者支持
现在提供了对 RabbitMQ Stream 插件 的基本支持。要启用此功能,您必须将 spring-rabbit-stream
jar 添加到类路径中 - 它必须与 spring-amqp
和 spring-rabbit
的版本相同。
important
当您将 producerType
属性设置为 STREAM_SYNC
或 STREAM_ASYNC
时,上述描述的生产者属性将不被支持。
要将 binder 配置为使用流 ProducerType
,Spring Boot 将从应用程序属性中配置一个 Environment
@Bean
。你可以选择性地添加一个自定义器来定制消息处理器。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
请参考 RabbitMQ Stream Java Client 文档 以获取有关配置环境和生产者构建器的信息。
RabbitMQ 超级流的生产者支持
请参阅 Super Streams 以获取有关超级流的信息。
使用超级流(super streams)允许在每个分区的超级流上使用单一活动消费者进行自动扩展和缩减。通过 Spring Cloud Stream,你可以通过 AMQP 或使用流客户端发布到超级流。
important
超级流必须已经存在;生产者绑定不支持创建超级流。
通过 AMQP 发布到超级流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端发布到超级流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
在使用流客户端时,如果您设置了 confirmAckChannel
,成功发送的消息副本将被发送到该通道。