消费批次
从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为 true
时,通过轮询 Kafka Consumer
接收到的所有记录将以 List<?>
的形式传递给监听器方法。否则,方法将一次处理一条记录。批处理的大小由 Kafka 消费者属性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;更多信息请参考 Kafka 文档。
在接收批次时,允许以下类型签名:
List<Person>
Message<List<Person>>
在 List<Person>
的第一种选项中,监听器不会收到任何消息头。如果使用第二种类型签名(Message<List<Person>>
),则可以访问消息头;然而,所有的消息头仍然是以 Collection
的形式存在。我们来看以下示例。
假设 Message
包含一个包含十个 Person
对象的列表。Message
的 MessageHeaders
包含一个头部信息的映射,其中键为头部名称,值为一个列表。该列表包含该头部的值,顺序与有效负载列表相同。因此,应用程序需要根据有效负载列表的迭代从 MessageHeaders
映射中正确访问头部信息。
请注意,在批量模式下使用时,不允许使用 List<Message<Person>>
这种形式的类型签名。
从版本 4.0.2
开始,binder 在批量模式下消费时支持 DLQ(Dead Letter Queue,死信队列)功能。需要注意的是,当在批量模式的消费者绑定上使用 DLQ 时,从前一次轮询中接收到的所有记录都将被发送到 DLQ 主题。
在使用批处理模式时,绑定器内部不支持重试,因此 maxAttempts
将被覆盖为 1。你可以通过配置 DefaultErrorHandler
(使用 ListenerContainerCustomizer
)来实现类似于绑定器中的重试功能。你也可以使用手动 AckMode
并调用 Ackowledgment.nack(index, sleep)
来提交部分批处理的偏移量,并使剩余记录重新投递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。
在批处理模式下接收 KafkaNull
对象时,接收到的列表将包含对应 KafkaNull
对象的空元素。无论是 List<Person>
还是 Message<List<Person>>
类型的签名,都是如此。
批量模式下的可观测性
在批量消费记录时,观察追踪传播功能并不直接支持。这是因为 Kafka binder 使用的 Spring for Apache Kafka 库不支持在批量监听器上进行追踪;它仅支持记录监听器。在批量监听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加追踪信息是可选的。由于批次中的记录之间可能没有任何关联,框架无法对它们进行追踪的假设,例如为它们提供单一的追踪 ID 等。如果你使用 Message<List<String>>
的类型签名,你可以获取一个名为 kafka_batchConvertedHeaders
的头部,其中包含一个与你的负载条目数量相同的列表。这个列表中包含一个 Map
,其中存储了追踪头部。然而,应用程序需要正确地遍历这些内容并启动观察。