跳到主要内容

事务性绑定器

DeepSeek V3 中英对照 Transactional Binder

通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 设置为非空值(例如 tx-)来启用事务。当在处理器应用程序中使用时,消费者将启动事务;在消费者线程上发送的任何记录都将参与同一个事务。当监听器正常退出时,监听器容器将将偏移量发送到事务并提交它。所有使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性配置的生产者绑定都使用一个通用的生产者工厂;忽略各个绑定的 Kafka 生产者属性。

important

事务不支持普通的绑定器重试(以及死信处理),因为重试将在原始事务中运行,而原始事务可能会回滚,任何已发布的记录也会回滚。当启用重试时(公共属性 maxAttempts 大于零),重试属性用于配置 DefaultAfterRollbackProcessor,以在容器级别启用重试。同样地,事务中不再发布死信记录,而是通过 DefaultAfterRollbackProcessor 将这一功能移至监听器容器中,该处理器在主事务回滚后运行。

如果您希望在源应用程序中使用事务,或者从某个任意线程中使用仅生产者事务(例如 @Scheduled 方法),您必须获取对事务性生产者工厂的引用,并使用它定义一个 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {

ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
java

请注意,我们使用 BinderFactory 获取对 binder 的引用;当只配置了一个 binder 时,第一个参数使用 null。如果配置了多个 binder,使用 binder 名称来获取引用。一旦我们获得了对 binder 的引用,就可以获取对 ProducerFactory 的引用并创建事务管理器。

然后你可以使用普通的 Spring 事务支持,例如 TransactionTemplate@Transactional,例如:

public static class Sender {

@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}

}
java

如果你希望将仅生产者的事务与其他事务管理器中的事务同步,可以使用 ChainedTransactionManager

important

如果你部署了多个应用程序实例,每个实例都需要一个唯一的 transactionIdPrefix