本文摘录总结于极客时间——《Java业务开发常见错误 100 例》
异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。区别于另外两种,异步任务一般用于:
- 区别于主流程,像是用户注册后的发放优惠券、以及短信的发送等时效性不那么强,可以进行异步处理。
- 用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。
不过异步任务虽然好用,但在实现的时候却有三个最容易犯的错,分别是异步处理流程的可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题。今天,就用三个代码案例结合目前常用的 MQ 系统 RabbitMQ,来具体聊聊。
会使用 Spring AMQP 来操作 RabbitMQ,所以你需要先引入 amqp 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
异步处理需要消息补偿闭环
使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可以落地到磁盘保存,及时 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、传输、处理等环节都可以能发生消息丢失。此外,任何 MQ 中间件都无法确保百分百可用,需要考虑异步流程不可用的场景。
因此,对于异步处理流程,必须考虑补偿或者说建立主备双活流程。
我们来看一个用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程,会员服务收到消息后发送欢迎消息的流程为异步流程。
我们来分析一下:
- 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线代表异步调用);
- 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消息;
- 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水平。
来看一下实现代码,对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为百分之五十:
@RestController
@Slf4j
@RequestMapping("user")
public class UserController {
@Autowired
private UserService userService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("register")
public void register() {
//模拟10个用户注册
IntStream.rangeClosed(1, 10).forEach(i -> {
//落库
User user = userService.register();
//模拟50%的消息可能发送失败
if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {
//通过RabbitMQ发送消息
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
log.info("sent mq user {}", user.getId());
}
});
}
}
然后,定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息并发送欢迎短信。我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,
避免相同的用户进行补偿时重复发送短信:
@Component
@Slf4j
public class MemberService {
//发送欢迎消息的状态
private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
//监听用户注册成功的消息,发送欢迎消息
@RabbitListener(queues = RabbitConfiguration.QUEUE)
public void listen(User user) {
log.info("receive mq user {}", user.getId());
welcome(user);
}
//发送欢迎消息
public void welcome(User user) {
//去重操作
if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
log.info("memberService: welcome new user {}", user.getId());
}
}
}
对于 MQ 消费程序,处理逻辑无比考虑去重(支持幂等),原因有几个:
- MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。
- 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且考虑到高内聚,补偿 Job 本身不会做去重处理。
- 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故,MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大量资金重复发放。
接下来定义 Job 补偿备线操作。我们在 CompensationJob 中定义一个 @Scheduled 定时任务,5 秒做一次补偿操作,因为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。
@Component
@Slf4j
public class CompensationJob {
//补偿Job异步处理线程池
private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
10, 10,
1, TimeUnit.HOURS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
@Autowired
private UserService userService;
@Autowired
private MemberService memberService;
//目前补偿到哪个用户ID
private long offset = 0;
//10秒后开始补偿,5秒补偿一次
@Scheduled(initialDelay = 10_000, fixedRate = 5_000)
public void compensationJob() {
log.info("开始从用户ID {} 补偿", offset);
//获取从offset开始的用户
userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
compensationThreadPool.execute(() -> memberService.welcome(user));
offset = user.getId();
});
}
}
为了实现高内聚,主线和北线处理消息最好都使用同一方法。
比如,本例中 MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方法。
另外一说, Demo 中的补偿逻辑都是比较简单的,生产级别的代码都应该在以下几个方面进行加强:
- 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量。
- 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前(数据库查询)的用户再进行补偿,以方便和主线 MQ 实时流程错开,避免冲突。
- 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。
- 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统。
运行程序,执行注册方法注册 10 个用户,输出如下:
[17:01:16.570] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28 ] - sent mq user 1
[17:01:16.571] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28 ] - sent mq user 5
[17:01:16.572] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28 ] - sent mq user 7
[17:01:16.573] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28 ] - sent mq user 8
[17:01:16.594] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18 ] - receive mq user 1
[17:01:18.597] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 1
[17:01:18.601] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18 ] - receive mq user 5
[17:01:20.603] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 5
[17:01:20.604] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18 ] - receive mq user 7
[17:01:22.605] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 7
[17:01:22.606] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18 ] - receive mq user 8
[17:01:24.611] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 8
[17:01:25.498] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29 ] - 开始从用户ID 0 补偿
[17:01:27.510] [compensation-threadpool-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 2
[17:01:27.510] [compensation-threadpool-3] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 4
[17:01:27.511] [compensation-threadpool-2] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 3
[17:01:30.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29 ] - 开始从用户ID 5 补偿
[17:01:32.500] [compensation-threadpool-6] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 6
[17:01:32.500] [compensation-threadpool-9] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 9
[17:01:35.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29 ] - 开始从用户ID 9 补偿
[17:01:37.501] [compensation-threadpool-0] [INFO ] [o.g.t.c.a.compensation.MemberService:28 ] - memberService: welcome new user 10
[17:01:40.495] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29 ] - 开始从用户ID 10 补偿
可以看到:
- 总共 10 个用户,MQ 发送成功的用户有四个,分别是用户 1、5、7、8。
- 补偿任务第一次运行,补偿了用户 2、3、4,第二次运行补偿了用户 6、9,第三次运行补充了用户 10。
最后提一下,针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。也就是说,如果补偿备线足够完善,即使直接把 MQ 停机,虽然会略微影响处理的及时性,但至少确保流程都能正常执行。
注意消息模式是广播还是消息队列先来了解一下两者的区别:
- 消息广播,和我们平时说的广播大差不差,就是希望同一条消息,不同消费者都能分别消费。
- 而队列模式,就是不同消费者共享消费同一个队列的数据,相同消费只能被某一个消费者消费一次。
&emspp;就比如,同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听以发送新用户小礼物。但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个用户的消息,可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务 1 和会员服务 2),不管哪个实例来处理,处理一次即可(工作队列模式):
在这里实现代码时,不同的 MQ 实现逻辑不一致,对于 RocketMQ 类似的 MQ 来说,实现类似功能比较简单直白:如果消费者属于一个组,那么消息只会由同一个组的一个消费者来消费;如果消费者属于不同组,那么每个组都能消费一遍消息。
那么,还是推荐直接使用 RocketMQ 来的简单实惠。
实际上线程池如果配置了无界队列,那么有可能会发生 OOM 事件;使用消息队列处理异步流程的时候,我们也同样要注意消息队列的任务堆积问题。对于突发流量引起的消息队列堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。但很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。
比如,用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因为用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在 MQ 中像幽灵一样回荡的同一条消息,就是死信。
随着 MQ 被越来越多的死信填满,消费者需要花费大量时间反复处理死信,导致正常消息的消费受阻,最终 MQ 可能因为数据量过大而崩溃。
我们来测试一下这个场景:
@Bean
public Declarables declarables() {
//队列
Queue queue = new Queue(Consts.QUEUE);
//交换器
DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE);
//快速声明一组对象,包含队列、交换器,以及队列到交换器的绑定
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY));
}
然后,实现一个 sendMessage 方法来发送消息到 MQ,访问一次提交一条消息,使用自增标识作为消息内容:
//自增消息标识
AtomicLong atomicLong = new AtomicLong();
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage")
public void sendMessage() {
String msg = "msg" + atomicLong.incrementAndGet();
log.info("send message {}", msg);
//发送消息
rabbitTemplate.convertAndSend(Consts.EXCHANGE, msg);
}
收到消息后,直接抛出空指针异常,模拟处理出错的情况:
@RabbitListener(queues = Consts.QUEUE)
public void handler(String data) {
log.info("got message {}", data);
throw new NullPointerException("error");
}
调用 sendMessage 接口发送两条消息,然后来到 RabbitMQ 管理台,可以看到这两条消息始终在队列中,不断被重新投递,导致重新投递 QPS 达到了 1063。
同时,在日志中可以看到大量异常信息:
[20:02:31.533] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [WARN ] [o.s.a.r.l.ConditionalRejectingErrorHandler:129 ] - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void org.geekbang.time.commonmistakes.asyncprocess.deadletter.MQListener.handler(java.lang.String)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:219)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:143)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:132)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1569)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1488)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1476)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1467)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1411)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: error
at org.geekbang.time.commonmistakes.asyncprocess.deadletter.MQListener.handler(MQListener.java:14)
at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:211)
... 13 common frames omitted
解决死信无限重复进入队列最简单的方式是,在程序处理出错的时候,直接抛出 AmqpRejectAndDontRequeueException 异常,避免消息重新进入队列:
throw new AmqpRejectAndDontRequeueException("error");
但,我们更希望的逻辑是,对于同一条消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。对于来自死信队列的数据,我们可能只是记录日志发送报警,即使出现异常也不会再重复投递。整个逻辑如下图所示:
或许文章名称可以换成消息队列:XXX....