跳到主要内容

Kafka Streams 绑定器中的绑定可视化和控制

DeepSeek V3 中英对照 Binding visualization and control in Kafka Streams binder

从 3.1.2 版本开始,Kafka Streams 绑定器支持绑定可视化和控制。仅支持两种生命周期阶段:STOPPEDSTARTED。Kafka Streams 绑定器中不提供 PAUSEDRESUMED 生命周期阶段。

为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
none

如果你更倾向于使用 WebFlux,那么你可以包含 spring-boot-starter-webflux 而不是标准的 Web 依赖。

此外,您还需要设置以下属性:

management.endpoints.web.exposure.include=bindings
none

为了进一步说明这一特性,让我们使用以下应用程序作为指南:

@SpringBootApplication
public class KafkaStreamsApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}

@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}

}
none

正如我们所见,该应用有两个 Kafka Streams 函数——一个是消费者,另一个是函数。消费者的绑定默认命名为 consumer-in-0。同样,对于函数,输入绑定为 function-in-0,输出绑定为 function-out-0

应用程序启动后,我们可以使用以下绑定端点来查找有关绑定的详细信息。

curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
none

关于所有三种绑定的详细信息可以在上面找到。

现在让我们停止 consumer-in-0 绑定。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
none

此时,不会通过此绑定接收到任何记录。

重新启动绑定。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
none

当一个函数上存在多个绑定时,在任何这些绑定上调用这些操作都将有效。这是因为单个函数上的所有绑定都由同一个 StreamsBuilderFactoryBean 支持。因此,对于上述函数,无论是 function-in-0 还是 function-out-0 都可以正常工作。