1 背景
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
内存队列 使用场景一般在系统内部,提高在高并发的情况下系统的性能,一般作用于线程间的消息传递
分布式消息队列 使用场景一般在系统和系统间的消息传递,吞吐量高,也适用于消息流数据处理的中间件
2 JAVA内存队列
介绍Disruptor之前,先介绍一下常用线程安全的内置队列。Java的内置队列下表所示:
队列
有界性
锁
数据结构
ArrayBlockingQueue
bounded
加锁
arraylist
LinkedBlockingQueue
optionally-bounded
加锁
linkedlist
ConcurrentLinkedQueue
unbounded
无锁
linkedlist
LinkedTransferQueue
unbounded
无锁
linkedlist
PriorityBlockingQueue
unbounded
加锁
heap
DelayQueue
unbounded
加锁
heap
队列的底层一般分成三种:数组、链表和堆
堆一般情况下是为了实现带有优先级特性的队列,暂不考虑
- 基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全
- 基于链表的线程安全队列分成
- LinkedBlockingQueue 通过锁的方式来实现线程安全
- ConcurrentLinkedQueue 上面表格中的LinkedTransferQueue都是通过原子变量compare and swap这种不加锁的方式来实现
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内) 而加锁的方式,可以实现有界队列,在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue
在实际使用过程中,ArrayBlockingQueue会因为加锁和伪共享等出现严重的性能问题
3 Disruptor原理
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
Ring Buffer 如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
Sequence Disruptor 通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。 (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
Sequencer Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier 用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy 定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
Event 在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
EventProcessor EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
Producer 即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
4 代码样例
代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。
以下代码基于3.3.4版本的Disruptor包
/** * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端 */import com.lmax.disruptor.*;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.ThreadFactory;public class DisruptorMain{ public static void main(String[] args) throws Exception { // 队列中的元素 class Element { private int value; public int get(){ return value; } public void set(int value){ this.value= value; } } // 生产者的线程工厂 ThreadFactory threadFactory = new ThreadFactory(){ @Override public Thread newThread(Runnable r) { return new Thread(r, "simpleThread"); } }; // RingBuffer生产工厂,初始化RingBuffer的时候使用 EventFactory<Element> factory = new EventFactory<Element>() { @Override public Element newInstance() { return new Element(); } }; // 处理Event的handler EventHandler<Element> handler = new EventHandler<Element>(){ @Override public void onEvent(Element element, long sequence, boolean endOfBatch) { System.out.println("Element: " + element.get()); } }; // 阻塞策略 BlockingWaitStrategy strategy = new BlockingWaitStrategy(); // 指定RingBuffer的大小 int bufferSize = 16; // 创建disruptor,采用单生产者模式 Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); // 设置EventHandler disruptor.handleEventsWith(handler); // 启动disruptor的线程 disruptor.start(); RingBuffer<Element> ringBuffer = disruptor.getRingBuffer(); for (int l = 0; true; l++) { // 获取下一个可用位置的下标 long sequence = ringBuffer.next(); try { // 返回可用位置的元素 Element event = ringBuffer.get(sequence); // 设置该位置元素的值 event.set(l); } finally { ringBuffer.publish(sequence); } Thread.sleep(10); } }}
5 应用场景
5.1 Log4j2异步日志打印
log4j2支持日志的异步打印,日志异步输出的好处在于,使用单独的进程来执行日志打印的功能,可以提高日志执行效率,减少日志功能对正常业务的影响。
异步日志在程序的classpath需要加载disruptor-3.0.0.jar或者更高的版本。
5.2 海量job处理
现在有8个库1024张表,大量的job需要处理,每时每刻任务都在海量增加
启动8台机器,每台机器扫描一个库的待执行job,共128个表需要扫描,这里可以启动128个线程去并发扫描,每查出来一次,立马通过disruptor发布出去,另外一端监听到发布的任务之后调用任务处理接口进行处理,就算有任务执行异常,也不会阻塞其它的任务,可以边发布边处理,最大程度提升任务处理能力。
一直积压的任务有旁路报警机制,每次执行失败的job执行次数+1,当大于指定阈值则报警。
一旦无法放入disruptor就会报警,表明队列已满,处理不过来了,得扩容下游处理任务的机器
disruptor的消费末端通过线程池严格控制消费能力,不会出现任务生产过快消费不过来的情况
如果有多种不同类型的任务要处理,可以初始化多个不同size的ringbuffer去处理,定义不同的evenHandler
局限性应该是它是个内存队列,处理不了分布式场景的