当前位置 : 主页 > 编程语言 > java >

分布式事务(二):基于可靠消息的分布式事务

来源:互联网 收集:自由互联 发布时间:2023-02-04
项目使用技术 springboot、dubbo、zookeeper、定时任务、消息中间件MQ 一、项目结构 maven父子工程: 父工程:consis 子工程:api-service、order、product、message api-service:该项目主要是提供接口调用

项目使用技术

springboot、dubbo、zookeeper、定时任务、消息中间件MQ

一、项目结构

maven父子工程:

父工程:consis

子工程:api-service、order、product、message

api-service:该项目主要是提供接口调用的,还包含实体类、枚举等一些通用内容

order:该项目是专门处理订单相关操作的系统

product:该项目是专门处理产品相关操作的系统

message:该项目是提供消息服务的系统,好包括定时任务

它们的依赖关系如下图:

enter image description here

根据上一篇的原理分别介绍下各个系统的实现

二、order订单系统

核心代码:

@Override @Transactional public void add(Orders order) { String messageBody = JSONObject.toJSONString( order ); //添加消息到数据库 String messageId = transactionMessageService.savePreparMessage(order.getMessageId(), messageBody, Constant.ORDER_QUEUE_NAME ); log.info(">>> 预发送消息,消息编号:{}", messageId); boolean flag = false; boolean success = false; try{ Orders orders = orderDao.saveAndFlush( order ); //int i = 1/0 ; log.info(">>> 插入订单,订单编号:{}", orders.getId()); flag = true; }catch (Exception e){ transactionMessageService.delete( messageId ); log.info(">>> 业务执行异常删除消息,消息编号:{}", messageId, e); throw new RuntimeException( ">>> 创建订单失败" ); }finally { if(flag){ try { transactionMessageService.confirmAndSend( messageId ); success = true; log.info(">>> 确认并且发送消息到实时消息中间件,消息编号:{}", messageId); }catch (Exception e){ log.error(">>> 消息确认异常,消息编号:{}", messageId, e); if(!success){ transactionMessageService.delete( messageId ); throw new RuntimeException( ">>> 确认消息异常,创建订单失败" ); } } } } }
  • 插入订单表之前,首先创建预发送消息,保存到事务消息表中,此时消息状态为:未发送
  • 插入订单,如果插入订单失败则将事务消息表中预发送消息删除
  • 插入订单成功后,修改消息表预发送消息状态为发送中,并发送消息至mq
  • 如果发送消息失败,则订单回滚并删除事务消息表消息

三、message消息系统

核心代码一:

@Override public void sendMessageToMessageQueue(String queueName,final String messageBody) { jmsTemplate.convertAndSend( queueName,messageBody ); log.info(">>> 发送消息到mq 队列:{},消息内容:{}", queueName, messageBody); }
  • 主要是activemq生产者讲消息发送至MQ消息中间件

核心代码二:

/** * 定时重发消息(每分钟) */ @Scheduled(cron = "0 */1 * * * ?") public void handler(){ //查询transaction_message表中已发送但未被删除的消息 List<TransactionMessage> list = transactionMessageService.queryRetryList( Constant.MESSAGE_UNDEAD, maxTimeOut, Constant.MESSAGE_SENDING ); if(list!=null && list.size() > 0){ for (TransactionMessage message:list){ try { transactionMessageService.retry( message.getMessageId() ); } catch (Exception e) { log.warn(">>> 消息不存在,可能已经被消费,消息编号:{}", message.getMessageId()); } } } } /** * 定时通知工作人员(每隔5分钟) */ @Scheduled(cron = "0 */5 * * * ?") public void advance(){ List<Long> messages = transactionMessageService.queryDeadList(); log.warn(">>> 共有:{}条消息需要人工处理", messages.size()); String ids = JSONObject.toJSONString( messages ); //发邮件或者是发送短信通知工作人员处理 }
  • 定时重发消息
  • 定时将死亡的消息通知给工作人员,进行人工补偿操作

四、product产品系统

核心代码:

@Transactional @JmsListener( destination = Constant.ORDER_QUEUE_NAME) public void receiveQueue(String msg){ boolean flag = false; Orders orders = JSONObject.parseObject( msg, Orders.class ); log.info(">>> 接收到mq消息队列,消息编号:{} ,消息内容:{}", orders.getMessageId(), msg); TransactionMessage transactionMessage = transactionMessageService.findByMessageId( orders.getMessageId() ); try { //保证幂等性 if(transactionMessage!=null){ List<OrderDetail> list = orders.getList(); for(OrderDetail detail : list){ Product product = productService.findById( detail.getId() ); Long skuNum = product.getProductSku() - detail.getNum(); if(skuNum >= 0){ product.setProductSku( skuNum ); productService.update( product ); }else { throw new Exception( ">>> 库存不足,修改库存失败!" ); } } //int i = 1 /0 ; flag = true; } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException( e ); }finally { if(flag){ transactionMessageService.delete( orders.getMessageId() ); DbLog dbLog = dbLogService.findByMesageId( orders.getMessageId() ); if(dbLog!=null){ dbLog.setState( "1" );//已处理成功 dbLogService.update( dbLog ); } log.info(">>> 业务执行成功删除消息! messageId:{}", orders.getMessageId()); } } }
  • 从mq消息中间件中监听并消费消息,将json消息转为订单对象
  • 根据消息编号查询该消息是否已被消费,保证幂等性
  • 如果消息未被消费(即存在此消息),则产品表扣减库存;如果已经消费(不存在此消息),则不做处理
  • 产品表扣减库存成功,则删除此消息,如果待处理消息日志表中有此消息,则更改状态为1,表示已处理;扣减失败,则不做处理

该项目源码已上传至github和码云,链接如下,希望喜欢的朋友都能给个star支持一下!谢谢~

github链接: https://github.com/wanglinyong/consis

码云链接: https://gitee.com/wanglinyong/consis

原文出处:

分布式事务(二):基于可靠消息的分布式事务

上一篇:Java 如何发送http get post请求5分钟解决
下一篇:没有了
网友评论