配置选项
本节包含 Apache Kafka 绑定器所使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档中的绑定属性。
Kafka 绑定器属性
spring.cloud.stream.kafka.binder.brokers
Kafka 绑定器连接到的代理列表。
默认值:localhost
。
spring.cloud.stream.kafka.binder.defaultBrokerPort
是 Spring Cloud Stream Kafka Binder 的一个配置属性,用于设置 Kafka 代理的默认端口号。如果没有显式配置 spring.kafka.bootstrap-servers
,Kafka Binder 将使用此端口来连接到 Kafka 代理。
默认情况下,Kafka 代理的端口号是 9092
。你可以通过设置 spring.cloud.stream.kafka.binder.defaultBrokerPort
来覆盖这个默认值。
例如:
spring:
cloud:
stream:
kafka:
binder:
defaultBrokerPort: 9093
在这个示例中,Kafka Binder 将尝试连接到端口 9093
的 Kafka 代理。
brokers
允许指定带有或不带端口信息的主机(例如,host1,host2:port2
)。当在代理列表中没有配置端口时,这将设置默认端口。
默认值:9092
。
spring.cloud.stream.kafka.binder.configuration
传递给由绑定器创建的所有客户端的客户端属性(包括生产者和消费者)的键/值映射。由于这些属性同时被生产者和消费者使用,因此应将其使用限制在通用属性上——例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。此处设置的属性将覆盖在启动时设置的任何属性。
默认值:空映射。
spring.cloud.stream.kafka.binder.consumerProperties
任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此处还允许使用未知的消费者属性。此处设置的属性将覆盖在 boot
和上述 configuration
属性中设置的任何属性。
默认值:空映射。
spring.cloud.stream.kafka.binder.headers
由绑定器传输的自定义标头列表。仅在与使用 kafka-clients
版本 < 0.11.0.0 的旧版应用程序(⇐ 1.3.x)通信时才需要。较新版本原生支持标头。
默认值:空。
spring.cloud.stream.kafka.binder.healthTimeout
获取分区信息的等待时间,以秒为单位。如果此计时器过期,则健康报告为 down。
默认值:60。
spring.cloud.stream.kafka.binder.requiredAcks
代理上所需的确认数量。请参阅 Kafka 文档中的生产者 acks
属性。
默认值:1
。
spring.cloud.stream.kafka.binder.minPartitionCount
仅在设置了 autoCreateTopics
或 autoAddPartitions
时有效。绑定器在其生产或消费数据的主题上配置的全局最小分区数。它可以通过生产者的 partitionCount
设置或生产者的 instanceCount * concurrency
设置的值(如果其中任何一个更大)来覆盖。
默认值:1
。
spring.cloud.stream.kafka.binder.producerProperties
任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,此处还允许使用未知的生产者属性。此处设置的属性将覆盖在 boot
和上述 configuration
属性中设置的任何属性。
默认值:空映射。
spring.cloud.stream.kafka.binder.replicationFactor
如果 autoCreateTopics
处于激活状态,自动创建主题的副本因子。可以在每个绑定上进行覆盖。
如果您使用的是 Kafka broker 2.4 之前的版本,那么此值应至少设置为 1
。从 3.0.8 版本开始,绑定器使用 -1
作为默认值,这表示将使用 broker 的 default.replication.factor
属性来确定副本数量。请与您的 Kafka broker 管理员确认是否存在要求最小复制因子的策略,如果存在这种情况,通常情况下,default.replication.factor
将与该值匹配,并且应使用 -1
,除非您需要的复制因子大于最小值。
默认值:-1
。
spring.cloud.stream.kafka.binder.autoCreateTopics
如果设置为 true
,binder 会自动创建新的主题。如果设置为 false
,binder 会依赖已经配置好的主题。在后一种情况下,如果主题不存在,binder 将无法启动。
此设置与代理的 auto.create.topics.enable
设置无关,也不会影响它。如果服务器设置为自动创建主题,它们可能会作为元数据检索请求的一部分创建,并使用代理的默认设置。
默认值:true
。
spring.cloud.stream.kafka.binder.autoAddPartitions
如果设置为 true
,binder 会在需要时创建新的分区。如果设置为 false
,binder 会依赖主题已经配置好的分区大小。如果目标主题的分区数量小于预期值,binder 将无法启动。
默认值:false
。
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
在绑定器中启用事务。请参阅 Kafka 文档中的 transaction.id
以及 spring-kafka
文档中的 事务。启用事务后,单个 producer
属性将被忽略,所有生产者都将使用 spring.cloud.stream.kafka.binder.transaction.producer.*
属性。
默认值为 null
(无事务)
spring.cloud.stream.kafka.binder.transaction.producer.*
事务绑定器中生产者的全局生产者属性。请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和 Kafka 生产者属性 以及所有绑定器支持的通用生产者属性。
默认值:请参阅各个生产者属性。
spring.cloud.stream.kafka.binder.headerMapperBeanName
用于将 spring-messaging
头信息映射到 Kafka 头信息以及从 Kafka 头信息映射回来的 KafkaHeaderMapper
的 bean 名称。例如,如果您希望对使用 JSON 反序列化头信息的 BinderHeaderMapper
bean 中的受信任包进行自定义,可以使用此属性。如果未使用此属性将此自定义的 BinderHeaderMapper
bean 提供给 binder,则 binder 将查找名称为 kafkaBinderHeaderMapper
且类型为 BinderHeaderMapper
的头信息映射器 bean,然后回退到由 binder 创建的默认 BinderHeaderMapper
。
默认值:无。
spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
当发现主题上的任何分区(无论接收数据的消费者是谁)没有领导者时,将绑定器健康状态设置为 down
的标志。
默认值:true
。
spring.cloud.stream.kafka.binder.certificateStoreDirectory
当 truststore 或 keystore 证书的位置被指定为非本地文件系统资源(由 org.springframework.core.io.Resource
支持的资源,例如 CLASSPATH、HTTP 等)时,绑定器会将该资源从路径(可转换为 org.springframework.core.io.Resource
)复制到文件系统上的某个位置。这适用于代理级别的证书(ssl.truststore.location
和 ssl.keystore.location
)以及用于 schema registry 的证书(schema.registry.ssl.truststore.location
和 schema.registry.ssl.keystore.location
)。请记住,truststore 和 keystore 的位置路径必须在 spring.cloud.stream.kafka.binder.configuration…
下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。文件将被复制到该属性值指定的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值且证书文件是非本地文件系统资源,则它将被复制到由 System.getProperty("java.io.tmpdir")
返回的系统临时目录中。如果此值存在,但在文件系统上找不到该目录或该目录不可写,也会发生这种情况。
默认值:无。
spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
当设置为 true
时,每次访问该指标时都会计算每个消费者主题的偏移滞后指标。当设置为 false
时,仅使用定期计算的偏移滞后。
默认值:true
spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
计算每个消费者主题的偏移量滞后的时间间隔。当 metrics.defaultOffsetLagMetricsEnabled
被禁用或其计算时间过长时,将使用此值。
默认值:60 秒
spring.cloud.stream.kafka.binder.enableObservation
在此绑定器中的所有绑定上启用 Micrometer 观测注册表。
默认值:false
spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
KafkaHealthIndicator
元数据消费者 group.id
。该消费者被 HealthIndicator
用来查询正在使用的主题的元数据。
默认值:无。
Kafka 消费者属性
以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
为前缀。
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.consumer.<property>=<value>
。
admin.configuration
自 2.1.1 版本起,此属性已被弃用,推荐使用 topic.properties
,未来版本中将不再支持该属性。
admin.replicas-assignment
自 2.1.1 版本起,该属性已被弃用,建议使用 topic.replicas-assignment
,并且在未来版本中将不再支持该属性。
admin.replication-factor
自 2.1.1 版本起,该属性已被弃用,推荐使用 topic.replication-factor
,未来版本中将不再支持此属性。
autoRebalanceEnabled
当设置为 true
时,主题分区会在消费者组的成员之间自动重新平衡。当设置为 false
时,每个消费者会根据 spring.cloud.stream.instanceCount
和 spring.cloud.stream.instanceIndex
分配一组固定的分区。这要求在每个启动的实例上正确设置 spring.cloud.stream.instanceCount
和 spring.cloud.stream.instanceIndex
属性。在这种情况下,spring.cloud.stream.instanceCount
属性的值通常必须大于 1。
默认值:true
。
ackEachRecord
当 autoCommitOffset
为 true
时,此设置决定是否在每条记录处理完后提交偏移量。默认情况下,偏移量会在 consumer.poll()
返回的记录批次中的所有记录处理完毕后提交。通过 poll
返回的记录数量可以通过 Kafka 的 max.poll.records
属性进行控制,该属性通过消费者的 configuration
属性进行设置。将此设置为 true
可能会导致性能下降,但这样做可以减少在发生故障时重新传递记录的可能性。另请参阅绑定器的 requiredAcks
属性,该属性也会影响提交偏移量的性能。从 3.1 版本开始,此属性已被弃用,推荐使用 ackMode
。如果未设置 ackMode
且未启用批处理模式,则将使用 RECORD
ackMode。
默认值:false
。
autoCommitOffset
从 3.1 版本开始,此属性已弃用。有关替代方案的更多详细信息,请参阅 ackMode
。当消息被处理时是否自动提交偏移量。如果设置为 false
,则在入站消息中会存在一个键为 kafka_acknowledgment
、类型为 org.springframework.kafka.support.Acknowledgment
的头部。应用程序可以使用此头部来确认消息。有关详细信息,请参阅示例部分。当此属性设置为 false
时,Kafka binder 将 ack 模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,应用程序负责确认记录。另请参阅 ackEachRecord
。
默认值:true
。
确认模式
指定容器的确认模式。这是基于 Spring Kafka 中定义的 AckMode
枚举。如果 ackEachRecord
属性设置为 true
并且消费者不在批处理模式下,则将使用 RECORD
的确认模式,否则将使用此属性提供的确认模式。
autoCommitOnError
在可轮询的消费者中,如果设置为 true
,在发生错误时始终会自动提交。如果未设置(默认)或设置为 false
,可轮询的消费者将不会自动提交。请注意,此属性仅适用于可轮询的消费者。
默认值:未设置。
重置偏移量
是否将消费者的偏移量重置为 startOffset
提供的值。如果提供了 KafkaBindingRebalanceListener
,则必须为 false;请参阅 rebalance listener。有关此属性的更多信息,请参阅 reset-offsets。
默认值:false
。
startOffset
新组的起始偏移量。允许的值为:earliest
和 latest
。如果为消费者的 'binding' 显式设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group
),则 'startOffset' 设置为 earliest
。否则,对于 anonymous
消费者组,它设置为 latest
。有关此属性的更多信息,请参见 reset-offsets。
默认值:null(等同于 earliest
)。
enableDlq
当设置为 true
时,它将为消费者启用 DLQ(死信队列)行为。默认情况下,导致错误的消息会被转发到一个名为 error.<destination>.<group>
的主题。DLQ 主题名称可以通过设置 dlqName
属性或定义一个类型为 DlqDestinationResolver
的 @Bean
来配置。这为 Kafka 重放场景提供了一个替代方案,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。有关更多信息,请参阅 kafka dlq 处理。从 2.0 版本开始,发送到 DLQ 主题的消息会增强以下头部信息:x-original-topic
、x-exception-message
和 x-exception-stacktrace
,并以 byte[]
的形式存储。默认情况下,失败记录会被发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改此行为的信息,请参阅 dlq 分区选择。当 destinationIsPattern
为 true
时,不允许使用此功能。
默认值:false
。
dlqPartitions
当 enableDlq
为 true
且未设置此属性时,将创建一个与主主题分区数量相同的死信主题。通常,死信记录会被发送到死信主题中与原始记录相同的分区。此行为可以更改;请参阅 dlq 分区选择。如果此属性设置为 1
并且没有 DqlPartitionFunction
bean,则所有死信记录都将写入分区 0
。如果此属性大于 1
,则必须提供一个 DlqPartitionFunction
bean。请注意,实际的分区数量会受到绑定器的 minPartitionCount
属性的影响。
默认值:none
配置
一个包含通用 Kafka 消费者属性的键/值对的映射。除了 Kafka 消费者属性外,还可以在此传递其他配置属性。例如,应用程序所需的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。bootstrap.servers
属性不能在此设置;如果需要连接到多个集群,请使用多绑定器支持。
默认值:空映射。
dlqName
接收错误消息的 DLQ 主题名称。
默认值:null(如果未指定,导致错误的消息将被转发到名为 error.<destination>.<group>
的主题)。
dlqProducerProperties
通过这种方式,可以设置特定于 DLQ(死信队列)的生产者属性。所有通过 Kafka 生产者属性可用的属性都可以通过此属性进行设置。当在消费者端启用了原生解码(即 useNativeDecoding: true
)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以 dlqProducerProperties.configuration.key.serializer
和 dlqProducerProperties.configuration.value.serializer
的形式提供。
默认值:默认的 Kafka 生产者属性。
标准头文件
指示入站通道适配器填充哪些标准头信息。允许的值为:none
、id
、timestamp
或 both
。如果使用原生反序列化,并且第一个接收消息的组件需要 id
(例如配置为使用 JDBC 消息存储的聚合器),则此选项非常有用。
默认值:none
converterBeanName
实现 RecordMessageConverter
的 bean 名称。用于入站通道适配器中以替换默认的 MessagingMessageConverter
。
默认值:null
idleEventInterval
是一个用于定义空闲事件间隔时间的参数。在计算机领域中,它通常用于指定在系统或应用程序处于空闲状态时,触发事件的时间间隔。这个参数可以帮助优化资源的使用,避免在系统空闲时频繁触发不必要的事件处理。
在未接收到消息的事件之间的间隔时间,单位为毫秒。可以使用 ApplicationListener<ListenerContainerIdleEvent>
来接收这些事件。有关使用示例,请参阅 pause-resume。
默认值:30000
destinationIsPattern
当为 true
时,目标被视为一个正则表达式 Pattern
,用于通过代理匹配主题名称。当为 true
时,不会预配主题,并且不允许使用 enableDlq
,因为在预配阶段绑定器不知道主题名称。注意,检测与模式匹配的新主题所需的时间由消费者属性 metadata.max.age.ms
控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。可以使用上面的 configuration
属性进行配置。
默认值:false
topic.properties
用于在配置新主题时使用的 Kafka 主题属性的 Map
— 例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
topic.replicas-assignment
一个 Map<Integer, List<Integer>>
类型的副本分配映射,其中键是分区,值是副本分配列表。在创建新主题时使用。请参阅 kafka-clients
jar 中的 NewTopic
Javadocs。
默认值:无。
topic.replication-factor
创建主题时使用的复制因子。覆盖 binder 范围的设置。如果存在 replicas-assignments
,则忽略此设置。
默认值:无(使用 binder 范围的默认值 -1)。
pollTimeout
用于可轮询消费者的轮询超时时间。
默认值:5 秒。
事务管理器
用于覆盖此绑定的绑定器事务管理器的 KafkaAwareTransactionManager
的 Bean 名称。通常,如果您希望使用 ChainedKafkaTransactionManager
将另一个事务与 Kafka 事务同步,则需要此配置。为了实现记录的精确一次消费和生产,消费者和生产者绑定必须全部配置为使用相同的事务管理器。
默认值:无。
txCommitRecovered
在使用事务性绑定器时,默认情况下,恢复的记录(例如,当重试次数耗尽且记录被发送到死信主题时)的偏移量将通过新事务提交。将此属性设置为 false
可以抑制提交恢复记录的偏移量。
默认值:true。
commonErrorHandlerBeanName
每个消费者绑定使用的 CommonErrorHandler
bean 名称。如果存在,此用户提供的 CommonErrorHandler
将优先于绑定器定义的任何其他错误处理程序。如果应用程序不希望使用 ListenerContainerCustomizer
然后检查目标/组组合来设置错误处理程序,这是一种方便的表达错误处理程序的方式。
默认值:无。
Kafka 生产者属性
以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value>
。
admin.configuration
自 2.1.1 版本起,此属性已被弃用,推荐使用 topic.properties
,并且将在未来的版本中移除对该属性的支持。
admin.replicas-assignment
自 2.1.1 版本起,此属性已被弃用,推荐使用 topic.replicas-assignment
,并且将在未来的版本中移除对该属性的支持。
admin.replication-factor
自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.replication-factor
,并在未来版本中移除对其的支持。
bufferSize
Kafka 生产者在发送前尝试批量处理的数据上限(以字节为单位)。
默认值:16384
。
同步
生产者是否是同步的。
默认值:false
。
sendTimeoutExpression
一个针对出站消息进行求值的 SpEL 表达式,用于在启用同步发布时评估等待确认的时间 —— 例如 headers['mySendTimeout']
。超时时间的值以毫秒为单位。在 3.0 版本之前,除非使用原生编码,否则无法使用有效负载,因为在此表达式求值时,有效负载已经转换为 byte[]
的形式。现在,表达式在有效负载转换之前就会被求值。
默认值:none
。
batchTimeout
生产者在发送消息之前等待多长时间以允许更多消息累积到同一批次中。(通常情况下,生产者根本不会等待,而是直接发送在前一次发送过程中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。
默认值:0
。
messageKeyExpression
一个针对出站消息进行求值的 SpEL 表达式,用于填充生成的 Kafka 消息的键 —— 例如 headers['myKey']
。在 3.0 之前的版本中,除非使用了原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以 byte[]
的形式存在。现在,表达式会在有效负载转换之前进行评估。对于常规处理器(Function<String, String>
或 Function<Message<?>, Message<?>>
),如果生成的键需要与来自主题的传入键相同,则可以按如下方式设置此属性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
。对于响应式函数,有一个重要的注意事项需要牢记。在这种情况下,应用程序需要手动将传入消息的标头复制到出站消息中。你可以设置标头,例如 myKey
并使用 headers['myKey']
,如上所述,或者为了方便起见,只需设置 KafkaHeaders.MESSAGE_KEY
标头,你根本不需要设置此属性。
默认值:none
。
headerPatterns
这是一个以逗号分隔的简单模式列表,用于匹配 Spring 消息头,这些消息头将被映射到 ProducerRecord
中的 Kafka Headers
。模式可以以通配符(星号)开头或结尾。通过在模式前添加 !
可以否定该模式。匹配在第一次匹配(无论是正向还是负向)后停止。例如,!ask,as*
将允许 ash
通过,但不允许 ask
通过。id
和 timestamp
永远不会被映射。
默认值:*
(所有标题 - 除了 id
和 timestamp
)
配置
包含通用 Kafka 生产者属性的键/值对的映射。bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。
默认值:空映射。
topic.properties
一个 Map
,用于在配置新 Kafka 主题时的属性设置 — 例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
topic.replicas-assignment
这是一个 Map<Integer, List<Integer>>
类型的副本分配映射,其中键表示分区,值表示副本分配。在配置新主题时使用。详情请参阅 kafka-clients
jar 包中的 NewTopic
Javadocs。
默认值:无。
topic.replication-factor
创建主题时使用的复制因子。覆盖 binder 范围的设置。如果存在 replicas-assignments
,则忽略此设置。
默认值:无(使用绑定范围内的默认值 -1)。
useTopicHeader
设置为 true
以使用出站消息中的 KafkaHeaders.TOPIC
消息头的值覆盖默认的绑定目标(主题名称)。如果该消息头不存在,则使用默认的绑定目标。
默认值:false
。
recordMetadataChannel
MessageChannel
的 bean 名称,成功发送的结果应发送到该通道;该 bean 必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(如果有转换,则是转换后的消息),并带有一个额外的头信息 KafkaHeaders.RECORD_METADATA
。该头信息包含由 Kafka 客户端提供的 RecordMetadata
对象;它包括记录写入主题的分区和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
发送失败的消息会进入生产者的错误通道(如果已配置);详情请参阅 Kafka 错误通道。
默认值:null。
Kafka 绑定器使用生产者的 partitionCount
设置作为提示来创建一个具有指定分区数量的主题(与 minPartitionCount
结合使用,两者中的较大值将被使用)。在同时为绑定器配置 minPartitionCount
和为应用程序配置 partitionCount
时,请谨慎操作,因为将使用较大的值。如果主题已经存在且分区数量较少,并且 autoAddPartitions
被禁用(默认情况下),绑定器将无法启动。如果主题已经存在且分区数量较少,并且 autoAddPartitions
被启用,则会添加新的分区。如果主题已经存在的分区数量大于 minPartitionCount
或 partitionCount
的最大值,则将使用现有的分区数量。
压缩
设置 compression.type
生产者属性。支持的值为 none
、gzip
、snappy
、lz4
和 zstd
。如果你按照 Spring for Apache Kafka 文档 中的讨论将 kafka-clients
jar 覆盖为 2.1.0(或更高版本),并且希望使用 zstd
压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。
默认值:none
。
transactionManager
用于覆盖此绑定的绑定器事务管理器的 KafkaAwareTransactionManager
的 Bean 名称。通常,如果您希望使用 ChainedKafkaTransactionManager
将另一个事务与 Kafka 事务同步,则需要此配置。为了实现记录的恰好一次消费和生产,消费者和生产者绑定都必须配置为使用相同的事务管理器。
默认值:无。
closeTimeout
等待关闭生产者时的超时时间(以秒为单位)。
默认值:30
allowNonTransactional
通常情况下,与事务性绑定器关联的所有输出绑定都会在一个新的事务中发布,如果当前没有正在进行的事务。此属性允许您覆盖该行为。如果设置为 true
,则发布到此输出绑定的记录将不会在事务中运行,除非已经有一个事务正在进行中。
默认值:false