跳到主要内容

配置选项

DeepSeek V3 中英对照 Configuration Options

本节包含 Apache Kafka 绑定器所使用的配置选项。

有关绑定器的常见配置选项和属性,请参阅核心文档中的绑定属性

Kafka 绑定器属性

spring.cloud.stream.kafka.binder.brokers

Kafka 绑定器连接到的代理列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort 是 Spring Cloud Stream Kafka Binder 的一个配置属性,用于设置 Kafka 代理的默认端口号。如果没有显式配置 spring.kafka.bootstrap-servers,Kafka Binder 将使用此端口来连接到 Kafka 代理。

默认情况下,Kafka 代理的端口号是 9092。你可以通过设置 spring.cloud.stream.kafka.binder.defaultBrokerPort 来覆盖这个默认值。

例如:

spring:
cloud:
stream:
kafka:
binder:
defaultBrokerPort: 9093
yaml

在这个示例中,Kafka Binder 将尝试连接到端口 9093 的 Kafka 代理。

brokers 允许指定带有或不带端口信息的主机(例如,host1,host2:port2)。当在代理列表中没有配置端口时,这将设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

传递给由绑定器创建的所有客户端的客户端属性(包括生产者和消费者)的键/值映射。由于这些属性同时被生产者和消费者使用,因此应将其使用限制在通用属性上——例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。此处设置的属性将覆盖在启动时设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此处还允许使用未知的消费者属性。此处设置的属性将覆盖在 boot 和上述 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

由绑定器传输的自定义标头列表。仅在与使用 kafka-clients 版本 < 0.11.0.0 的旧版应用程序(⇐ 1.3.x)通信时才需要。较新版本原生支持标头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

获取分区信息的等待时间,以秒为单位。如果此计时器过期,则健康报告为 down。

默认值:60。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的确认数量。请参阅 Kafka 文档中的生产者 acks 属性。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅在设置了 autoCreateTopicsautoAddPartitions 时有效。绑定器在其生产或消费数据的主题上配置的全局最小分区数。它可以通过生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值(如果其中任何一个更大)来覆盖。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,此处还允许使用未知的生产者属性。此处设置的属性将覆盖在 boot 和上述 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于激活状态,自动创建主题的副本因子。可以在每个绑定上进行覆盖。

备注

如果您使用的是 Kafka broker 2.4 之前的版本,那么此值应至少设置为 1。从 3.0.8 版本开始,绑定器使用 -1 作为默认值,这表示将使用 broker 的 default.replication.factor 属性来确定副本数量。请与您的 Kafka broker 管理员确认是否存在要求最小复制因子的策略,如果存在这种情况,通常情况下,default.replication.factor 将与该值匹配,并且应使用 -1,除非您需要的复制因子大于最小值。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,binder 会自动创建新的主题。如果设置为 false,binder 会依赖已经配置好的主题。在后一种情况下,如果主题不存在,binder 将无法启动。

备注

此设置与代理的 auto.create.topics.enable 设置无关,也不会影响它。如果服务器设置为自动创建主题,它们可能会作为元数据检索请求的一部分创建,并使用代理的默认设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,binder 会在需要时创建新的分区。如果设置为 false,binder 会依赖主题已经配置好的分区大小。如果目标主题的分区数量小于预期值,binder 将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在绑定器中启用事务。请参阅 Kafka 文档中的 transaction.id 以及 spring-kafka 文档中的 事务。启用事务后,单个 producer 属性将被忽略,所有生产者都将使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值为 null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务绑定器中生产者的全局生产者属性。请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性 以及所有绑定器支持的通用生产者属性。

默认值:请参阅各个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 spring-messaging 头信息映射到 Kafka 头信息以及从 Kafka 头信息映射回来的 KafkaHeaderMapper 的 bean 名称。例如,如果您希望对使用 JSON 反序列化头信息的 BinderHeaderMapper bean 中的受信任包进行自定义,可以使用此属性。如果未使用此属性将此自定义的 BinderHeaderMapper bean 提供给 binder,则 binder 将查找名称为 kafkaBinderHeaderMapper 且类型为 BinderHeaderMapper 的头信息映射器 bean,然后回退到由 binder 创建的默认 BinderHeaderMapper

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

当发现主题上的任何分区(无论接收数据的消费者是谁)没有领导者时,将绑定器健康状态设置为 down 的标志。

默认值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当 truststore 或 keystore 证书的位置被指定为非本地文件系统资源(由 org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)时,绑定器会将该资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。这适用于代理级别的证书(ssl.truststore.locationssl.keystore.location)以及用于 schema registry 的证书(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)。请记住,truststore 和 keystore 的位置路径必须在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。文件将被复制到该属性值指定的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值且证书文件是非本地文件系统资源,则它将被复制到由 System.getProperty("java.io.tmpdir") 返回的系统临时目录中。如果此值存在,但在文件系统上找不到该目录或该目录不可写,也会发生这种情况。

