跳到主要内容

技巧、窍门与配方

DeepSeek V3 中英对照 Tips, Tricks and Recipes

简单的 Kafka DLQ

问题陈述

作为一名开发者,我希望编写一个消费应用程序,用于处理来自 Kafka 主题的记录。然而,如果在处理过程中出现某些错误,我不希望应用程序完全停止。相反,我希望将出错的记录发送到 DLT(死信主题),然后继续处理新的记录。

解决方案

这个问题的解决方案是使用 Spring Cloud Stream 中的 DLQ(死信队列)功能。为了便于讨论,我们假设以下代码是我们的处理器函数。

@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
none

这是一个非常简单的函数,它会为所有处理的记录抛出异常,但你可以将这个函数扩展应用到其他类似的情况中。

为了将错误的记录发送到 DLT,我们需要提供以下配置。

spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
none

为了激活 DLQ,应用程序必须提供组名。匿名消费者无法使用 DLQ 功能。我们还需要通过在 Kafka 消费者绑定上将 enableDLQ 属性设置为 true 来启用 DLQ。最后,我们可以选择通过在 Kafka 消费者绑定上提供 dlqName 来指定 DLT 名称,否则在这种情况下默认值为 error.input-topic.my-group

请注意,在上面提供的消费者示例中,有效负载的类型是 byte[]。默认情况下,Kafka binder 中的 DLQ 生产者期望有效负载的类型为 byte[]。如果不是这种情况,那么我们需要提供适当的序列化器配置。例如,让我们将消费者函数重写如下:

@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
none

现在,我们需要告诉 Spring Cloud Stream,在写入 DLT 时我们如何对数据进行序列化。以下是针对此场景修改后的配置:

spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
none

带有高级重试选项的 DLQ

问题陈述

这与上面的方法类似,但作为开发者,我希望能够配置重试的处理方式。

解决方案

如果您按照上述步骤操作,当处理过程中遇到错误时,您将获得 Kafka 绑定器中内置的默认重试选项。

默认情况下,binder 最多会尝试 3 次重试,初始延迟为 1 秒,每次退避的乘数为 2.0,最大延迟为 10 秒。你可以按照以下方式更改所有这些配置:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
none

如果你愿意,也可以通过提供一个布尔值映射来提供可重试异常的列表。例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
none

默认情况下,上述映射中未列出的任何异常都将被重试。如果不希望这样,你可以通过提供以下内容来禁用该行为,

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
none

你也可以提供自己的 RetryTemplate,并将其标记为 @StreamRetryTemplate,这样它将被绑定器扫描并使用。当你希望使用更复杂的重试策略和策略时,这非常有用。

如果你有多个 @StreamRetryTemplate bean,那么你可以通过使用以下属性来指定你的绑定希望使用哪一个:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
none

使用 DLQ 处理反序列化错误

问题陈述

我有一个处理器在 Kafka 消费者中遇到了反序列化异常。我原本期望 Spring Cloud Stream 的 DLQ(死信队列)机制会捕获这种情况,但它并没有。我该如何处理这个问题呢?

解决方案

Spring Cloud Stream 提供的正常 DLQ(死信队列)机制在 Kafka 消费者抛出不可恢复的反序列化异常时无法提供帮助。这是因为这种异常甚至在消费者的 poll() 方法返回之前就已经发生了。Spring for Apache Kafka 项目提供了一些很好的方法来帮助 binder 处理这种情况。让我们来探讨一下这些方法。

假设这是我们的函数:

@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
none

这是一个简单的函数,它接收一个 String 类型的参数。

我们希望绕过 Spring Cloud Stream 提供的消息转换器,转而使用原生的反序列化器。对于 String 类型来说,这样做可能没有太大意义,但对于更复杂的类型(如 AVRO 等),你必须依赖外部的反序列化器,因此希望将转换委托给 Kafka。

现在当消费者接收到数据时,假设有一个坏记录导致了反序列化错误,例如,有人传递了一个 Integer 而不是 String。在这种情况下,如果你不在应用程序中做些什么,异常将会通过链传播,最终导致你的应用程序退出。

为了处理这个问题,你可以添加一个 ListenerContainerCustomizer @Bean,它用于配置一个 DefaultErrorHandler。这个 DefaultErrorHandler 配置了一个 DeadLetterPublishingRecoverer。我们还需要为消费者配置一个 ErrorHandlingDeserializer。听起来有很多复杂的东西,但实际上,在这个案例中,它归结为这 3 个 bean。

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
none
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
none
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
none

让我们逐一分析这些内容。首先是 ListenerContainerCustomizer bean,它接收一个 DefaultErrorHandler。现在,容器已经通过这个特定的错误处理程序进行了定制。你可以在这里了解更多关于容器定制的信息:容器定制

