Spring Cloud Stream Schema 注册表
介绍
当组织采用基于消息的发布/订阅架构,并且多个生产者和消费者微服务相互通信时,通常需要所有这些微服务就基于模式的契约达成一致。当这种模式需要演变以适应新的业务需求时,现有组件仍需继续工作。Spring Cloud Stream 提供了对独立模式注册服务器的支持,应用程序可以注册并使用上述模式。Spring Cloud Stream 的模式注册支持还提供了基于 Avro 的模式注册客户端支持,这些客户端本质上提供了消息转换器,这些转换器与模式注册表通信,以便在消息转换期间协调模式。Spring Cloud Stream 提供的模式演变支持既适用于上述独立模式注册表,也适用于 Confluent 提供的专门与 Apache Kafka 配合使用的模式注册表。
Spring Cloud Stream Schema Registry 概述
Spring Cloud Stream Schema Registry 提供了对模式演化的支持,使得数据可以随时间演进,并且仍然能够与旧版或新版的生产者和消费者兼容,反之亦然。大多数序列化模型,特别是那些旨在实现跨不同平台和语言的可移植性的模型,都依赖于描述数据如何在二进制有效载荷中序列化的模式。为了序列化数据并随后解释它,发送和接收双方都必须能够访问描述二进制格式的模式。在某些情况下,模式可以从序列化时的有效载荷类型或反序列化时的目标类型推断出来。然而,许多应用程序受益于能够访问描述二进制数据格式的显式模式。模式注册表允许您以文本格式(通常是 JSON)存储模式信息,并使需要它以二进制格式接收和发送数据的各种应用程序能够访问该信息。模式可以作为由以下内容组成的元组进行引用:
-
主题是模式的逻辑名称
-
模式版本
-
模式格式,描述数据的二进制格式
Spring Cloud Stream Schema Registry 提供了以下组件
-
独立模式注册表服务器
默认情况下,它使用 H2 数据库,但通过提供适当的数据源配置,服务器可以与 PostgreSQL 或 MySQL 一起使用。
-
能够通过与模式注册表通信进行消息编组的模式注册表客户端
目前,客户端可以与独立模式注册表或 Confluent 模式注册表通信。
Schema Registry 客户端
用于与模式注册表服务器交互的客户端抽象是 SchemaRegistryClient
接口,其结构如下:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供了开箱即用的实现,用于与其自有的 schema 服务器以及 Confluent Schema Registry 进行交互。
可以通过使用 @EnableSchemaRegistryClient
来配置 Spring Cloud Stream 模式注册表的客户端,如下所示:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
默认的转换器不仅优化了远程服务器上的模式缓存,还优化了 parse()
和 toString()
方法,这些方法的开销相当大。因此,它使用了一个不缓存响应的 DefaultSchemaRegistryClient
。如果你打算改变默认行为,可以直接在代码中使用该客户端并覆盖它以达到预期效果。为此,你需要在应用程序属性中添加 spring.cloud.stream.schemaRegistryClient.cached=true
属性。
Schema Registry 客户端属性
Schema Registry 客户端支持以下属性:
spring.cloud.stream.schemaRegistryClient.endpoint
schema-server 的位置。设置时,请使用完整 URL,包括协议(http
或 https
)、端口和上下文路径。
默认
[localhost:8990/](http://localhost:8990/)
spring.cloud.stream.schemaRegistryClient.cached
这是一个 Spring Cloud Stream 的配置属性,用于控制 Schema Registry 客户端的缓存行为。具体来说,spring.cloud.stream.schemaRegistryClient.cached
属性决定了是否启用 Schema Registry 客户端的缓存功能。
- 如果设置为
true
,则启用缓存,客户端会缓存从 Schema Registry 获取的 schema,以减少重复请求的次数,提高性能。 - 如果设置为
false
,则禁用缓存,每次请求 schema 时都会直接从 Schema Registry 获取。
这个属性通常在需要频繁获取 schema 的场景下使用,以减少网络开销和提高响应速度。
客户端是否应缓存 schema 服务器的响应。通常设置为 false
,因为缓存发生在消息转换器中。使用 schema 注册表客户端的客户端应将其设置为 true
。
默认
false
Avro Schema Registry 客户端消息转换器
对于在应用程序上下文中注册了 SchemaRegistryClient
bean 的应用程序,Spring Cloud Stream 会自动配置一个 Apache Avro 消息转换器以进行模式管理。这简化了模式演进的过程,因为接收消息的应用程序可以轻松访问与自己的读取模式(reader schema)相匹配的写入模式(writer schema)。
对于出站消息,如果绑定的内容类型设置为 application/*+avro
,则会激活 MessageConverter
,如下例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站转换过程中,消息转换器会尝试推断每个出站消息的模式(基于其类型),并通过使用 SchemaRegistryClient
将其注册到一个主题(基于有效负载类型)。如果发现一个相同的模式,则检索其引用。如果没有找到,则注册该模式,并提供一个新版本号。消息通过以下方案发送,带有 contentType
头:application/[prefix].[subject].v[version]+avro
,其中 prefix
是可配置的,subject
是从有效负载类型推导出来的。
例如,一个类型为 User
的消息可能会以二进制负载的形式发送,其内容类型为 application/vnd.user.v2+avro
,其中 user
是主题,2
是版本号。
在接收消息时,转换器会从传入消息的头部推断出模式引用,并尝试检索它。该模式在反序列化过程中被用作写入模式。
Avro Schema Registry 消息转换器属性
如果你通过设置 spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
启用了基于 Avro 的模式注册客户端,你可以通过设置以下属性来自定义注册行为。
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
如果你希望转换器使用反射从 POJO 推断出 Schema,请启用此选项。
默认值:false
spring.cloud.stream.schema.avro.readerSchema
Avro 通过查看写入模式(原始有效负载)和读取模式(你的应用程序有效负载)来比较模式版本。有关更多信息,请参阅 Avro 文档。如果设置了此选项,它将覆盖模式服务器上的任何查找,并使用本地模式作为读取模式。默认值:null
spring.cloud.stream.schema.avro.schemaLocations
将此属性中列出的任何 .avsc
文件注册到 Schema Server。
默认值:empty
spring.cloud.stream.schema.avro.prefix
Content-Type
头中使用的前缀。
默认值:vnd
spring.cloud.stream.schema.avro.subjectNamingStrategy
确定用于在模式注册表中注册 Avro 模式的主题名称。有两种实现方式可供选择,org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
,其中主题是模式名称,以及 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它使用 Avro 模式的命名空间和名称返回完全限定的主题。可以通过实现 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
来创建自定义策略。
默认值:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
忽略任何模式注册表通信。这在测试时非常有用,因为当运行单元测试时,它不会不必要地尝试连接到模式注册表服务器。
默认值:false
Apache Avro 消息转换器
Spring Cloud Stream 通过其 spring-cloud-stream-schema-registry-client
模块提供了对基于 schema 的消息转换器的支持。目前,唯一默认支持的基于 schema 的消息转换器序列化格式是 Apache Avro,未来版本中将添加更多格式。
spring-cloud-stream-schema-registry-client
模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:
-
使用序列化或反序列化对象的类信息或启动时已知位置的模式的转换器。
-
使用模式注册表的转换器。 它们在运行时定位模式,并随着领域对象的演变动态注册新模式。
支持模式的转换器
AvroSchemaMessageConverter
支持通过使用预定义的模式或通过类中可用的模式信息(无论是反射方式还是包含在 SpecificRecord
中)来序列化和反序列化消息。如果你提供了自定义的转换器,则不会创建默认的 AvroSchemaMessageConverter
bean。以下示例展示了一个自定义转换器:
要使用自定义转换器,您可以简单地将其添加到应用程序上下文中,并可选地指定一个或多个与之关联的 MimeTypes
。默认的 MimeType
是 application/avro
。
如果转换的目标类型是 GenericRecord
,则必须设置一个 schema。
以下示例展示了如何在接收器应用程序中通过注册 Apache Avro 的 MessageConverter
来配置转换器,而无需预定义模式。在此示例中,请注意 mime 类型值为 avro/bytes
,而不是默认的 application/avro
。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
相反,以下应用程序注册了一个带有预定义模式(位于类路径上)的转换器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
Schema Registry 服务器
Spring Cloud Stream 提供了一个模式注册表服务器的实现。要使用它,你可以下载最新的 spring-cloud-stream-schema-registry-server
版本,并将其作为独立应用程序运行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
你可以将 schema registry 嵌入到你现有的 Spring Boot Web 应用程序中。为此,将 spring-cloud-stream-schema-registry-core
工件添加到你的项目中,并使用 @EnableSchemaRegistryServer
注解,该注解会将 schema registry 服务器的 REST 控制器添加到你的应用程序中。以下示例展示了一个启用 schema registry 的 Spring Boot 应用程序:
@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}
spring.cloud.stream.schema.server.path
属性可用于控制模式服务器的根路径(尤其是在嵌入到其他应用程序中时)。spring.cloud.stream.schema.server.allowSchemaDeletion
布尔属性允许删除模式。默认情况下,此功能是禁用的。
Schema Registry 服务器使用关系型数据库来存储 schema。默认情况下,它使用嵌入式数据库。你可以通过使用 Spring Boot SQL 数据库和 JDBC 配置选项 来自定义 schema 存储。
模式注册服务器 API
Schema Registry Server API 包含以下操作:
-
POST /
— 参见[注册新 Schema](#spring-cloud-stream-overview-registering-new-schema)
-
GET /{subject}/{format}/{version}
— 参见[通过主题、格式和版本检索现有 Schema](#spring-cloud-stream-overview-retrieve-schema-subject-format-version)
-
GET /{subject}/{format}
— 参见[通过主题和格式检索现有 Schema](#spring-cloud-stream-overview-retrieve-schema-subject-format)
-
GET /schemas/{id}
— 参见[通过 ID 检索现有 Schema](#spring-cloud-stream-overview-retrieve-schema-id)
-
DELETE /{subject}/{format}/{version}
— 参见[通过主题、格式和版本删除 Schema](#spring-cloud-stream-overview-deleting-schema-subject-format-version)
-
DELETE /schemas/{id}
— 参见[通过 ID 删除 Schema](#spring-cloud-stream-overview-deleting-schema-id)
-
DELETE /{subject}
— 参见[通过主题删除 Schema](#spring-cloud-stream-overview-deleting-schema-subject)
注册新 Schema
要注册一个新的模式,请向 /
端点发送一个 POST
请求。
/
接受一个包含以下字段的 JSON 负载:
-
subject
: 模式主题 -
format
: 模式格式 -
definition
: 模式定义
它的响应是一个 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过主题、格式和版本检索现有模式
要按主题、格式和版本检索现有模式,请向 {subject}/{format}/{version}
端点发送 GET
请求。
它的响应是一个 JSON 格式的 schema 对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过主题和格式检索现有模式
要按主题和格式检索现有模式,请向 /subject/format
端点发送一个 GET
请求。
它的响应是一个包含每个模式对象的 JSON 列表,每个模式对象包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过 ID 检索现有模式
要按其 ID 检索模式,请向 /schemas/{id}
端点发送 GET
请求。
它的响应是一个 JSON 格式的 schema 对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
按主题、格式和版本删除模式
要删除由其主题、格式和版本标识的模式,请向 {subject}/{format}/{version}
端点发送 DELETE
请求。
通过 ID 删除模式
要通过 ID 删除一个 schema,请向 /schemas/{id}
端点发送一个 DELETE
请求。
按主题删除模式
DELETE /{subject}
按主题删除现有模式。
本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。Spring Cloud Stream 1.1.0.RELEASE 使用表名 schema
来存储 Schema
对象。Schema
是许多数据库实现中的关键字。为了避免将来出现任何冲突,从 1.1.1.RELEASE 开始,我们选择了 SCHEMA_REPOSITORY
作为存储表的名称。任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户在升级前应将其现有的模式迁移到新表中。
使用 Confluent 的 Schema Registry
默认配置会创建一个 DefaultSchemaRegistryClient
的 bean。如果你想使用 Confluent 的 schema registry,你需要创建一个 ConfluentSchemaRegistryClient
类型的 bean,它会取代框架默认配置的 bean。以下示例展示了如何创建这样的 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient 是针对 Confluent 平台 4.0.0 版本进行测试的。
模式注册与解析
为了更好地理解 Spring Cloud Stream 如何注册和解析新的 schema 以及其使用 Avro schema 比较功能的方式,我们提供了两个单独的子章节:
-
[Schema 注册过程(序列化)](#spring-cloud-stream-overview-schema-registration-process)
-
[Schema 解析过程(反序列化)](#spring-cloud-stream-overview-schema-resolution-process)
模式注册流程(序列化)
注册过程的第一部分是从通过通道发送的有效负载中提取模式。像 SpecificRecord
或 GenericRecord
这样的 Avro 类型已经包含了一个模式,可以立即从实例中检索到。对于 POJO,如果 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
属性设置为 true
(默认值),则会推断出一个模式。
一旦获取到一个 schema,转换器就会从远程服务器加载其元数据(版本)。首先,它会查询本地缓存。如果没有找到结果,它会将数据提交到服务器,服务器会返回版本信息。转换器总是会将结果缓存起来,以避免对每个需要序列化的新消息都查询 Schema Server 的开销。
有了模式版本信息,转换器会将消息的 contentType
头设置为携带版本信息,例如:application/vnd.user.v1+avro
。
模式解析过程(反序列化)
当读取包含版本信息的消息时(即带有类似 [Schema Registration Process (Serialization)](#spring-cloud-stream-overview-schema-registration-process)
中描述的方案的 contentType
头信息),转换器会查询 Schema 服务器以获取消息的写入模式。一旦找到传入消息的正确模式,它会检索读取模式,并利用 Avro 的模式解析支持,将其读取到读取器定义中(设置默认值和任何缺失的属性)。
你应该理解写者模式(写入消息的应用程序)和读者模式(接收消息的应用程序)之间的区别。我们建议花点时间阅读 Avro 术语 并理解这一过程。Spring Cloud Stream 总是获取写者模式来确定如何读取消息。如果你想让 Avro 的模式演化支持正常工作,你需要确保为你的应用程序正确设置了 readerSchema
。