默认值:无。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

当设置为 true 时,每次访问该指标时都会计算每个消费者主题的偏移滞后指标。当设置为 false 时,仅使用定期计算的偏移滞后。

默认值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个消费者主题的偏移量滞后的时间间隔。当 metrics.defaultOffsetLagMetricsEnabled 被禁用或其计算时间过长时,将使用此值。

默认值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此绑定器中的所有绑定上启用 Micrometer 观测注册表。

默认值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元数据消费者 group.id。该消费者被 HealthIndicator 用来查询正在使用的主题的元数据。

默认值:无。

Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 为前缀。

备注

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.consumer.<property>=<value>

admin.configuration

自 2.1.1 版本起,此属性已被弃用,推荐使用 topic.properties,未来版本中将不再支持该属性。

admin.replicas-assignment

自 2.1.1 版本起,该属性已被弃用,建议使用 topic.replicas-assignment,并且在未来版本中将不再支持该属性。

admin.replication-factor

自 2.1.1 版本起,该属性已被弃用,推荐使用 topic.replication-factor,未来版本中将不再支持此属性。

autoRebalanceEnabled

当设置为 true 时,主题分区会在消费者组的成员之间自动重新平衡。当设置为 false 时,每个消费者会根据 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一组固定的分区。这要求在每个启动的实例上正确设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue 时,此设置决定是否在每条记录处理完后提交偏移量。默认情况下,偏移量会在 consumer.poll() 返回的记录批次中的所有记录处理完毕后提交。通过 poll 返回的记录数量可以通过 Kafka 的 max.poll.records 属性进行控制,该属性通过消费者的 configuration 属性进行设置。将此设置为 true 可能会导致性能下降,但这样做可以减少在发生故障时重新传递记录的可能性。另请参阅绑定器的 requiredAcks 属性,该属性也会影响提交偏移量的性能。从 3.1 版本开始,此属性已被弃用,推荐使用 ackMode。如果未设置 ackMode 且未启用批处理模式,则将使用 RECORD ackMode。

默认值:false

autoCommitOffset

从 3.1 版本开始,此属性已弃用。有关替代方案的更多详细信息,请参阅 ackMode。当消息被处理时是否自动提交偏移量。如果设置为 false,则在入站消息中会存在一个键为 kafka_acknowledgment、类型为 org.springframework.kafka.support.Acknowledgment 的头部。应用程序可以使用此头部来确认消息。有关详细信息,请参阅示例部分。当此属性设置为 false 时,Kafka binder 将 ack 模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,应用程序负责确认记录。另请参阅 ackEachRecord

默认值:true

确认模式

指定容器的确认模式。这是基于 Spring Kafka 中定义的 AckMode 枚举。如果 ackEachRecord 属性设置为 true 并且消费者不在批处理模式下,则将使用 RECORD 的确认模式,否则将使用此属性提供的确认模式。

autoCommitOnError

在可轮询的消费者中,如果设置为 true,在发生错误时始终会自动提交。如果未设置(默认)或设置为 false,可轮询的消费者将不会自动提交。请注意,此属性仅适用于可轮询的消费者。

默认值:未设置。

重置偏移量

是否将消费者的偏移量重置为 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,则必须为 false;请参阅 rebalance listener。有关此属性的更多信息,请参阅 reset-offsets

默认值:false

startOffset

新组的起始偏移量。允许的值为:earliestlatest。如果为消费者的 'binding' 显式设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group),则 'startOffset' 设置为 earliest。否则,对于 anonymous 消费者组,它设置为 latest。有关此属性的更多信息,请参见 reset-offsets

默认值:null(等同于 earliest)。

enableDlq

当设置为 true 时,它将为消费者启用 DLQ(死信队列)行为。默认情况下,导致错误的消息会被转发到一个名为 error.<destination>.<group> 的主题。DLQ 主题名称可以通过设置 dlqName 属性或定义一个类型为 DlqDestinationResolver@Bean 来配置。这为 Kafka 重放场景提供了一个替代方案,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。有关更多信息,请参阅 kafka dlq 处理。从 2.0 版本开始,发送到 DLQ 主题的消息会增强以下头部信息:x-original-topicx-exception-messagex-exception-stacktrace,并以 byte[] 的形式存储。默认情况下,失败记录会被发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改此行为的信息,请参阅 dlq 分区选择destinationIsPatterntrue 时,不允许使用此功能。

默认值:false

dlqPartitions

