跳到主要内容

Apache Pulsar 的 Spring Cloud Stream Binder

DeepSeek V3 中英对照 Spring Cloud Stream Binder for Apache Pulsar

Spring for Apache Pulsar 提供了一个用于 Spring Cloud Stream 的绑定器,我们可以使用它来构建基于发布-订阅范式的事件驱动微服务。在本节中,我们将详细介绍这个绑定器的基本内容。

用法

我们需要在您的应用程序中包含以下依赖项,以便使用 Spring Cloud Stream 的 Apache Pulsar 绑定器。

<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
</dependency>
</dependencies>
xml

概述

Spring Cloud Stream 的 Apache Pulsar 绑定器允许应用程序专注于业务逻辑,而无需处理管理和维护 Pulsar 的低级细节。绑定器为应用程序开发人员处理所有这些细节。Spring Cloud Stream 带来了基于 Spring Cloud Function 的强大编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动应用程序。应用程序可以从中间件无关的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目标。Spring Cloud Stream 构建在 Spring Boot 之上,当您使用 Spring Cloud Stream 编写事件驱动的微服务时,实际上是在编写一个 Boot 应用程序。以下是一个简单的 Spring Cloud Stream 应用程序。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}

@Bean
public Supplier<Time> timeSupplier() {
return () -> new Time(String.valueOf(System.currentTimeMillis()));
}

@Bean
public Function<Time, EnhancedTime> timeProcessor() {
return (time) -> {
EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
return enhancedTime;
};
}

@Bean
public Consumer<EnhancedTime> timeLogger() {
return (time) -> this.logger.info("SINK: {}", time);
}

record Time(String time) {
}

record EnhancedTime(Time time, String extra) {
}

}
java

上述示例应用程序是一个完整的 Spring Boot 应用程序,值得做一些解释。然而,在第一次浏览时,你可以看到这仅仅是普通的 Java 和一些 Spring 和 Spring Boot 注解。我们这里有三个 Bean 方法——一个 java.util.function.Supplier、一个 java.util.function.Function,最后是一个 java.util.function.ConsumerSupplier 生成当前时间的毫秒数,Function 获取这个时间并通过添加一些随机数据来增强它,然后 Consumer 记录增强后的时间。

为了简洁起见,我们省略了所有的导入语句,但整个应用程序中没有任何特定于 Spring Cloud Stream 的内容。那么它如何成为一个与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序呢?你必须在应用程序中包含上述 binder 的依赖项。一旦添加了该依赖项,你必须提供以下配置属性。

spring:
cloud:
function:
definition: timeSupplier;timeProcessor;timeLogger;
stream:
bindings:
timeProcessor-in-0:
destination: timeSupplier-out-0
timeProcessor-out-0:
destination: timeProcessor-out-0
timeLogger-in-0:
destination: timeProcessor-out-0
yaml

至此,上述 Spring Boot 应用程序已经成为一个基于 Spring Cloud Stream 的端到端事件驱动应用程序。由于我们在类路径中包含了 Pulsar 绑定器,因此该应用程序与 Apache Pulsar 进行交互。如果应用程序中只有一个函数,那么我们不需要告诉 Spring Cloud Stream 激活该函数以执行,因为它默认会这样做。如果应用程序中有多个这样的函数,就像我们的示例中一样,我们需要指示 Spring Cloud Stream 我们希望激活哪些函数。在我们的案例中,我们需要激活所有函数,并通过 spring.cloud.function.definition 属性来实现这一点。默认情况下,bean 名称会成为 Spring Cloud Stream 绑定名称的一部分。绑定是 Spring Cloud Stream 中的一个基本抽象概念,框架通过它与中间件目标进行通信。Spring Cloud Stream 所做的几乎所有事情都是通过具体的绑定进行的。供应商(supplier)只有一个输出绑定;函数有输入和输出绑定,而消费者(consumer)只有一个输入绑定。让我们以我们的供应商 bean timeSupplier 为例。这个供应商的默认绑定名称将是 timeSupplier-out-0。同样,timeProcessor 函数的默认绑定名称在输入侧将是 timeProcessor-in-0,在输出侧将是 timeProcessor-out-0。有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。在大多数情况下,使用默认绑定名称就足够了。我们在绑定名称上设置目标(destination),如上所示。如果没有提供目标,绑定名称将作为目标的值,就像 timeSupplier-out-0 的情况一样。

运行上述应用程序时,您应该会看到供应商每秒执行一次,然后由函数消费,并增强了日志消费者所消费的时间。

基于 Binder 的应用程序中的消息转换

在上面的示例应用程序中,我们没有为消息转换提供任何模式信息。这是因为,默认情况下,Spring Cloud Stream 使用其消息转换机制,该机制通过 Spring Messaging 项目建立在 Spring Framework 中的消息传递支持。除非另有指定,Spring Cloud Stream 在入站和出站绑定上使用 application/json 作为 content-type 进行消息转换。在出站时,数据被序列化为 byte[],然后 Pulsar 绑定器使用 Schema.BYTES 将其通过线路发送到 Pulsar 主题。同样,在入站时,数据从 Pulsar 主题中以 byte[] 的形式消费,然后使用适当的消息转换器将其转换为目标类型。

使用 Pulsar Schema 在 Pulsar 中进行原生转换

尽管默认情况下是使用框架提供的消息转换,但 Spring Cloud Stream 允许每个绑定器决定消息应该如何转换。假设应用程序选择采用这种方式,那么 Spring Cloud Stream 将避免使用任何 Spring 提供的消息转换工具,而是直接传递其接收或生成的数据。Spring Cloud Stream 中的这一特性被称为生产者端的原生编码和消费者端的原生解码。这意味着编码和解码在目标中间件上本地进行,在我们的例子中,就是在 Apache Pulsar 上。对于上述应用程序,我们可以使用以下配置来绕过框架转换,并使用原生编码和解码。

