跳到主要内容

消费批次

DeepSeek V3 中英对照 Consuming Batches

从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true 时,通过轮询 Kafka Consumer 接收到的所有记录将以 List<?> 的形式传递给监听器方法。否则,方法将一次处理一条记录。批处理的大小由 Kafka 消费者属性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;更多信息请参考 Kafka 文档。

在接收批次时,允许以下类型签名:

List<Person>
Message<List<Person>>
none

List<Person> 的第一种选项中,监听器不会收到任何消息头。如果使用第二种类型签名(Message<List<Person>>),则可以访问消息头;然而,所有的消息头仍然是以 Collection 的形式存在。我们来看以下示例。

假设 Message 包含一个包含十个 Person 对象的列表。MessageMessageHeaders 包含一个头部信息的映射,其中键为头部名称,值为一个列表。该列表包含该头部的值,顺序与有效负载列表相同。因此,应用程序需要根据有效负载列表的迭代从 MessageHeaders 映射中正确访问头部信息。

请注意,在批量模式下使用时,不允许使用 List<Message<Person>> 这种形式的类型签名。

从版本 4.0.2 开始,binder 在批量模式下消费时支持 DLQ(Dead Letter Queue,死信队列)功能。需要注意的是,当在批量模式的消费者绑定上使用 DLQ 时,从前一次轮询中接收到的所有记录都将被发送到 DLQ 主题。

important

在使用批处理模式时,绑定器内部不支持重试,因此 maxAttempts 将被覆盖为 1。你可以通过配置 DefaultErrorHandler(使用 ListenerContainerCustomizer)来实现类似于绑定器中的重试功能。你也可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批处理的偏移量,并使剩余记录重新投递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档

备注

在批处理模式下接收 KafkaNull 对象时,接收到的列表将包含对应 KafkaNull 对象的空元素。无论是 List<Person> 还是 Message<List<Person>> 类型的签名,都是如此。

批量模式下的可观测性

在批量消费记录时,观察追踪传播功能并不直接支持。这是因为 Kafka binder 使用的 Spring for Apache Kafka 库不支持在批量监听器上进行追踪;它仅支持记录监听器。在批量监听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加追踪信息是可选的。由于批次中的记录之间可能没有任何关联,框架无法对它们进行追踪的假设,例如为它们提供单一的追踪 ID 等。如果你使用 Message<List<String>> 的类型签名,你可以获取一个名为 kafka_batchConvertedHeaders 的头部,其中包含一个与你的负载条目数量相同的列表。这个列表中包含一个 Map,其中存储了追踪头部。然而,应用程序需要正确地遍历这些内容并启动观察。