第二个 bean 是配置为发布到 DLTDefaultErrorHandler。有关 DefaultErrorHandler 的更多详细信息,请参见此处

第三个 bean 是 DeadLetterPublishingRecoverer,它最终负责将消息发送到 DLT。默认情况下,DLT 主题的名称是 ORIGINAL_TOPIC_NAME.DLT。不过你可以更改这个名称。更多详细信息请参阅 文档

我们还需要通过应用程序配置来配置一个 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委托给实际的反序列化器。在发生错误的情况下,它将记录的键/值设置为 null,并包含消息的原始字节。然后,它将异常设置在头信息中,并将此记录传递给监听器,监听器随后调用注册的错误处理程序。

以下是所需的配置:

spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
none

我们通过绑定的 configuration 属性提供了 ErrorHandlingDeserializer。我们还指定了实际委托的反序列化器是 StringDeserializer

请记住,上述的 dlq 属性与本方案中的讨论无关。它们仅用于处理应用程序级别的错误。

Kafka 绑定器中的基本偏移量管理

问题陈述

我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不确定它是如何管理 Kafka 消费者偏移量的。你能解释一下吗?

解决方案

我们鼓励你阅读文档部分,以全面理解相关内容。

这里是它的要点:

Kafka 默认支持两种类型的偏移量来开始消费 - earliestlatest。它们的语义从名称上就可以自解释。

假设您是第一次运行消费者。如果您的 Spring Cloud Stream 应用程序中缺少 group.id,那么它将成为一个匿名消费者。每当您有一个匿名消费者时,Spring Cloud Stream 应用程序默认会从主题分区中可用的 latest 偏移量开始消费。另一方面,如果您明确指定了 group.id,那么默认情况下,Spring Cloud Stream 应用程序将从主题分区中可用的 earliest 偏移量开始消费。

在上述两种情况下(具有显式组和匿名组的消费者),可以通过使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 并设置为 earliestlatest 来切换起始偏移量。

现在,假设你之前已经运行过消费者,现在再次启动它。在这种情况下,上述的起始偏移量语义不再适用,因为消费者会找到消费者组已经提交的偏移量(在匿名消费者的情况下,尽管应用程序没有提供 group.id,绑定器会自动为你生成一个)。消费者会从最后提交的偏移量继续消费。即使你提供了 startOffset 值,这一点仍然成立。

然而,你可以通过使用 resetOffsets 属性来覆盖消费者从最后提交的偏移量开始的默认行为。为此,将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 设置为 true(默认情况下为 false)。然后确保你提供了 startOffset 值(可以是 earliestlatest)。当你这样做并启动消费者应用程序时,每次启动时,它都会像第一次启动一样开始,并忽略该分区的任何已提交的偏移量。

在 Kafka 中寻找任意偏移量

问题陈述

使用 Kafka binder 时,我知道可以将偏移量设置为 earliestlatest,但我有一个需求,需要将偏移量定位到中间的某个位置,即一个任意的偏移量。有没有办法通过 Spring Cloud Stream Kafka binder 实现这一点?

解决方案

之前我们了解了 Kafka 绑定器如何帮助你处理基本的偏移量管理。默认情况下,绑定器不允许你回退到任意偏移量,至少不能通过我们在那个示例中看到的机制实现。然而,绑定器提供了一些低级别的策略来实现这个用例。让我们来探讨一下。

首先,当你想重置到一个非 earliestlatest 的任意偏移量时,请确保将 resetOffsets 配置保留为默认值,即 false。然后你需要提供一个类型为 KafkaBindingRebalanceListener 的自定义 bean,它将被注入到所有的消费者绑定中。这是一个带有一些默认方法的接口,但以下是我们感兴趣的方法:

/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
none

让我们来看看细节。

本质上,这个方法会在每次主题分区的初始分配或再平衡后被调用。为了更好地说明,让我们假设我们的主题是 foo,它有 4 个分区。最初,我们只在组中启动一个消费者,这个消费者将消费所有分区。当消费者首次启动时,所有 4 个分区都会被初始分配。然而,我们不希望分区从默认位置开始消费(因为我们定义了一个组,所以是 earliest),而是希望每个分区在寻找到任意偏移量后再开始消费。想象一下,你有一个业务需求需要从以下某些偏移量开始消费。

Partition   start offset

0 1000
1 2000
2 2000
3 1000
none

这可以通过如下实现上述方法来实现。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
none

这只是一个基础实现。实际使用场景要比这复杂得多,你需要相应地调整,但这无疑为你提供了一个基本的框架。当消费者 seek 失败时,可能会抛出一些运行时异常,你需要决定在这些情况下如何处理。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我们启动第二个具有相同组 ID 的消费者会怎样?

