交互式查询
Kafka Streams 绑定器 API 提供了一个名为 InteractiveQueryService
的类,用于交互式查询状态存储。你可以在应用程序中将其作为 Spring bean 进行访问。从应用程序中访问该 bean 的一种简单方法是使用 autowire
自动装配该 bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦你获得了这个 bean 的访问权限,就可以查询你感兴趣的特定状态存储。请参见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动过程中,上述获取存储的方法调用可能会失败。例如,状态存储可能仍在初始化过程中。在这种情况下,重试此操作将非常有用。Kafka Streams 绑定器提供了一个简单的重试机制来应对这种情况。
以下是可用于控制重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为
1
。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果有多个 Kafka Streams 应用程序实例在运行,那么在交互式查询之前,您需要确定哪个应用程序实例托管了您正在查询的特定键。InteractiveQueryService
API 提供了用于识别主机信息的方法。
为了使此功能正常工作,您必须按照以下方式配置 application.server
属性:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有关这些主机查找方法的更多信息,请参阅这些方法的 Javadoc。对于这些方法,在启动期间,如果底层的 KafkaStreams 对象尚未准备好,它们可能会抛出异常。上述重试属性也适用于这些方法。
通过 InteractiveQueryService 提供的其他 API 方法
使用以下 API 方法来检索与给定存储和键组合相关联的 KeyQueryMetadata
对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索与给定存储和键组合相关联的 KakfaStreams
对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
自定义 Store 查询参数
有时,在通过 InteractiveQueryService
查询存储之前,您需要微调存储查询参数。为此,从绑定器的 4.0.1
版本开始,您可以提供一个 StoreQueryParametersCustomizer
的 bean,这是一个函数式接口,带有一个 customize
方法,该方法接受 StoreQueryParameter
作为参数。以下是其方法签名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
通过这种方法,应用程序可以进一步自定义 StoreQueryParameters
,例如启用陈旧存储。
当此 bean 存在于应用程序中时,InteractiveQueryService
将在查询状态存储之前调用其 customize
方法。
请记住,应用程序中必须有一个唯一的 StoreQueryParametersCustomizer
bean。