跳到主要内容

Spring Cloud Stream 集成

DeepSeek V3 中英对照 Spring Cloud Stream Integration

单个任务本身可能很有用,但将任务集成到一个更大的生态系统中,可以使其在更复杂的处理和编排中发挥作用。本节将介绍 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。

从 Spring Cloud Stream 启动任务

你可以从流中启动任务。为此,创建一个接收器,监听包含 TaskLaunchRequest 作为其负载的消息。TaskLaunchRequest 包含以下内容:

  • uri: 指向要执行的任务工件。

  • applicationName: 与任务关联的名称。如果未设置 applicationNameTaskLaunchRequest 将生成一个由以下内容组成的任务名称:Task-<UUID>

  • commandLineArguments: 包含任务命令行参数的列表。

  • environmentProperties: 包含任务使用的环境变量的映射。

  • deploymentProperties: 包含部署程序用于部署任务的属性的映射。

备注

如果负载类型不同,接收器会抛出异常。

例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源接收数据并创建一个包含 TaskLaunchRequestGenericMessage,然后将消息发送到其输出通道。然后,任务接收器将从其输入通道接收消息并启动任务。

要创建一个 taskSink,你只需要创建一个包含 EnableTaskLauncher 注解的 Spring Boot 应用程序,如下例所示:

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
java

Spring Cloud Task 项目的 samples 模块 包含了一个示例的 Sink 和 Processor。要将这些示例安装到本地 Maven 仓库中,请在 spring-cloud-task-samples 目录下运行 Maven 构建,并将 skipInstall 属性设置为 false,如下例所示:

mvn clean install

备注

maven.remoteRepositories.springRepo.url 属性必须设置为包含 Spring Boot Uber-jar 的远程仓库的位置。如果未设置,则表示没有远程仓库,因此仅依赖本地仓库。

Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中创建一个流,首先必须注册我们创建的 Task Sink 应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
bash

以下示例展示了如何从 Spring Cloud Data Flow shell 创建一个流:

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
bash

Spring Cloud Task 事件

Spring Cloud Task 提供了通过 Spring Cloud Stream 通道在任务运行时发出事件的能力。任务监听器用于在名为 task-events 的消息通道上发布 TaskExecution。此功能会自动注入到任何在其类路径上具有 spring-cloud-streamspring-cloud-stream-<binder> 和定义任务的类中。

备注

要禁用事件发射监听器,请将 spring.cloud.task.events.enabled 属性设置为 false

在定义了适当的类路径后,以下任务会在 task-events 通道上发出 TaskExecution 事件(在任务的开始和结束时都会发出):

@Bean
public IntegrationFlow taskFlow() {
return IntegrationFlows.from("task-events")
.handle(System.out::println)
.get();
}
java
@Bean
public Task task() {
return new Task() {
@Override
public void execute(TaskExecution taskExecution) {
System.out.println("Executing task: " + taskExecution.getTaskName());
}
};
}
java
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
java
@Bean
public TaskScheduler taskScheduler() {
return new ConcurrentTaskScheduler();
}
java
@Bean
public TaskExecutionListener taskExecutionListener() {
return new TaskExecutionListener() {
@Override
public void onStart(TaskExecution taskExecution) {
System.out.println("Task started: " + taskExecution.getTaskName());
}

@Override
public void onEnd(TaskExecution taskExecution) {
System.out.println("Task ended: " + taskExecution.getTaskName());
}
};
}
java
@SpringBootApplication
public class TaskEventsApplication {

public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}

@Configuration
public static class TaskConfiguration {

@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
java
备注

还需要在类路径上提供一个 binder 实现。

备注

一个示例任务事件应用程序可以在 Spring Cloud Task 项目的示例模块中找到,点击这里查看。

禁用特定任务事件

要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

Spring Batch 事件

当通过任务执行 Spring Batch 作业时,可以配置 Spring Cloud Task 以根据 Spring Batch 中可用的 Spring Batch 监听器发出信息消息。具体来说,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在通过 Spring Cloud Task 运行时在相关的 Spring Cloud Stream 通道上发出消息:

  • JobExecutionListener 监听 job-execution-events

  • StepExecutionListener 监听 step-execution-events

  • ChunkListener 监听 chunk-events

  • ItemReadListener 监听 item-read-events

  • ItemProcessListener 监听 item-process-events

  • ItemWriteListener 监听 item-write-events

  • SkipListener 监听 skip-events

当上下文中存在适当的 bean(一个 Job 和一个 TaskLifecycleListener)时,这些监听器会自动配置到任何 AbstractJob 中。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source,而监听应用程序则充当 ProcessorSink

一个例子可以是让一个应用程序监听 job-execution-events 通道,以获取作业的启动和停止事件。要配置监听应用程序,你可以将输入配置为 job-execution-events,如下所示:

spring.cloud.stream.bindings.input.destination=job-execution-events

备注

还需要在类路径上有一个 binder 实现。

备注

一个示例的批量事件应用程序可以在 Spring Cloud Task 项目的示例模块中找到,点击这里查看。

向不同频道发送批量事件

Spring Cloud Task 为批处理事件提供的一个选项是能够更改特定监听器可以发送消息的通道。为此,可以使用以下配置:spring.cloud.stream.bindings.<通道名称>.destination=<新目标>。例如,如果 StepExecutionListener 需要将其消息发送到名为 my-step-execution-events 的通道,而不是默认的 step-execution-events,你可以添加以下配置:

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

禁用批量事件

要禁用所有批处理事件的监听器功能,请使用以下配置:

spring.cloud.task.batch.events.enabled=false

要禁用特定的批处理事件,请使用以下配置:

spring.cloud.task.batch.events.<batch event listener>.enabled=false:
spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了你可以禁用的各个监听器:

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
bash

批量事件的发射顺序

默认情况下,批处理事件的 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改为 5),请使用以下配置:

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
bash