项目使用技术 springboot、dubbo、zookeeper、定时任务、消息中间件MQ 一、项目结构 maven父子工程: 父工程:consis 子工程:api-service、order、product、message api-service:该项目主要是提供接口调用
项目使用技术
springboot、dubbo、zookeeper、定时任务、消息中间件MQ
一、项目结构
maven父子工程:
父工程:consis
子工程:api-service、order、product、message
api-service:该项目主要是提供接口调用的,还包含实体类、枚举等一些通用内容
order:该项目是专门处理订单相关操作的系统
product:该项目是专门处理产品相关操作的系统
message:该项目是提供消息服务的系统,好包括定时任务
它们的依赖关系如下图:
根据上一篇的原理分别介绍下各个系统的实现
二、order订单系统
核心代码:
@Override @Transactional public void add(Orders order) { String messageBody = JSONObject.toJSONString( order ); //添加消息到数据库 String messageId = transactionMessageService.savePreparMessage(order.getMessageId(), messageBody, Constant.ORDER_QUEUE_NAME ); log.info(">>> 预发送消息,消息编号:{}", messageId); boolean flag = false; boolean success = false; try{ Orders orders = orderDao.saveAndFlush( order ); //int i = 1/0 ; log.info(">>> 插入订单,订单编号:{}", orders.getId()); flag = true; }catch (Exception e){ transactionMessageService.delete( messageId ); log.info(">>> 业务执行异常删除消息,消息编号:{}", messageId, e); throw new RuntimeException( ">>> 创建订单失败" ); }finally { if(flag){ try { transactionMessageService.confirmAndSend( messageId ); success = true; log.info(">>> 确认并且发送消息到实时消息中间件,消息编号:{}", messageId); }catch (Exception e){ log.error(">>> 消息确认异常,消息编号:{}", messageId, e); if(!success){ transactionMessageService.delete( messageId ); throw new RuntimeException( ">>> 确认消息异常,创建订单失败" ); } } } } }- 插入订单表之前,首先创建预发送消息,保存到事务消息表中,此时消息状态为:未发送
- 插入订单,如果插入订单失败则将事务消息表中预发送消息删除
- 插入订单成功后,修改消息表预发送消息状态为发送中,并发送消息至mq
- 如果发送消息失败,则订单回滚并删除事务消息表消息
三、message消息系统
核心代码一:
@Override public void sendMessageToMessageQueue(String queueName,final String messageBody) { jmsTemplate.convertAndSend( queueName,messageBody ); log.info(">>> 发送消息到mq 队列:{},消息内容:{}", queueName, messageBody); }- 主要是activemq生产者讲消息发送至MQ消息中间件
核心代码二:
/** * 定时重发消息(每分钟) */ @Scheduled(cron = "0 */1 * * * ?") public void handler(){ //查询transaction_message表中已发送但未被删除的消息 List<TransactionMessage> list = transactionMessageService.queryRetryList( Constant.MESSAGE_UNDEAD, maxTimeOut, Constant.MESSAGE_SENDING ); if(list!=null && list.size() > 0){ for (TransactionMessage message:list){ try { transactionMessageService.retry( message.getMessageId() ); } catch (Exception e) { log.warn(">>> 消息不存在,可能已经被消费,消息编号:{}", message.getMessageId()); } } } } /** * 定时通知工作人员(每隔5分钟) */ @Scheduled(cron = "0 */5 * * * ?") public void advance(){ List<Long> messages = transactionMessageService.queryDeadList(); log.warn(">>> 共有:{}条消息需要人工处理", messages.size()); String ids = JSONObject.toJSONString( messages ); //发邮件或者是发送短信通知工作人员处理 }- 定时重发消息
- 定时将死亡的消息通知给工作人员,进行人工补偿操作
四、product产品系统
核心代码:
@Transactional @JmsListener( destination = Constant.ORDER_QUEUE_NAME) public void receiveQueue(String msg){ boolean flag = false; Orders orders = JSONObject.parseObject( msg, Orders.class ); log.info(">>> 接收到mq消息队列,消息编号:{} ,消息内容:{}", orders.getMessageId(), msg); TransactionMessage transactionMessage = transactionMessageService.findByMessageId( orders.getMessageId() ); try { //保证幂等性 if(transactionMessage!=null){ List<OrderDetail> list = orders.getList(); for(OrderDetail detail : list){ Product product = productService.findById( detail.getId() ); Long skuNum = product.getProductSku() - detail.getNum(); if(skuNum >= 0){ product.setProductSku( skuNum ); productService.update( product ); }else { throw new Exception( ">>> 库存不足,修改库存失败!" ); } } //int i = 1 /0 ; flag = true; } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException( e ); }finally { if(flag){ transactionMessageService.delete( orders.getMessageId() ); DbLog dbLog = dbLogService.findByMesageId( orders.getMessageId() ); if(dbLog!=null){ dbLog.setState( "1" );//已处理成功 dbLogService.update( dbLog ); } log.info(">>> 业务执行成功删除消息! messageId:{}", orders.getMessageId()); } } }- 从mq消息中间件中监听并消费消息,将json消息转为订单对象
- 根据消息编号查询该消息是否已被消费,保证幂等性
- 如果消息未被消费(即存在此消息),则产品表扣减库存;如果已经消费(不存在此消息),则不做处理
- 产品表扣减库存成功,则删除此消息,如果待处理消息日志表中有此消息,则更改状态为1,表示已处理;扣减失败,则不做处理
该项目源码已上传至github和码云,链接如下,希望喜欢的朋友都能给个star支持一下!谢谢~
github链接: https://github.com/wanglinyong/consis
码云链接: https://gitee.com/wanglinyong/consis