配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
Kafka Streams 绑定器属性
以下属性在 binder 级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。在 Kafka Streams binder 中重用的任何 Kafka binder 提供的属性必须使用 spring.cloud.stream.kafka.streams.binder
作为前缀,而不是 spring.cloud.stream.kafka.binder
。此规则的唯一例外是在定义 Kafka 引导服务器属性时,此时可以使用任一前缀。
配置
这是一个包含与 Apache Kafka Streams API 相关属性的键值对的映射。此属性必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能用于流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig
JavaDocs。可以通过此属性设置可以从 StreamsConfig
设置的所有配置。使用此属性时,由于这是绑定器级别的属性,因此适用于整个应用程序。如果应用程序中有多个处理器,所有这些处理器都将获取这些属性。对于像 application.id
这样的属性,这将成为一个问题,因此您必须仔细检查如何使用此绑定器级别的 configuration
属性映射 StreamsConfig
中的属性。
functions.<function-bean-name>.applicationId
仅适用于函数式风格的处理器。这可以用于在应用程序中为每个函数设置应用程序 ID。在多个函数的情况下,这是一种便捷的设置应用程序 ID 的方式。
functions.<function-bean-name>.configuration
函数.<函数-bean-名称>.配置
仅适用于函数式风格的处理器。这是一个包含与 Apache Kafka Streams API 相关属性的键/值对的映射。这与上面描述的绑定器级别的 configuration
属性类似,但此级别的 configuration
属性仅针对指定的函数进行限制。当你有多个处理器并且希望根据特定函数限制配置访问时,可能需要使用此属性。所有 StreamsConfig
属性都可以在此处使用。
brokers
Broker URL
默认值:localhost
zkNodes
Zookeeper URL
默认值:localhost
反序列化异常处理器
反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此适用于应用程序中的所有输入绑定。有一种方法可以在消费者绑定级别以更细粒度的方式控制它。可能的值为 - logAndContinue
、logAndFail
、skipAndContinue
或 sendToDlq
默认值:logAndFail
applicationId
方便地在绑定器级别全局设置 Kafka Streams 应用程序的 application.id
。如果应用程序包含多个函数,则应该设置不同的应用程序 ID。请参阅上文,详细讨论了如何设置应用程序 ID。
默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
stateStoreRetry.maxAttempts
是用于配置状态存储重试的最大尝试次数的参数。
尝试连接到状态存储的最大次数。
默认值:1
stateStoreRetry.backoffPeriod
重试连接到状态存储时的退避时间。
默认值:1000 毫秒
consumerProperties
在绑定器级别上的任意消费者属性。
producerProperties
在绑定器级别上的任意生产者属性。
includeStoppedProcessorsForHealthCheck
当通过执行器停止处理器的绑定时,默认情况下,该处理器将不会参与健康检查。将此属性设置为 true
可为所有处理器启用健康检查,包括当前通过绑定执行器端点停止的处理器。
默认值:false
Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为前缀。为了方便起见,如果有多个输出绑定并且它们都需要一个共同的值,可以通过使用前缀 spring.cloud.stream.kafka.streams.default.producer.
来配置。
keySerde
使用的 key serde
默认值:请参阅上面关于消息序列化/反序列化的讨论
valueSerde
值序列化/反序列化方式
默认值:请参阅上述关于消息序列化/反序列化的讨论
useNativeEncoding
启用/禁用原生编码的标志
默认值:true
。
streamPartitionerBeanName
是用于指定流分区器 Bean 名称的属性。在流处理应用程序中,分区器负责将数据流分配到不同的分区中,以便并行处理。通过设置 streamPartitionerBeanName
,您可以指定一个自定义的分区器 Bean 来替代默认的分区策略。
消费者端使用的自定义出站分区器 Bean 名称。应用程序可以提供自定义的 StreamPartitioner
作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者,以替代默认的分区器使用。
默认值:请参阅上文关于出站分区支持的讨论。
producedAs
生成方式
处理器生成数据的接收组件的自定义名称。
默认值:none
(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下是 Kafka Streams 消费者的可用属性,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为前缀。为了方便起见,如果有多个输入绑定,并且它们都需要一个共同的值,可以通过使用前缀 spring.cloud.stream.kafka.streams.default.consumer.
来配置。
applicationId
为每个输入绑定设置 application.id
。
默认值:见上文。
keySerde
使用的键序列化/反序列化工具
默认值:请参见上述关于消息序列化/反序列化的讨论
使用的值序列化/反序列化工具
默认值:请参阅上文关于消息序列化/反序列化的讨论
materializedAs
在使用传入的 KTable 类型时进行物化的状态存储
默认值:none
。
useNativeDecoding
用于启用/禁用原生解码的标志
默认值:true
。
dlqName
DLQ 主题名称。
默认值:请参阅上面关于错误处理和死信队列(DLQ)的讨论。
startOffset
是 SVG 中的一个属性,用于指定文本在路径上的起始偏移量。它定义了文本从路径的起点开始的位置,可以是一个长度值或百分比。例如,startOffset="50%"
表示文本将从路径的中间位置开始。
如果没有已提交的偏移量可供消费,则从此偏移量开始消费。这通常在消费者首次消费某个主题时使用。Kafka Streams 使用 earliest
作为默认策略,绑定器也使用相同的默认值。可以通过此属性将其覆盖为 latest
。
默认值:earliest
。
注意:在消费者上使用 resetOffsets
对 Kafka Streams binder 没有任何影响。与基于消息通道的 binder 不同,Kafka Streams binder 不会按需定位到开始或结束位置。
反序列化异常处理器
反序列化错误处理程序类型。此处理程序按消费者绑定应用,与之前描述的绑定器级别属性不同。可能的值为 - logAndContinue
、logAndFail
、skipAndContinue
或 sendToDlq
默认值:logAndFail
timestampExtractorBeanName
在消费者端使用的特定时间戳提取器的 bean 名称。应用程序可以将 TimestampExtractor
作为 Spring bean 提供,并且可以将此 bean 的名称提供给消费者以使用,而不是使用默认的提取器。
默认值:请参阅上面关于时间戳提取器的讨论。
eventTypes
此绑定支持的事件类型的逗号分隔列表。
默认值:none
eventTypeHeaderKey
通过此绑定在每个传入记录上的事件类型头键。
默认值:event_type
consumedAs
处理器所消费的源组件的自定义名称。
默认值:none
(由 Kafka Streams 生成)
并发性特别说明
在 Kafka Streams 中,你可以使用 num.stream.threads
属性来控制处理器可以创建的线程数量。你可以通过上面描述的绑定器、函数、生产者或消费者级别的各种 configuration
选项来实现这一点。你也可以使用 Spring Cloud Stream 核心提供的 concurrency
属性来实现这一目的。在使用此属性时,你需要在消费者上使用它。当你有多个输入绑定时,请在第一个输入绑定上设置此属性。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
时,它将被绑定器转换为 num.stream.threads
。如果你有多个处理器,并且一个处理器定义了绑定级别的并发性,但其他处理器没有定义,那么那些没有绑定级别并发性的处理器将回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的绑定器范围的属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。