当我们添加第二个消费者时,会发生重新平衡,一些分区会被重新分配。假设新的消费者获得了分区 23。当这个新的 Spring Cloud Stream 消费者调用 onPartitionsAssigned 方法时,它会发现这是该消费者上分区 23 的初始分配。因此,由于对 initial 参数的条件检查,它将执行 seek 操作。对于第一个消费者来说,它现在只拥有分区 01。然而,对于这个消费者来说,这只是一个重新平衡事件,并不被视为初始分配。因此,由于对 initial 参数的条件检查,它不会重新 seek 到给定的偏移量。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka binder 手动确认?

问题陈述

使用 Kafka binder 时,我想在消费者中手动确认消息。我该怎么做?

解决方案

默认情况下,Kafka binder 委托给 Spring for Apache Kafka 项目中的默认提交设置。Spring Kafka 中的默认 ackModebatch。有关更多详细信息,请参见此处

在某些情况下,您可能希望禁用默认的提交行为,转而依赖手动提交。以下步骤可以帮助您实现这一点。

将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 设置为 MANUALMANUAL_IMMEDIATE。当这样设置时,消费者方法收到的消息中将包含一个名为 kafka_acknowledgment 的头部(来自 KafkaHeaders.ACKNOWLEDGMENT)。

例如,假设这是你的消费者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
none

然后你将属性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 设置为 MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何在 Spring Cloud Stream 中覆盖默认的绑定名称?

问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但我如何覆盖这些绑定以使用更具领域友好性的名称?

解决方案

假设以下是你的函数签名。

@Bean
public Function<String, String> uppercase(){
...
}
none

默认情况下,Spring Cloud Stream 将创建如下所示的绑定。

  1. 大写输入-0

  2. 大写输出-0

你可以使用以下属性来覆盖这些绑定。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
none

在此之后,所有的绑定属性必须在新的名称 my-transformer-inmy-transformer-out 上进行设置。

这是另一个使用 Kafka Streams 和多个输入的例子。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
none

默认情况下,Spring Cloud Stream 会为这个函数创建三个不同的绑定名称。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次您想要在这些绑定上设置一些配置时,都必须使用这些绑定名称。您不喜欢这样,并且希望使用更符合领域习惯且易于阅读的绑定名称,例如,类似以下的内容。

  1. 订单

  2. 账户

  3. 增强订单

你可以通过简单地设置这三个属性来轻松实现这一点

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders
  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts
  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

一旦你这样做了,它就会覆盖默认的绑定名称,并且你想要设置的任何属性都必须基于这些新的绑定名称。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何将消息键作为记录的一部分发送?

问题陈述

我需要发送一个键以及记录的有效负载,Spring Cloud Stream 中有办法做到这一点吗?

解决方案

通常情况下,你可能希望将关联数据结构(如映射)作为带有键和值的记录发送。Spring Cloud Stream 允许你以直接的方式实现这一点。以下是一个基本的蓝图,但你可能需要根据你的特定用例进行调整。

这是一个示例的生产者方法(也称为 Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
none

这是一个简单的函数,它发送一个带有 String 负载的消息,同时也带有一个键。请注意,我们使用 KafkaHeaders.MESSAGE_KEY 将键设置为消息头。

如果你想将默认的 kafka_messageKey 更改为其他键,那么需要在配置中指定以下属性:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
none

请注意,我们使用绑定名称 supplier-out-0,因为这是我们的函数名称,请相应地更新。

然后,我们在生成消息时使用这个新密钥。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生序列化器和反序列化器代替 Spring Cloud Stream 完成的消息转换?

问题陈述

我想使用 Kafka 原生的 SerializerDeserializer,而不是使用 Spring Cloud Stream 中的消息转换器。默认情况下,Spring Cloud Stream 使用其内部内置的消息转换器来处理这种转换。我该如何绕过这一点,并将责任委托给 Kafka 呢?

解决方案

这真的很容易做到。

你只需要提供以下属性即可启用原生序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
none

然后,你还需要设置序列化器。有几种方法可以做到这一点。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
none

或使用 binder 配置。

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
none

在使用 binder 方式时,它会应用于所有绑定,而在绑定处设置时则是针对每个绑定单独设置的。

在反序列化端,你只需要提供反序列化器作为配置。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
none

你也可以在绑定器级别进行设置。

有一个可选属性,您可以设置它以强制进行原生解码。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
none

然而,在 Kafka binder 的情况下,这是不必要的,因为当消息到达 binder 时,Kafka 已经使用配置的反序列化器对它们进行了反序列化。

解释 Kafka Streams 绑定器中的偏移重置是如何工作的

问题陈述

默认情况下,Kafka Streams 绑定器始终从新消费者的最早偏移量开始。有时,应用程序需要或希望从最新的偏移量开始。Kafka Streams 绑定器允许你这样做。

解决方案

在查看解决方案之前,让我们先看一下以下场景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
none

我们有一个 BiConsumer bean,它需要两个输入绑定。在这种情况下,第一个绑定用于 KStream,第二个绑定用于 KTable。当第一次运行此应用程序时,默认情况下,这两个绑定都从 earliest 偏移量开始。如果由于某些需求,我想从 latest 偏移量开始呢?你可以通过启用以下属性来实现这一点。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
none

如果你希望只有一个绑定从 latest 偏移量开始,而另一个消费者从默认的 earliest 偏移量开始消费,那么请在配置中省略后者的绑定。

请注意,一旦有已提交的偏移量(committed offsets),这些设置将不会被遵循,已提交的偏移量将优先。

跟踪 Kafka 中记录的成功发送(生产)

问题陈述

我有一个 Kafka 生产者应用程序,我想跟踪所有成功的发送。

解决方案

让我们假设在应用程序中有以下供应商。

@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
none

然后,我们需要定义一个新的 MessageChannel bean 来捕获所有成功发送的信息。

@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
none

接下来,在应用程序配置中定义此属性,以提供 recordMetadataChannel 的 bean 名称。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
none

此时,成功发送的信息将被发送到 fooRecordChannel

你可以编写如下的 IntegrationFlow 来查看信息。

@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
none

handle 方法中,payload 是被发送到 Kafka 的内容,而消息头包含一个特殊的键 kafka_recordMetadata。它的值是一个 RecordMetadata,其中包含有关主题分区、当前偏移量等信息。

在 Kafka 中添加自定义头映射器

问题陈述

我有一个 Kafka 生产者应用程序设置了一些头部信息,但在消费者应用程序中这些头部信息丢失了。为什么会这样?

解决方案

在正常情况下,这应该没问题。

想象一下,你有以下的生产者。

@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
none

在消费者端,您应该仍然能看到标题 "foo",并且以下内容不会给您带来任何问题。

@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
none

如果你在应用程序中提供了自定义的 header 映射器,那么这将无法正常工作。假设你在应用程序中有一个空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {

}

@Override
public void toHeaders(Headers source, Map<String, Object> target) {

}
};
}
none

