一、消息积压的原因
消息积压的直接原因,一定是系统中某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。
二、优化性能来避免消息积压
在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者两部分的业务逻辑中。对于消息队列本身的性能,作为使用者不需要太关注。主要原因是对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。所以对于消息队列的性能优化,更关注的是在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳性能。
1、发送端性能优化
发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。
Producer 发送消息的过程,Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是 1ms,我们把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:
- 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
- 发送消息和返回响应在网络传输中的耗时;
- Broker 处理消息的时延。
如果是单线程发送,每次只发送 1 条消息,那么每秒只能发送 1000ms / 1ms * 1 条 /ms = 1000 条 消息,这种情况下并不能发挥出消息队列的全部实力。无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。
至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。简单来说,只要能够满足你的性能要求,怎么实现方便就怎么实现。
- 比如说,你的消息发送端是一个微服务,主要接受 RPC 请求处理在线业务。很自然的,微服务在处理每次请求的时候,就在当前线程直接发送消息就可以了,因为所有 RPC 框架都是多线程支持多并发的,自然也就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响 RPC 服务的时延。这种情况,比较明智的方式就是通过并发来提升发送性能。
- 如果你的系统是一个离线分析系统,离线系统在性能上的需求是它不关心时延,更注重整个系统的吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。
2、消费端性能优化
使用消息队列时,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题。要么消息队列的存储填满无法提供服务,要么消息丢失。
所以在设计系统时,一定要保证消费端的消费性能要高于生产端的发送性能,这样系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑外,也可以通过水平扩展,增加消费端的并发数来提升总体的消费性能。注意:在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容实际上是没有效果的,原因是对于消费者,在每个分区上实际上只能支持单线程消费。
很多消费程序,他们是这样来解决消费慢的问题的:
它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。
但是,这是一个非常常见的错误方法! 为什么错误?因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。