spring:
cloud:
stream:
bindings:
timeSupplier-out-0:
producer:
use-native-encoding: true
timeProcessor-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
timeProcessor-out-0:
destination: timeProcessor-out-0
producer:
use-native-encoding: true
timeLogger-in-0:
destination: timeProcessor-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeSupplier-out-0:
producer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-in-0:
consumer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-out-0:
producer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
timeLogger-in-0:
consumer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
yaml

在生产者端启用原生编码的属性是 Spring Cloud Stream 核心中的一个绑定级别属性。你可以在生产者绑定上设置它 - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding 并将其设置为 true。同样地,对于消费者绑定,使用 spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding 并将其设置为 true。如果我们决定使用原生编码和解码,在 Pulsar 的情况下,我们需要设置相应的 schema 和底层消息类型信息。这些信息作为扩展绑定属性提供。正如你在上面的配置中所看到的,属性是 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type 用于 schema 信息,以及 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type 用于实际的目标类型。如果消息中同时包含键和值,你可以使用 message-key-typemessage-value-type 来指定它们的目标类型。

提示

当省略 schema-type 属性时,将参考任何已配置的自定义模式映射。

消息头转换

每条消息通常都带有头信息,这些信息需要在消息通过 Spring Cloud Stream 的输入和输出绑定在 Pulsar 和 Spring Messaging 之间传递时携带。为了支持这种传递,框架会处理必要的消息头转换。

自定义头部映射器

Pulsar 绑定器配置了一个默认的头部映射器,您可以通过提供自己的 PulsarHeaderMapper bean 来覆盖它。

在以下示例中,配置了一个 JSON 头映射器,它:

  • 映射所有入站头(除了键为“top”或“secret”的头)

  • 映射出站头(除了键为“id”、“timestamp”或“userId”的头)

  • 仅信任“com.acme”包中的对象进行出站反序列化

  • 使用简单的 toString() 编码对任何“com.acme.Money”头值进行序列化/反序列化

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
return JsonPulsarHeaderMapper.builder()
.inboundPatterns("!top", "!secret", "*")
.outboundPatterns("!id", "!timestamp", "!userId", "*")
.trustedPackages("com.acme")
.toStringClasses("com.acme.Money")
.build();
}
java

在 Binder 中使用 Pulsar 属性

绑定器使用 Spring for Apache Pulsar 框架中的基本组件来构建其生产者和消费者绑定。由于基于绑定器的应用程序是 Spring Boot 应用程序,因此绑定器默认使用 Spring Boot 的自动配置来支持 Spring for Apache Pulsar。因此,核心框架级别可用的所有 Pulsar Spring Boot 属性也可以通过绑定器使用。例如,您可以使用前缀为 spring.pulsar.producer…​spring.pulsar.consumer…​ 等的属性。此外,您还可以在绑定器级别设置这些 Pulsar 属性。例如,以下方式也是有效的:spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​

以上两种方法都可以,但在使用这些属性时,它们会应用于整个应用程序。如果应用程序中有多个函数,它们都会获得相同的属性。你也可以在扩展绑定属性级别设置这些 Pulsar 属性来解决这个问题。扩展绑定属性会直接应用于绑定本身。例如,如果你有一个输入绑定和一个输出绑定,并且它们都需要一组独立的 Pulsar 属性,你必须在扩展绑定上设置它们。生产者绑定的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​。同样地,消费者绑定的模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​。这样,你可以在同一个应用程序中为不同的绑定应用独立的 Pulsar 属性集。

最高优先级属于扩展绑定属性。在绑定器中应用属性的优先级顺序是 扩展绑定属性 → 绑定器属性 → Spring Boot 属性(从高到低)。

以下是一些资源,可以帮助您了解更多关于通过 Pulsar 绑定器可用的属性。

Pulsar 生产者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer 前缀。所有 Spring Boot 提供的 Pulsar 生产者属性 也可以通过此配置类进行配置。

Pulsar 消费者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.consumer 前缀。所有 Spring Boot 提供的 Pulsar 消费者属性 也可以通过此配置类进行配置。

有关常见的 Pulsar binder 特定配置属性,请参见此链接。这些属性需要以 spring.cloud.stream.pulsar.binder 作为前缀。上述指定的生产者和消费者属性(包括 Spring Boot 的属性)可以在 binder 中使用,前缀为 spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer

Pulsar 主题配置器

Spring Cloud Stream 的 Apache Pulsar binder 提供了一个开箱即用的 Pulsar topic 配置器。当运行应用程序时,如果所需的 topic 不存在,Pulsar 会为你创建这些 topic。然而,这是一个基本的非分区 topic,如果你需要更高级的功能,比如创建一个分区 topic,你可以依赖 binder 中的 topic 配置器。Pulsar topic 配置器使用框架中的 PulsarAdministration,它使用 PulsarAdminBuilder。因此,除非你在默认的服务器和端口上运行 Pulsar,否则你需要设置 spring.pulsar.administration.service-url 属性。

创建主题时指定分区数量

在创建主题时,可以通过两种方式设置分区数量。首先,你可以在 binder 级别使用属性 spring.cloud.stream.pulsar.binder.partition-count 进行设置。正如我们上面所看到的,这样做将使应用程序创建的所有主题都继承此属性。假设你希望在绑定级别进行更细粒度的控制来设置分区。在这种情况下,你可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 为每个绑定设置 partition-count 属性。这样,同一应用程序中不同函数创建的各种主题将根据应用程序需求具有不同的分区。