批处理
本节将详细介绍 Spring Cloud Task 与 Spring Batch 的集成。内容包括跟踪作业执行与执行任务之间的关联,以及通过 Spring Cloud Deployer 实现的远程分区处理。
将作业执行与执行它的任务关联
Spring Boot 提供了在 Spring Boot Uber-jar 中执行批处理作业的功能。Spring Boot 对此功能的支持使得开发者可以在该执行中运行多个批处理作业。Spring Cloud Task 提供了将作业的执行(作业执行)与任务的执行关联起来的能力,以便可以相互追溯。
Spring Cloud Task 通过使用 TaskBatchExecutionListener
来实现这一功能。默认情况下,在任何上下文中,只要配置了 Spring Batch Job(通过在上下文中定义了一个类型为 Job
的 bean)并且 classpath 上有 spring-cloud-task-batch
jar 包,该监听器就会被自动配置。该监听器会被注入到所有满足这些条件的作业中。
重写 TaskBatchExecutionListener
为了防止监听器被注入到当前上下文中的任何批处理作业中,你可以使用标准的 Spring Boot 机制来禁用自动配置。
为了仅在上下文中的特定作业中注入监听器,请重写 batchTaskExecutionListenerBeanPostProcessor
并提供作业 Bean ID 的列表,如下例所示:
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
你可以在 Spring Cloud Task 项目的 samples 模块中找到一个批处理应用程序的示例,点击这里查看。
远程分区
Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
将工作步骤执行的启动委托给 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,您需要提供一个表示要执行的 Spring Boot Uber-jar 的 Resource
、一个 TaskLauncherHandler
和一个 JobExplorer
。您可以配置任何环境属性,以及同时执行的最大工作线程数、轮询结果的间隔(默认为 10 秒)和超时时间(默认为 -1,即无超时)。以下示例展示了如何配置这个 PartitionHandler
:
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
当向分区传递环境变量时,每个分区可能位于不同的机器上,具有不同的环境设置。因此,您应该只传递那些必需的环境变量。
请注意,在上面的示例中,我们将最大工作线程数设置为 2。设置最大工作线程数可以确定一次应运行的最大分区数。
要执行的 Resource
应是一个 Spring Boot Uber-jar,并且在当前上下文中配置了一个 DeployerStepExecutionHandler
作为 CommandLineRunner
。前面示例中列举的仓库应该是存储 Spring Boot Uber-jar 的远程仓库。管理器和工作者都需要能够访问用作作业仓库和任务仓库的同一数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler
,步骤处理器将执行请求的 Step
。以下示例展示了如何配置 DeployerStepExecutionHandler
:
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
你可以在 Spring Cloud Task 项目的示例模块中找到远程分区应用程序的示例,点击这里查看。
异步启动远程批处理分区
默认情况下,批量分区是按顺序启动的。然而,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中配置一个 pod)被配置完成。在这些情况下,你可以向 DeployerPartitionHandler
提供一个 ThreadPoolTaskExecutor
。这将根据 ThreadPoolTaskExecutor
的配置来启动远程批量分区。例如:
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
我们需要关闭上下文,因为使用 ThreadPoolTaskExecutor
会留下一个活跃的线程,从而导致应用程序无法终止。为了正确关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled
属性设置为 true
。
关于为 Kubernetes 平台开发批处理分区应用的注意事项
-
在 Kubernetes 平台上部署分区应用时,必须使用以下依赖项来支持 Spring Cloud Kubernetes Deployer:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
</dependency> -
任务应用及其分区的应用名称需要遵循以下正则表达式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否则,将抛出异常。
批量信息消息
Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch 事件”部分详细介绍了这一功能。
批处理作业退出代码
正如之前所讨论的,Spring Cloud Task 应用程序支持记录任务执行的退出代码。然而,当你在任务中运行一个 Spring Batch Job 时,无论 Batch Job Execution 如何完成,使用默认的 Batch/Boot 行为时,任务的结果始终为零。请记住,任务是一个 boot 应用程序,任务返回的退出代码与 boot 应用程序相同。要覆盖此行为并允许任务在批处理作业返回 FAILED
的 BatchStatus 时返回非零退出代码,请将 spring.cloud.task.batch.fail-on-job-failure
设置为 true
。然后退出代码可以是 1(默认值)或基于 指定的 ExitCodeGenerator。
此功能使用了一个新的 ApplicationRunner
,它替代了 Spring Boot 提供的那个。默认情况下,它的配置顺序是相同的。然而,如果你想自定义 ApplicationRunner
的运行顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder
属性来调整其顺序。为了根据批处理作业的执行结果返回退出码,你需要编写自己的 CommandLineRunner
。