单步批处理作业启动器
本节将介绍如何使用 Spring Cloud Task 中包含的启动器来开发一个具有单个 Step
的 Spring Batch Job
。该启动器允许你通过配置来定义一个 ItemReader
、一个 ItemWriter
,或者一个完整的单步骤 Spring Batch Job
。有关 Spring Batch 及其功能的更多信息,请参阅 Spring Batch 文档。
要获取 Maven 的启动器,请将以下内容添加到你的构建中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
要获取 Gradle 的启动器,请将以下内容添加到你的构建文件中:
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
定义任务
你可以使用启动器来定义少至一个 ItemReader
或 ItemWriter
,或者多至一个完整的 Job
。在本节中,我们将定义配置一个 Job
所需定义的必要属性。
属性
首先,starter 提供了一组属性,允许您配置一个包含单个 Step 的 Job 的基本设置:
表 1. 作业属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.jobName | String | null | 任务的名称。 |
spring.batch.job.stepName | String | null | 步骤的名称。 |
spring.batch.job.chunkSize | Integer | null | 每个事务中要处理的项数。 |
配置了上述属性后,您就拥有了一个基于块(chunk)的步骤的作业。这个基于块的步骤读取、处理和写入 Map<String, Object>
实例作为项。然而,这个步骤目前还没有做任何事情。您需要配置一个 ItemReader
、一个可选的 ItemProcessor
和一个 ItemWriter
来让它执行一些操作。要配置这些组件,您可以使用属性并配置其中一个提供了自动配置的选项,或者您可以使用标准的 Spring 配置机制来配置自己的组件。
如果你配置自己的实现,输入和输出的类型必须与步骤中的其他实现匹配。此启动器中的 ItemReader
实现和 ItemWriter
实现都使用 Map<String, Object>
作为输入和输出项。
ItemReader 实现的自动配置
这个启动器为四种不同的 ItemReader
实现提供了自动配置:AmqpItemReader
、FlatFileItemReader
、JdbcCursorItemReader
和 KafkaItemReader
。在本节中,我们将概述如何使用提供的自动配置来配置这些实现。
AmqpItemReader
你可以使用 AmqpItemReader
从 AMQP 的队列或主题中读取数据。这个 ItemReader
实现的自动配置依赖于两组配置。第一组是 AmqpTemplate
的配置。你可以自己配置它,或者使用 Spring Boot 提供的自动配置。请参阅 Spring Boot AMQP 文档。一旦你配置好了 AmqpTemplate
,你可以通过设置以下属性来启用批处理功能以支持它:
表 2. AmqpItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.amqpitemreader.enabled | boolean | false | 如果为 true ,则自动配置将执行。 |
spring.batch.job.amqpitemreader.jsonConverterEnabled | boolean | true | 指示是否应注册 Jackson2JsonMessageConverter 以解析消息。 |
更多信息,请参阅 AmqpItemReader 文档。
FlatFileItemReader
FlatFileItemReader
允许您从平面文件(如 CSV 和其他文件格式)中读取数据。要从文件中读取数据,您可以通过普通的 Spring 配置提供一些组件(如 LineTokenizer
、RecordSeparatorPolicy
、FieldSetMapper
、LineMapper
或 SkippedLinesCallback
)。您还可以使用以下属性来配置读取器:
表 3. FlatFileItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.flatfileitemreader.saveState | boolean | true | 确定是否应在重启时保存状态。 |
spring.batch.job.flatfileitemreader.name | String | null | 用于在 ExecutionContext 中提供唯一键的名称。 |
spring.batch.job.flatfileitemreader.maxItemcount | int | Integer.MAX_VALUE | 从文件中读取的最大项目数。 |
spring.batch.job.flatfileitemreader.currentItemCount | int | 0 | 已读取的项目数。用于重启时。 |
spring.batch.job.flatfileitemreader.comments | List<String> | 空列表 | 指示文件中注释行(要忽略的行)的字符串列表。 |
spring.batch.job.flatfileitemreader.resource | Resource | null | 要读取的资源。 |
spring.batch.job.flatfileitemreader.strict | boolean | true | 如果设置为 true ,当资源未找到时,读取器会抛出异常。 |
spring.batch.job.flatfileitemreader.encoding | String | FlatFileItemReader.DEFAULT_CHARSET | 读取文件时使用的编码。 |
spring.batch.job.flatfileitemreader.linesToSkip | int | 0 | 指示在文件开头跳过的行数。 |
spring.batch.job.flatfileitemreader.delimited | boolean | false | 指示文件是否为分隔文件(CSV 和其他格式)。此属性与 spring.batch.job.flatfileitemreader.fixedLength 只能有一个同时为 true 。 |
spring.batch.job.flatfileitemreader.delimiter | String | DelimitedLineTokenizer.DELIMITER_COMMA | 如果读取分隔文件,指示要解析的分隔符。 |
spring.batch.job.flatfileitemreader.quoteCharacter | char | DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER | 用于确定引用值的字符。 |
spring.batch.job.flatfileitemreader.includedFields | List<Integer> | 空列表 | 确定记录中哪些字段应包含在项目中的索引列表。 |
spring.batch.job.flatfileitemreader.fixedLength | boolean | false | 指示是否通过列号解析文件的记录。此属性与 spring.batch.job.flatfileitemreader.delimited 只能有一个同时为 true 。 |
spring.batch.job.flatfileitemreader.ranges | List<Range> | 空列表 | 用于解析固定宽度记录的列范围列表。参见 Range 文档。 |
spring.batch.job.flatfileitemreader.names | String [] | null | 从记录中解析的每个字段的名称列表。这些名称是从此 ItemReader 返回的项目中 Map<String, Object> 的键。 |
spring.batch.job.flatfileitemreader.parsingStrict | boolean | true | 如果设置为 true ,当字段无法映射时,映射将失败。 |
JdbcCursorItemReader
JdbcCursorItemReader
针对关系型数据库运行查询,并遍历结果游标(ResultSet
)以提供结果项。此自动配置允许你提供 PreparedStatementSetter
、RowMapper
或两者。你还可以使用以下属性来配置 JdbcCursorItemReader
:
表 4. JdbcCursorItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.jdbccursoritemreader.saveState | boolean | true | 确定是否应为重启保存状态。 |
spring.batch.job.jdbccursoritemreader.name | String | null | 用于在 ExecutionContext 中提供唯一键的名称。 |
spring.batch.job.jdbccursoritemreader.maxItemcount | int | Integer.MAX_VALUE | 从文件中读取的最大项目数。 |
spring.batch.job.jdbccursoritemreader.currentItemCount | int | 0 | 已读取的项目数。用于重启时。 |
spring.batch.job.jdbccursoritemreader.fetchSize | int | 向驱动程序提示每次调用数据库系统时检索多少条记录。为了获得最佳性能,通常希望将其设置为与块大小匹配。 | |
spring.batch.job.jdbccursoritemreader.maxRows | int | 从数据库中读取的最大项目数。 | |
spring.batch.job.jdbccursoritemreader.queryTimeout | int | 查询超时的毫秒数。 | |
spring.batch.job.jdbccursoritemreader.ignoreWarnings | boolean | true | 确定读取器在处理时是否应忽略 SQL 警告。 |
spring.batch.job.jdbccursoritemreader.verifyCursorPosition | boolean | true | 指示每次读取后是否应验证游标的位置,以验证 RowMapper 是否没有推进游标。 |
spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute | boolean | false | 指示驱动程序是否支持游标的绝对定位。 |
spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection | boolean | false | 指示连接是否与其他处理共享(因此是事务的一部分)。 |
spring.batch.job.jdbccursoritemreader.sql | String | null | 从中读取的 SQL 查询。 |
你也可以通过以下属性专门为读取器指定 JDBC 数据源:.JdbcCursorItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.jdbccursoritemreader.datasource.enable | boolean | false | 决定是否启用 JdbcCursorItemReader 的 DataSource 。 |
jdbccursoritemreader.datasource.url | String | null | 数据库的 JDBC URL。 |
jdbccursoritemreader.datasource.username | String | null | 数据库的登录用户名。 |
jdbccursoritemreader.datasource.password | String | null | 数据库的登录密码。 |
jdbccursoritemreader.datasource.driver-class-name | String | null | JDBC 驱动程序的完全限定名称。 |
如果未指定 jdbccursoritemreader_datasource
,JDBCCursorItemReader
将使用默认的 DataSource
。
KafkaItemReader
从 Kafka 主题中摄取数据分区是非常有用的,而这正是 KafkaItemReader
能够实现的功能。要配置 KafkaItemReader
,需要两个配置步骤。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(参见 Spring Boot Kafka 文档)。一旦你通过 Spring Boot 配置了 Kafka 属性,就可以通过设置以下属性来配置 KafkaItemReader
本身:
表 5. KafkaItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.kafkaitemreader.name | String | null | 用于在 ExecutionContext 中提供唯一键的名称。 |
spring.batch.job.kafkaitemreader.topic | String | null | 从中读取数据的主题名称。 |
spring.batch.job.kafkaitemreader.partitions | List<Integer> | 空列表 | 从中读取数据的分区索引列表。 |
spring.batch.job.kafkaitemreader.pollTimeOutInSeconds | long | 30 | poll() 操作的超时时间。 |
spring.batch.job.kafkaitemreader.saveState | boolean | true | 确定是否应为重启保存状态。 |
请参阅 KafkaItemReader 文档。
原生编译
单步批处理的优势在于,当你在 JVM 上运行时,它允许你动态选择要使用的读取器和写入器 Bean。然而,当你使用原生编译时,你必须在构建时而不是运行时确定读取器和写入器。以下示例展示了如何实现这一点:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
ItemProcessor 配置
单步批处理作业的自动配置会接受 ApplicationContext
中可用的 ItemProcessor
。如果找到了正确类型的 ItemProcessor
(即 ItemProcessor<Map<String, Object>, Map<String, Object>>
),它会自动装配到步骤中。
ItemWriter 实现的自动配置
这个入门指南为与支持的 ItemReader
实现相匹配的 ItemWriter
实现提供了自动配置:AmqpItemWriter
、FlatFileItemWriter
、JdbcItemWriter
和 KafkaItemWriter
。本节将介绍如何使用自动配置来配置一个受支持的 ItemWriter
。
AmqpItemWriter
要向 RabbitMQ 队列写入数据,您需要两组配置。首先,您需要一个 AmqpTemplate
。获取它的最简单方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅 Spring Boot AMQP 文档。
在配置好 AmqpTemplate
之后,您可以通过设置以下属性来配置 AmqpItemWriter
:
表 6. AmqpItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.amqpitemwriter.enabled | boolean | false | 如果为 true ,则自动配置运行。 |
spring.batch.job.amqpitemwriter.jsonConverterEnabled | boolean | true | 指示是否应注册 Jackson2JsonMessageConverter 以转换消息。 |
FlatFileItemWriter
要将文件作为步骤的输出写入,你可以配置 FlatFileItemWriter
。自动配置接受已显式配置的组件(例如 LineAggregator
、FieldExtractor
、FlatFileHeaderCallback
或 FlatFileFooterCallback
)以及通过设置以下指定属性配置的组件:
表 7. FlatFileItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.flatfileitemwriter.resource | Resource | null | 要读取的资源。 |
spring.batch.job.flatfileitemwriter.delimited | boolean | false | 指示输出文件是否为分隔文件。如果为 true ,则 spring.batch.job.flatfileitemwriter.formatted 必须为 false 。 |
spring.batch.job.flatfileitemwriter.formatted | boolean | false | 指示输出文件是否为格式化文件。如果为 true ,则 spring.batch.job.flatfileitemwriter.delimited 必须为 false 。 |
spring.batch.job.flatfileitemwriter.format | String | null | 用于生成格式化文件输出的格式。格式化是通过 String.format 执行的。 |
spring.batch.job.flatfileitemwriter.locale | Locale | Locale.getDefault() | 生成文件时要使用的 Locale 。 |
spring.batch.job.flatfileitemwriter.maximumLength | int | 0 | 记录的最大长度。如果为 0,则长度无限制。 |
spring.batch.job.flatfileitemwriter.minimumLength | int | 0 | 记录的最小长度。 |
spring.batch.job.flatfileitemwriter.delimiter | String | , | 用于在分隔文件中分隔字段的 String 。 |
spring.batch.job.flatfileitemwriter.encoding | String | FlatFileItemReader.DEFAULT_CHARSET | 写入文件时使用的编码。 |
spring.batch.job.flatfileitemwriter.forceSync | boolean | false | 指示在刷新时是否应将文件强制同步到磁盘。 |
spring.batch.job.flatfileitemwriter.names | String [] | null | 从记录中解析的每个字段的名称列表。这些名称是此 ItemWriter 接收的项目的 Map<String, Object> 中的键。 |
spring.batch.job.flatfileitemwriter.append | boolean | false | 指示如果找到输出文件,是否应追加到文件中。 |
spring.batch.job.flatfileitemwriter.lineSeparator | String | FlatFileItemWriter.DEFAULT_LINE_SEPARATOR | 用于在输出文件中分隔行的 String 。 |
spring.batch.job.flatfileitemwriter.name | String | null | 用于在 ExecutionContext 中提供唯一键的名称。 |
spring.batch.job.flatfileitemwriter.saveState | boolean | true | 确定是否应为重启保存状态。 |
spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty | boolean | false | 如果设置为 true ,则在作业完成时删除空文件(没有输出)。 |
spring.batch.job.flatfileitemwriter.shouldDeleteIfExists | boolean | true | 如果设置为 true 并且在输出文件应存在的位置找到文件,则在步骤开始之前删除该文件。 |
spring.batch.job.flatfileitemwriter.transactional | boolean | FlatFileItemWriter.DEFAULT_TRANSACTIONAL | 指示读取器是否为事务性队列(指示在失败时将读取的项目返回到队列)。 |
JdbcBatchItemWriter
为了将步骤的输出写入关系型数据库,这个入门工具提供了自动配置 JdbcBatchItemWriter
的能力。自动配置允许你通过设置以下属性来提供自己的 ItemPreparedStatementSetter
或 ItemSqlParameterSourceProvider
以及配置选项:
表 8. JdbcBatchItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.jdbcbatchitemwriter.name | String | null | 用于在 ExecutionContext 中提供唯一键的名称。 |
spring.batch.job.jdbcbatchitemwriter.sql | String | null | 用于插入每个项的 SQL。 |
spring.batch.job.jdbcbatchitemwriter.assertUpdates | boolean | true | 是否验证每次插入至少更新一条记录。 |
你也可以通过以下属性为 writer 专门指定 JDBC 数据源:.JdbcBatchItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.jdbcbatchitemwriter.datasource.enable | boolean | false | 确定是否启用 JdbcCursorItemReader 的 DataSource 。 |
jdbcbatchitemwriter.datasource.url | String | null | 数据库的 JDBC URL。 |
jdbcbatchitemwriter.datasource.username | String | null | 数据库的登录用户名。 |
jdbcbatchitemwriter.datasource.password | String | null | 数据库的登录密码。 |
jdbcbatchitemreader.datasource.driver-class-name | String | null | JDBC 驱动程序的完全限定名称。 |
如果未指定 jdbcbatchitemwriter_datasource
,则 JdbcBatchItemWriter
将使用默认的 DataSource
。
KafkaItemWriter
要将步骤输出写入 Kafka 主题,你需要使用 KafkaItemWriter
。该启动器通过使用两个地方的设施为 KafkaItemWriter
提供了自动配置。第一个是 Spring Boot 的 Kafka 自动配置。(请参阅 Spring Boot Kafka 文档。)其次,该启动器允许你在写入器上配置两个属性。
表 9. KafkaItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
spring.batch.job.kafkaitemwriter.topic | String | null | 要写入的 Kafka 主题。 |
spring.batch.job.kafkaitemwriter.delete | boolean | false | 传递给写入器的所有项是否都应作为删除事件发送到主题。 |
有关 KafkaItemWriter
的更多配置选项,请参阅 KafkaItemWriter 文档。
Spring AOT
在使用 Spring AOT 与 Single Step Batch Starter 时,你必须在编译时设置 reader 和 writer 的名称属性(除非你为 reader 和/或 writer 创建了 bean)。为此,你需要在 boot maven 插件或 gradle 插件中包含你希望使用的 reader 和 writer 的名称作为参数或环境变量。例如,如果你希望在 Maven 中启用 FlatFileItemReader
和 FlatFileItemWriter
,它看起来会像这样:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>