enableDlqtrue 且未设置此属性时,将创建一个与主主题分区数量相同的死信主题。通常,死信记录会被发送到死信主题中与原始记录相同的分区。此行为可以更改;请参阅 dlq 分区选择。如果此属性设置为 1 并且没有 DqlPartitionFunction bean,则所有死信记录都将写入分区 0。如果此属性大于 1,则必须提供一个 DlqPartitionFunction bean。请注意,实际的分区数量会受到绑定器的 minPartitionCount 属性的影响。

默认值:none

配置

一个包含通用 Kafka 消费者属性的键/值对的映射。除了 Kafka 消费者属性外,还可以在此传递其他配置属性。例如,应用程序所需的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=barbootstrap.servers 属性不能在此设置;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

dlqName

接收错误消息的 DLQ 主题名称。

默认值:null(如果未指定,导致错误的消息将被转发到名为 error.<destination>.<group> 的主题)。

dlqProducerProperties

通过这种方式,可以设置特定于 DLQ(死信队列)的生产者属性。所有通过 Kafka 生产者属性可用的属性都可以通过此属性进行设置。当在消费者端启用了原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认的 Kafka 生产者属性。

标准头文件

指示入站通道适配器填充哪些标准头信息。允许的值为:noneidtimestampboth。如果使用原生反序列化,并且第一个接收消息的组件需要 id(例如配置为使用 JDBC 消息存储的聚合器),则此选项非常有用。

默认值:none

converterBeanName

实现 RecordMessageConverter 的 bean 名称。用于入站通道适配器中以替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval 是一个用于定义空闲事件间隔时间的参数。在计算机领域中,它通常用于指定在系统或应用程序处于空闲状态时,触发事件的时间间隔。这个参数可以帮助优化资源的使用,避免在系统空闲时频繁触发不必要的事件处理。

在未接收到消息的事件之间的间隔时间,单位为毫秒。可以使用 ApplicationListener<ListenerContainerIdleEvent> 来接收这些事件。有关使用示例,请参阅 pause-resume

默认值:30000

destinationIsPattern

当为 true 时,目标被视为一个正则表达式 Pattern,用于通过代理匹配主题名称。当为 true 时,不会预配主题,并且不允许使用 enableDlq,因为在预配阶段绑定器不知道主题名称。注意,检测与模式匹配的新主题所需的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。可以使用上面的 configuration 属性进行配置。

默认值:false

topic.properties

用于在配置新主题时使用的 Kafka 主题属性的 Map — 例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

一个 Map<Integer, List<Integer>> 类型的副本分配映射,其中键是分区,值是副本分配列表。在创建新主题时使用。请参阅 kafka-clients jar 中的 NewTopic Javadocs。

默认值:无。

topic.replication-factor

创建主题时使用的复制因子。覆盖 binder 范围的设置。如果存在 replicas-assignments,则忽略此设置。

默认值:无(使用 binder 范围的默认值 -1)。

pollTimeout

用于可轮询消费者的轮询超时时间。

默认值:5 秒。

事务管理器

用于覆盖此绑定的绑定器事务管理器的 KafkaAwareTransactionManager 的 Bean 名称。通常,如果您希望使用 ChainedKafkaTransactionManager 将另一个事务与 Kafka 事务同步,则需要此配置。为了实现记录的精确一次消费和生产,消费者和生产者绑定必须全部配置为使用相同的事务管理器。

默认值:无。

txCommitRecovered

在使用事务性绑定器时,默认情况下,恢复的记录(例如,当重试次数耗尽且记录被发送到死信主题时)的偏移量将通过新事务提交。将此属性设置为 false 可以抑制提交恢复记录的偏移量。

默认值:true。

commonErrorHandlerBeanName

每个消费者绑定使用的 CommonErrorHandler bean 名称。如果存在,此用户提供的 CommonErrorHandler 将优先于绑定器定义的任何其他错误处理程序。如果应用程序不希望使用 ListenerContainerCustomizer 然后检查目标/组组合来设置错误处理程序,这是一种方便的表达错误处理程序的方式。

默认值:无。

Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

备注

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value>

admin.configuration

自 2.1.1 版本起,此属性已被弃用,推荐使用 topic.properties,并且将在未来的版本中移除对该属性的支持。

admin.replicas-assignment

自 2.1.1 版本起,此属性已被弃用,推荐使用 topic.replicas-assignment,并且将在未来的版本中移除对该属性的支持。

admin.replication-factor

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.replication-factor,并在未来版本中移除对其的支持。

bufferSize

Kafka 生产者在发送前尝试批量处理的数据上限(以字节为单位)。

默认值:16384

同步

生产者是否是同步的。

默认值:false

sendTimeoutExpression

