跳到主要内容

消费记录

DeepSeek V3 中英对照 Consuming Records

在上面的 uppercase 函数中,我们将记录作为 Flux<String> 进行消费,然后将其作为 Flux<String> 生成。在某些情况下,你可能需要以原始接收格式接收记录——即 ReceiverRecord。以下是这样一个函数。

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
java

在这个函数中,请注意,我们将记录作为 Flux<ReceiverRecord<byte[], byte[]>> 消费,然后将其作为 Flux<String> 生成。ReceiverRecord 是基本的接收记录,它是 Reactor Kafka 中的一个专门的 Kafka ConsumerRecord。当使用响应式 Kafka 绑定时,上述函数将允许你访问每个传入记录的 ReceiverRecord 类型。然而,在这种情况下,你需要为 RecordMessageConverter 提供一个自定义实现。默认情况下,响应式 Kafka 绑定器使用 MessagingMessageConverter,它从 ConsumerRecord 中转换有效载荷和头信息。因此,当你的处理方法接收到它时,有效载荷已经从接收到的记录中提取出来,并像我们上面看到的第一个函数那样传递给方法。通过在应用程序中提供自定义的 RecordMessageConverter 实现,你可以覆盖默认行为。例如,如果你想将记录作为原始的 Flux<ReceiverRecord<byte[], byte[]>> 消费,那么你可以在应用程序中提供以下 bean 定义。

@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {

private final RecordMessageConverter converter = new MessagingMessageConverter();

@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}

@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}

};
}
java

然后,你需要指示框架在所需的绑定中使用这个转换器。以下是一个基于我们的 lowercase 函数的示例。

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
none

lowercase-in-0 是我们的 lowercase 函数的输入绑定名称。对于输出(lowercase-out-0),我们仍然使用常规的 MessagingMessageConverter

在上面的 toMessage 实现中,我们接收原始的 ConsumerRecord(由于我们处于反应式绑定器上下文中,因此是 ReceiverRecord),然后将其包装在 Message 中。然后,将包含 ReceiverRecord 的消息有效载荷提供给用户方法。

如果 reactiveAutoCommitfalse(默认值),调用 rec.receiverOffset().acknowledge()(或 commit())来提交偏移量;如果 reactiveAutoCommittrue,则 flux 会提供 ConsumerRecord。更多信息请参考 reactor-kafka 文档和 javadocs。