RabbitMQ 绑定器参考指南
本指南介绍了 Spring Cloud Stream Binder 的 RabbitMQ 实现。它包含了关于其设计、使用和配置选项的信息,以及关于 Stream Cloud Stream 概念如何映射到 RabbitMQ 特定构造的信息。
用法
要使用 RabbitMQ 绑定器,你可以通过以下 Maven 坐标将其添加到你的 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,你可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
RabbitMQ Binder 概述
以下简化图表展示了 RabbitMQ 绑定器的工作方式:
默认情况下,RabbitMQ Binder 实现将每个目标映射到一个 TopicExchange
。对于每个消费者组,一个 Queue
被绑定到该 TopicExchange
。每个消费者实例为其组的 Queue
都有一个对应的 RabbitMQ Consumer
实例。对于分区生产者和消费者,队列名称会附加分区索引,并使用分区索引作为路由键。对于匿名消费者(没有 group
属性的消费者),使用一个自动删除的队列(具有随机生成的唯一名称)。
通过使用可选的 autoBindDlq
选项,您可以配置绑定器以创建和配置死信队列(DLQs)(以及死信交换器 DLX
和路由基础设施)。默认情况下,死信队列的名称为目标名称,后缀为 .dlq
。如果启用了重试(maxAttempts > 1
),则在重试次数用尽后,失败的消息将被传递到 DLQ。如果禁用了重试(maxAttempts = 1
),您应将 requeueRejected
设置为 false
(默认值),以便将失败的消息路由到 DLQ,而不是重新入队。此外,republishToDlq
会使绑定器将失败的消息发布到 DLQ(而不是拒绝它)。此功能允许在消息头中添加其他信息(例如 x-exception-stacktrace
头中的堆栈跟踪)。有关截断堆栈跟踪的信息,请参阅 frameMaxHeadroom 属性。此选项不需要启用重试。您可以在仅一次尝试后重新发布失败的消息。从 1.2 版本开始,您可以配置重新发布消息的传递模式。请参阅 republishDeliveryMode 属性。
如果流监听器抛出 ImmediateAcknowledgeAmqpException
,则死信队列(DLQ)将被绕过,消息将被直接丢弃。从版本 2.1 开始,无论 republishToDlq
的设置如何,都是如此;在此之前,只有在 republishToDlq
为 false
时才会发生这种情况。
将 requeueRejected
设置为 true
(同时 republishToDlq=false
)会导致消息不断重新入队并重新投递,除非失败的原因是暂时的,否则这通常不是你想要的结果。通常情况下,你应该通过在绑定器中启用重试机制来解决问题,即将 maxAttempts
设置为大于 1 的值,或者将 republishToDlq
设置为 true
。
从 3.1.2 版本开始,如果消费者被标记为 transacted
,发布到死信队列(DLQ)将会参与事务。这意味着如果由于某种原因(例如,用户未被授权发布到死信交换器)发布失败,事务将会回滚。此外,如果连接工厂配置为发布者确认或返回,发布到 DLQ 的操作将会等待确认并检查返回的消息。如果收到否定确认或返回的消息,绑定器将抛出 AmqpRejectAndDontRequeueException
,允许代理负责发布到 DLQ,就像 republishToDlq
属性为 false
时一样。
有关这些属性的更多信息,请参阅 RabbitMQ 绑定器属性。
该框架没有提供任何标准机制来消费死信消息(或将它们重新路由回主队列)。一些选项在死信队列处理中进行了描述。
在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定器时,重要的是要禁用 RabbitAutoConfiguration
,以避免 RabbitAutoConfiguration
中的相同配置被应用到两个绑定器上。你可以通过使用 @SpringBootApplication
注解来排除该类。
从 2.0 版本开始,RabbitMessageChannelBinder
将 RabbitTemplate.userPublisherConnection
属性设置为 true
,以便非事务性生产者避免消费者出现死锁,这种情况可能会发生在由于代理上的内存警报导致缓存连接被阻塞时。
目前,multiplex
消费者(一个消费者监听多个队列)仅支持消息驱动消费者;轮询消费者只能从单个队列中检索消息。
配置选项
本节包含特定于 RabbitMQ 绑定器和绑定通道的设置。
有关通用绑定配置选项和属性的详细信息,请参阅 Spring Cloud Stream 核心文档。
章节总结
🗃️ 配置选项
9 个项目
📄️ 使用现有的队列/交换机
默认情况下,binder 会自动配置一个主题交换器(topic exchange),其名称由目标绑定属性 \<prefix>\<destination> 的值派生而来。如果未提供目标绑定属性,则目标默认为绑定名称。当绑定消费者时,将自动配置一个队列,队列名称为 \<prefix>\<destination>.\<group>(如果指定了组绑定属性),或者在没有组的情况下配置一个匿名的、自动删除的队列。该队列将使用“匹配所有”通配符路由键(#)与非分区绑定进行绑定,或使用 \<destination>-\<instanceIndex> 与分区绑定进行绑定。默认情况下,前缀为空字符串。如果输出绑定指定了 requiredGroups,将为每个组配置一个队列/绑定。
🗃️ 使用 RabbitMQ Binder 重试
1 个项目
📄️ 错误通道
从 1.3 版本开始,binder 会无条件地将异常发送到每个消费者目标的错误通道,并且还可以配置为将异步生产者发送失败的情况发送到错误通道。有关更多信息,请参阅“错误处理”部分。
📄️ 使用 RabbitMQ Binder 进行分区
RabbitMQ 本身不支持分区。
📄️ Rabbit Binder 健康指标
Rabbit 绑定器的健康指示器委托给 Spring Boot 提供的健康指示器。有关更多信息,请参阅此处。