当前位置 : 主页 > 编程语言 > java >

Spring事务事件控制,解决业务异步操作

来源:互联网 收集:自由互联 发布时间:2023-02-04
背景 在业务中,经常会有这样的需求,在数据库事务提交之后,发送异步消息或者进行其他的事务操作。   例如当用户注册成功之后,发送激活码,如果用户注册后就执行发送激活码

背景

在业务中,经常会有这样的需求,在数据库事务提交之后,发送异步消息或者进行其他的事务操作。

 

例如当用户注册成功之后,发送激活码,如果用户注册后就执行发送激活码,但是在用户保存时出现提交事务异常,数据库进行回滚,用户实际没有注册成功,但是用户却收到了激活码,此时,正确的是应该在用户注册保存事务提交完成之后,然后发送激活码。

解决方案

1、使用注解@TransactionalEventListener

@Service @Transactional public class FooService { @Autowired private ApplicationEventPublisher applicationEventPublisher; public User saveUser(User user) { userDao.save(user); // 注册事件 applicationEventPublisher.publishEvent(new SavedUserEvent(user.getId())); } } // ------------------------------------- /** * 保存用户事件 */ public class SavedUserEvent { private int userId; public SavedUserEvent(int userId) { this.userId = userId; } // getter and setter } /** * 事件侦听,处理对应事件 */ @Component public class FooEventListener() { @Autowired private UserDao userDao; @Autowired private MailService mailService; @TransactionalEventListener public sendEmail(SavedUserEvent savedUserEvent) { User user = userDao.get(userId); String email = user.getEmail(); mailService.send(email); } }

publishEvent 底层调用了一个SimpleApplicationEventMulticaster 来发布事件,属性有一个Executor 可以用来设置异步的方式。

1.1 设置线程池

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.SimpleApplicationEventMulticaster; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class Config { @Bean public ThreadPoolTaskExecutor executor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(3);//核心线程数 executor.setMaxPoolSize(10);//最大线程数 executor.setQueueCapacity(100);//队列大小 return executor; } @Bean public SimpleApplicationEventMulticaster applicationEventMulticaster(ThreadPoolTaskExecutor executor){ SimpleApplicationEventMulticaster multicaste = new SimpleApplicationEventMulticaster(); multicaste.setTaskExecutor(executor); return multicaste; } }

1.2 发布事件后测试

  • 有2个线程来发布事件

2、使用TransactionSynchronizationManager 和 TransactionSynchronizationAdapter

@Autowired private UserDao userDao; @Autowired private JmsProducer jmsProducer; public User saveUser(User user) { // 保存用户 userDao.save(user); final int userId = user.getId(); // 兼容无论是否有事务 if(TransactionSynchronizationManager.isActualTransactionActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { jmsProducer.sendEmail(userId); } }); } else { jmsProducer.sendEmail(userId); } }

如果saveUser()和sendEmail()这两个方法使用了相同的事务,但是需要注意的是sendEmail()方法是在afterCommit事务提交之后执行的,此时会导致sendEmail()中的JPA数据保存最终无法提交。所以我们需要使sendEmail()进入一个新的事务中。

 

如果afterCommit()方法中执行的方法也包含事务,在该方法的@Transactional注解中使用propagation参数用来控制事务的传播,其默认被设置为Propagation.REQUIRED

2.1 解决办法

在@Transactional注解中propagation参数用来控制事务的传播。其默认被设置为Propagation.REQUIRED

 

Propagation.REQUIRED其逻辑是,如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。而上面的业务中我们并不希望其加入已有的事务中,所以单介绍上面的逻辑,假如希望JPA的数据保存到数据库中,需要在事务注解修改为@Transactional(propagation = Propagation.REQUIRES_NEW)参数

 

然而在很多时候我们希望新加入的方法能够被同一个事务所管理,而使用Propagation.REQUIRES_NEW会导致当前操作脱离上一级事务的控制。所以在使用@Transactional(propagation = Propagation.REQUIRES_NEW)的时候一定要慎重,并且严格控制其被滥用。

2.2 存在的问题

TransactionSynchronizationAdapter的事务提交afterCommit方法后执行,虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。

 

让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。

  • 1、构建注解
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface PostCommit { }
  • 2、构建 PostCommitAdapter

PostCommitAdapter 将提供两个功能

  • 它将注册runnables到一个ThreadLocal
  • 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables
@Slf4j @Component public class PostCommitAdapter extends TransactionSynchronizationAdapter { private static final ThreadLocal<List<Runnable>> RUNNABLE = new ThreadLocal<>(); // 为提交后执行注册一个新的 runnable public void execute(Runnable runnable) { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<Runnable> runnables = RUNNABLE.get(); if (runnables == null) { runnables = new ArrayList<>(); RUNNABLE.set(runnables); TransactionSynchronizationManager.registerSynchronization(this); } return; } // 如果事务同步未激活 runnable.run(); } @Override public void afterCommit() { List<Runnable> runnables = RUNNABLE.get(); runnables.forEach(Runnable::run); } @Override public void afterCompletion(int status) { RUNNABLE.remove(); } }

如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables。

  • 3、使用 around 建议连接适配器和注释 为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:
@Aspect @Slf4j @AllArgsConstructor @Configuration public class PostCommitAnnotationAspect { private final PostCommitAdapter postCommitAdapter; @Pointcut("@annotation(com...<package>..PostCommit)") private void postCommitPointCut(){} @Around("postCommitPointCut()") public Object aroundAdvice(final ProceedingJoinPoint pjp) { postCommitAdapter.execute(new PjpAfterCommitRunnable(pjp)); return null; } private static final class PjpAfterCommitRunnable implements Runnable { private final ProceedingJoinPoint pjp; public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) { this.pjp = pjp; } @Override public void run() { try { pjp.proceed(); } catch (Throwable e) { throw new RuntimeException(e); } } } }
  • 4、用法 一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。 示例:考虑具有 PostCommit 注释方法的两个类 A 和 B
Class A { @PostCommit void log(){ log.info("log from class A") } } Class B { @PostCommit void log(){ log.info("log from class B") } }

一个驱动类调用这些方法:

Class mainClass { @Transactional void transactionalMethod(Entity entity){ someOperation(entity) log.info("inside transaction"); a.log(); b.log(); save(entity); log.info("end of method"); } }

执行后输出:

> inside transaction > ** saving entity > log from class A > log from Class B

参考: https://blog.csdn.net/weixin_35973945/article/details/115067904

https://blog.csdn.net/qq330983778/article/details/112255441

https://www.cnblogs.com/mangoubiubiu/p/16366241.html

https://www.jdon.com/57371

上一篇:Spring 事务扩展机制 TransactionSynchronization
下一篇:没有了
网友评论