如果这是你的实现,那么消费者端将会丢失 foo 头信息。很可能你在那些 KafkaHeaderMapper 方法中有一些逻辑。你需要以下内容来填充 foo 头信息。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}

@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
none

这将正确地将 foo 头信息从生产者传递到消费者。

关于 id 标题的特殊说明

在 Spring Cloud Stream 中,id 头信息是一个特殊的头信息,但某些应用程序可能希望使用特殊的自定义 ID 头信息,例如 custom-idIDId。第一个(custom-id)将在没有任何自定义头信息映射器的情况下从生产者传播到消费者。然而,如果你使用框架保留的 id 头信息的变体(如 IDIdiD 等)进行生产,那么你将会遇到框架内部的问题。有关此用例的更多背景信息,请参阅这个 StackOverflow 讨论。在这种情况下,你必须使用自定义的 KafkaHeaderMapper 来映射区分大小写的 ID 头信息。例如,假设你有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
none

上面的 Id 头信息在消费端将会消失,因为它与框架的 id 头信息冲突。你可以提供一个自定义的 KafkaHeaderMapper 来解决这个问题。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}

@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
none

通过这样做,生产者和消费者双方都可以访问 idId 头信息。

在事务中向多个主题生产数据

问题陈述

如何向多个 Kafka 主题生成事务性消息?

更多上下文,请参阅这个 StackOverflow 问题

解决方案

在 Kafka binder 中使用事务支持进行事务处理,然后提供一个 AfterRollbackProcessor。为了向多个主题生成消息,使用 StreamBridge API。

以下是相关的代码片段:

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
none

必填配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
none

为了测试,您可以使用以下内容:

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
none

一些重要的注意事项:

请确保在应用程序配置中没有设置任何 DLQ(死信队列),因为我们会手动配置 DLT(默认情况下,它将发布到基于初始消费者函数命名的主题 input.DLT 上)。此外,将消费者绑定的 maxAttempts 重置为 1,以避免绑定器进行重试。在上面的示例中,最多将尝试 3 次(初始尝试 + FixedBackoff 中的 2 次尝试)。

有关如何测试此代码的更多详细信息,请参阅 StackOverflow 线程。如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将消费者绑定的 isolation-level 设置为 read-committed

这个 StackOverflow 线程 也与本次讨论相关。

运行多个可轮询消费者时要避免的陷阱

问题陈述

如何运行多个可轮询消费者的实例并为每个实例生成唯一的 client.id

解决方案

假设我有以下定义:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
none

在运行应用程序时,Kafka 消费者会生成一个 client.id(类似于 consumer-my-group-1)。对于每个正在运行的应用程序实例,这个 client.id 将是相同的,从而导致意外问题。

为了解决这个问题,你可以在应用程序的每个实例上添加以下属性:

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
none

详情请参阅此 GitHub issue