一个针对出站消息进行求值的 SpEL 表达式,用于在启用同步发布时评估等待确认的时间 —— 例如 headers['mySendTimeout']。超时时间的值以毫秒为单位。在 3.0 版本之前,除非使用原生编码,否则无法使用有效负载,因为在此表达式求值时,有效负载已经转换为 byte[] 的形式。现在,表达式在有效负载转换之前就会被求值。

默认值:none

batchTimeout

生产者在发送消息之前等待多长时间以允许更多消息累积到同一批次中。(通常情况下,生产者根本不会等待,而是直接发送在前一次发送过程中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。

默认值:0

messageKeyExpression

一个针对出站消息进行求值的 SpEL 表达式,用于填充生成的 Kafka 消息的键 —— 例如 headers['myKey']。在 3.0 之前的版本中,除非使用了原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以 byte[] 的形式存在。现在,表达式会在有效负载转换之前进行评估。对于常规处理器(Function<String, String>Function<Message<?>, Message<?>>),如果生成的键需要与来自主题的传入键相同,则可以按如下方式设置此属性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']。对于响应式函数,有一个重要的注意事项需要牢记。在这种情况下,应用程序需要手动将传入消息的标头复制到出站消息中。你可以设置标头,例如 myKey 并使用 headers['myKey'],如上所述,或者为了方便起见,只需设置 KafkaHeaders.MESSAGE_KEY 标头,你根本不需要设置此属性。

默认值:none

headerPatterns

这是一个以逗号分隔的简单模式列表,用于匹配 Spring 消息头,这些消息头将被映射到 ProducerRecord 中的 Kafka Headers。模式可以以通配符(星号)开头或结尾。通过在模式前添加 ! 可以否定该模式。匹配在第一次匹配(无论是正向还是负向)后停止。例如,!ask,as* 将允许 ash 通过,但不允许 ask 通过。idtimestamp 永远不会被映射。

默认值:*(所有标题 - 除了 idtimestamp

配置

包含通用 Kafka 生产者属性的键/值对的映射。bootstrap.servers 属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

topic.properties

一个 Map,用于在配置新 Kafka 主题时的属性设置 — 例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

这是一个 Map<Integer, List<Integer>> 类型的副本分配映射,其中键表示分区,值表示副本分配。在配置新主题时使用。详情请参阅 kafka-clients jar 包中的 NewTopic Javadocs。

默认值:无。

topic.replication-factor

创建主题时使用的复制因子。覆盖 binder 范围的设置。如果存在 replicas-assignments,则忽略此设置。

默认值:无(使用绑定范围内的默认值 -1)。

useTopicHeader

设置为 true 以使用出站消息中的 KafkaHeaders.TOPIC 消息头的值覆盖默认的绑定目标(主题名称)。如果该消息头不存在,则使用默认的绑定目标。

默认值:false

recordMetadataChannel

MessageChannel 的 bean 名称,成功发送的结果应发送到该通道;该 bean 必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(如果有转换,则是转换后的消息),并带有一个额外的头信息 KafkaHeaders.RECORD_METADATA。该头信息包含由 Kafka 客户端提供的 RecordMetadata 对象;它包括记录写入主题的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

发送失败的消息会进入生产者的错误通道(如果已配置);详情请参阅 Kafka 错误通道

默认值:null。

备注

Kafka 绑定器使用生产者的 partitionCount 设置作为提示来创建一个具有指定分区数量的主题(与 minPartitionCount 结合使用,两者中的较大值将被使用)。在同时为绑定器配置 minPartitionCount 和为应用程序配置 partitionCount 时,请谨慎操作,因为将使用较大的值。如果主题已经存在且分区数量较少,并且 autoAddPartitions 被禁用(默认情况下),绑定器将无法启动。如果主题已经存在且分区数量较少,并且 autoAddPartitions 被启用,则会添加新的分区。如果主题已经存在的分区数量大于 minPartitionCountpartitionCount 的最大值,则将使用现有的分区数量。

压缩

设置 compression.type 生产者属性。支持的值为 nonegzipsnappylz4zstd。如果你按照 Spring for Apache Kafka 文档 中的讨论将 kafka-clients jar 覆盖为 2.1.0(或更高版本),并且希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

transactionManager

用于覆盖此绑定的绑定器事务管理器的 KafkaAwareTransactionManager 的 Bean 名称。通常,如果您希望使用 ChainedKafkaTransactionManager 将另一个事务与 Kafka 事务同步,则需要此配置。为了实现记录的恰好一次消费和生产,消费者和生产者绑定都必须配置为使用相同的事务管理器。

默认值:无。

closeTimeout

等待关闭生产者时的超时时间(以秒为单位)。

默认值:30

allowNonTransactional

通常情况下,与事务性绑定器关联的所有输出绑定都会在一个新的事务中发布,如果当前没有正在进行的事务。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非已经有一个事务正在进行中。

默认值:false