Spring Cloud Stream 集成
单个任务本身可能很有用,但将任务集成到一个更大的生态系统中,可以使其在更复杂的处理和编排中发挥作用。本节将介绍 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
从 Spring Cloud Stream 启动任务
你可以从流中启动任务。为此,创建一个接收器,监听包含 TaskLaunchRequest
作为其负载的消息。TaskLaunchRequest
包含以下内容:
-
uri
: 指向要执行的任务工件。 -
applicationName
: 与任务关联的名称。如果未设置applicationName
,TaskLaunchRequest
将生成一个由以下内容组成的任务名称:Task-<UUID>
。 -
commandLineArguments
: 包含任务命令行参数的列表。 -
environmentProperties
: 包含任务使用的环境变量的映射。 -
deploymentProperties
: 包含部署程序用于部署任务的属性的映射。
如果负载类型不同,接收器会抛出异常。
例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源接收数据并创建一个包含 TaskLaunchRequest
的 GenericMessage
,然后将消息发送到其输出通道。然后,任务接收器将从其输入通道接收消息并启动任务。
要创建一个 taskSink
,你只需要创建一个包含 EnableTaskLauncher
注解的 Spring Boot 应用程序,如下例所示:
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的 samples 模块 包含了一个示例的 Sink 和 Processor。要将这些示例安装到本地 Maven 仓库中,请在 spring-cloud-task-samples
目录下运行 Maven 构建,并将 skipInstall
属性设置为 false
,如下例所示:
mvn clean install
maven.remoteRepositories.springRepo.url
属性必须设置为包含 Spring Boot Uber-jar 的远程仓库的位置。如果未设置,则表示没有远程仓库,因此仅依赖本地仓库。
Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中创建一个流,首先必须注册我们创建的 Task Sink 应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示了如何从 Spring Cloud Data Flow shell 创建一个流:
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task 事件
Spring Cloud Task 提供了通过 Spring Cloud Stream 通道在任务运行时发出事件的能力。任务监听器用于在名为 task-events
的消息通道上发布 TaskExecution
。此功能会自动注入到任何在其类路径上具有 spring-cloud-stream
、spring-cloud-stream-<binder>
和定义任务的类中。
要禁用事件发射监听器,请将 spring.cloud.task.events.enabled
属性设置为 false
。
在定义了适当的类路径后,以下任务会在 task-events
通道上发出 TaskExecution
事件(在任务的开始和结束时都会发出):
@Bean
public IntegrationFlow taskFlow() {
return IntegrationFlows.from("task-events")
.handle(System.out::println)
.get();
}
@Bean
public Task task() {
return new Task() {
@Override
public void execute(TaskExecution taskExecution) {
System.out.println("Executing task: " + taskExecution.getTaskName());
}
};
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
@Bean
public TaskScheduler taskScheduler() {
return new ConcurrentTaskScheduler();
}
@Bean
public TaskExecutionListener taskExecutionListener() {
return new TaskExecutionListener() {
@Override
public void onStart(TaskExecution taskExecution) {
System.out.println("Task started: " + taskExecution.getTaskName());
}
@Override
public void onEnd(TaskExecution taskExecution) {
System.out.println("Task ended: " + taskExecution.getTaskName());
}
};
}
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
还需要在类路径上提供一个 binder 实现。
一个示例任务事件应用程序可以在 Spring Cloud Task 项目的示例模块中找到,点击这里查看。
禁用特定任务事件
要禁用任务事件,可以将 spring.cloud.task.events.enabled
属性设置为 false
。
Spring Batch 事件
当通过任务执行 Spring Batch 作业时,可以配置 Spring Cloud Task 以根据 Spring Batch 中可用的 Spring Batch 监听器发出信息消息。具体来说,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在通过 Spring Cloud Task 运行时在相关的 Spring Cloud Stream 通道上发出消息:
-
JobExecutionListener
监听job-execution-events
-
StepExecutionListener
监听step-execution-events
-
ChunkListener
监听chunk-events
-
ItemReadListener
监听item-read-events
-
ItemProcessListener
监听item-process-events
-
ItemWriteListener
监听item-write-events
-
SkipListener
监听skip-events
当上下文中存在适当的 bean(一个 Job
和一个 TaskLifecycleListener
)时,这些监听器会自动配置到任何 AbstractJob
中。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source
,而监听应用程序则充当 Processor
或 Sink
。
一个例子可以是让一个应用程序监听 job-execution-events
通道,以获取作业的启动和停止事件。要配置监听应用程序,你可以将输入配置为 job-execution-events
,如下所示:
spring.cloud.stream.bindings.input.destination=job-execution-events
还需要在类路径上有一个 binder 实现。
一个示例的批量事件应用程序可以在 Spring Cloud Task 项目的示例模块中找到,点击这里查看。
向不同频道发送批量事件
Spring Cloud Task 为批处理事件提供的一个选项是能够更改特定监听器可以发送消息的通道。为此,可以使用以下配置:spring.cloud.stream.bindings.<通道名称>.destination=<新目标>
。例如,如果 StepExecutionListener
需要将其消息发送到名为 my-step-execution-events
的通道,而不是默认的 step-execution-events
,你可以添加以下配置:
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
禁用批量事件
要禁用所有批处理事件的监听器功能,请使用以下配置:
spring.cloud.task.batch.events.enabled=false
要禁用特定的批处理事件,请使用以下配置:
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表显示了你可以禁用的各个监听器:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批量事件的发射顺序
默认情况下,批处理事件的 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,更改为 5),请使用以下配置:
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5