Spring 数据集成历程简史
Spring 在数据集成领域的旅程始于 Spring Integration。通过其编程模型,它为构建能够采用 企业集成模式 以连接外部系统(如数据库、消息代理等)的应用程序提供了统一的开发者体验。
进入云时代,微服务在企业环境中变得日益重要。Spring Boot 彻底改变了开发者构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发独立的、生产级的基于 Spring 的微服务变得无缝衔接。
为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被整合到一个新的项目中。于是,Spring Cloud Stream 诞生了。
使用 Spring Cloud Stream,开发者可以:
-
在隔离环境中构建、测试和部署以数据为中心的应用程序。
-
应用现代微服务架构模式,包括通过消息传递进行组合。
-
采用以事件为中心的思维方式解耦应用程序职责。一个事件可以代表在时间上发生的某件事,下游消费者应用程序可以对其做出反应,而无需知道其来源或生产者的身份。
-
将业务逻辑移植到消息代理(如 RabbitMQ、Apache Kafka、Amazon Kinesis)上。
-
依赖框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。
-
以及更多功能……
快速开始
你可以在不到 5 分钟的时间内尝试 Spring Cloud Stream,甚至在深入了解任何细节之前,只需按照这个三步指南操作即可。
我们将向你展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自你选择的消息中间件(稍后会详细介绍)的消息,并将接收到的消息记录到控制台。我们将其称为 LoggingConsumer
。虽然它并不十分实用,但它很好地介绍了一些主要概念和抽象,使你更容易理解本用户指南的其余部分。
这三个步骤如下:
使用 Spring Initializr 创建示例应用程序
要开始使用,请访问 Spring Initializr。在那里,你可以生成我们的 LoggingConsumer
应用程序。具体步骤如下:
-
在 Dependencies 部分,开始输入
stream
。当出现“Cloud Stream”选项时,选择它。 -
开始输入
kafka
或rabbit
。 -
选择 “Kafka” 或 “RabbitMQ”。
基本上,你选择的是应用程序绑定的消息中间件。我们建议使用你已经安装的或者更熟悉安装和运行的中间件。此外,从 Initializer 屏幕中可以看到,你还可以选择其他一些选项。例如,你可以选择 Gradle 作为构建工具,而不是默认的 Maven。
-
在 Artifact 字段中,输入
logging-consumer
。Artifact 字段的值将成为应用程序的名称。如果你选择了 RabbitMQ 作为中间件,你的 Spring Initializr 现在应该如下所示:
-
点击 Generate Project 按钮。
这样会将生成的项目的压缩包下载到您的硬盘上。
-
将文件解压到您想要用作项目目录的文件夹中。
我们鼓励您探索 Spring Initializr 中提供的多种可能性。它允许您创建多种不同类型的 Spring 应用程序。
将项目导入你的 IDE
现在你可以将项目导入到你的 IDE 中。请注意,根据 IDE 的不同,你可能需要遵循特定的导入流程。例如,根据项目是如何生成的(Maven 或 Gradle),你可能需要遵循特定的导入流程(例如,在 Eclipse 或 STS 中,你需要使用 File → Import → Maven → Existing Maven Project
)。
导入后,项目必须没有任何类型的错误。此外,src/main/java
目录下应包含 com.example.loggingconsumer.LoggingConsumerApplication
。
从技术上讲,此时你可以运行应用程序的主类。它已经是一个有效的 Spring Boot 应用程序。然而,它目前还没有任何功能,因此我们需要添加一些代码。
添加消息处理器、构建和运行
修改 com.example.loggingconsumer.LoggingConsumerApplication
类,使其如下所示:
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
正如你从前面的列表中可以看到:
-
我们使用函数式编程模型(参见[[Spring Cloud Function 支持]](#Spring Cloud Function 支持))将单个消息处理器定义为
Consumer
。 -
我们依赖框架约定将此处理器绑定到由 binder 暴露的输入目标绑定。
这样做还可以让你看到框架的一个核心特性:它会尝试自动将传入的消息负载转换为 Person
类型。
你现在拥有一个功能齐全的 Spring Cloud Stream 应用程序,它能够监听消息。为了简化起见,我们假设你在第一步中选择了 RabbitMQ。假设你已经安装并运行了 RabbitMQ,你可以在 IDE 中运行应用程序的 main
方法来启动它。
你应该会看到以下输出:
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
--- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
--- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
. . .
--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
. . .
--- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,并向 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
发送消息。anonymous.CbMIwdkJSBO1ZoPDOtHtCg
部分表示组名,它是随机生成的,因此在你的环境中肯定会有所不同。为了更可预测,你可以通过设置 spring.cloud.stream.bindings.input.group=hello
(或任何你喜欢的名称)来使用明确的组名。
消息的内容应为 Person
类的 JSON 表示,如下所示:
{"name":"Sam Spade"}
然后,在你的控制台中,你应该会看到:
Received: Sam Spade
你也可以通过使用 ./mvnw clean install
将应用程序构建并打包成一个可启动的 JAR 文件,然后使用 java -jar
命令运行生成的 JAR。
现在你已经有了一个可以工作的(尽管非常基础的)Spring Cloud Stream 应用程序。
在流数据上下文中的 Spring 表达式语言 (SpEL)
在本参考手册中,您将遇到许多可以利用 Spring 表达式语言(SpEL)的功能和示例。在使用它时,了解某些限制是非常重要的。
SpEL 允许你访问当前的消息以及你正在运行的应用程序上下文。然而,理解 SpEL 可以看到什么类型的数据非常重要,特别是在处理传入消息的上下文中。从代理(broker)中,消息以字节数组(byte[]
)的形式到达。然后,绑定器(binder)将其转换为 Message<byte[]>
,此时你可以看到消息的有效载荷(payload)保持了其原始形式。消息的头部是 <String, Object>
类型,其中值通常是另一个基本类型或基本类型的集合/数组,因此是 Object
类型。这是因为绑定器不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,绑定器实际上传递了一个包含有效载荷和以消息头部形式存在的一些可读元数据的信封,就像通过邮件传递的信件一样。这意味着,虽然可以访问消息的有效载荷,但你只能以原始数据(即 byte[]
)的形式访问它。尽管开发者可能经常希望 SpEL 能够访问有效载荷对象的字段作为具体类型(例如 Foo
、Bar
等),但你可以看到实现这一点的难度甚至是不可能性。以下是一个演示该问题的例子:假设你有一个路由表达式,根据有效载荷类型路由到不同的函数。这个需求意味着需要将有效载荷从 byte[]
转换为特定类型,然后应用 SpEL。然而,为了执行这种转换,我们需要知道要传递给转换器的实际类型,而这来自于函数的签名,但我们并不知道是哪一个。解决这个需求的更好方法是将类型信息作为消息头部传递(例如 application/json;type=foo.bar.Baz
)。你将获得一个清晰可读的字符串值,可以在一年内访问和评估,并且易于阅读的 SpEL 表达式。
此外,使用有效载荷(payload)进行路由决策被认为是非常不好的做法,因为有效载荷被视为特权数据——只有其最终接收者才能读取的数据。再次以邮件递送为例,你不会希望邮递员打开你的信封并阅读信件内容来做出一些递送决定。同样的概念在这里也适用,尤其是在生成消息时相对容易包含此类信息的情况下。这种做法强制执行了一定程度的纪律,涉及通过网络传输的数据设计,以及哪些数据可以被视为公开的,哪些是特权的。