Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单。
Java 内置队列
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 加锁 | arraylist |
LinkedBlockingQueue | 有界 | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | CAS | linkedlist |
LinkedTransferQueue | unbounded | CAS | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全。
基于链表实现的无锁队列 LinkedTransferQueue 是通过CAS 这种不加锁的方式来实现的。(LinkedTransferQueue 性能强)
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。
在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array / heap格式的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue。(针对 linkedlist格式的数据结构,每个element都要另new一个wrapper object,所以object数量要高)
但 ArrayBlockingQueue 在实际使用过程中,会因为加锁和伪共享等出现严重的性能问题。
使用示例
1 | public class Demo { |
Disruptor
LMAX Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发,性能和非阻塞算法的研究,如今已成为其Exchange基础结构的核心部分。
高性能的原因
- 环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
- 无锁设计:每个生产者或者消费者线程,会通过CAS申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
- 高效的元素位置定位:数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
- 缓存行对齐:解决伪共享问题。
读写流程
写入流程
单个生产者和多个生产者的写入流程不同
单生产者
- 申请写入m个元素
- 若是有m个元素可以入,则返回最大的序列号(主要判断是否会覆盖未读的元素)
- 若是返回的正确,则生产者开始写入元素
多生产者
多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。
Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。
但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。
Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:Available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
多个生产者写入的时候:
- 申请写入m个元素
- 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间
- 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的
读取流程
- 申请读取到序号n
- 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置
- 消费者读取元素
等待策略
生产者等待策略
暂时只有休眠1ns
1 | LockSupport.parkNanos(1); |
消费者等待策略
默认是:BlockingWaitStrategy
策略 | 措施 | 适用场景 |
---|---|---|
BlockingWaitStrategy | 加锁 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
BusySpinWaitStrategy | 自旋 | 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定义策略 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU资源之间有很好的折中。延迟不均匀 |
TimeoutBlockingWaitStrategy | 加锁,有超时限制 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU资源之间有很好的折中。延迟比较均匀 |
log4j2
日志框架的核心思想是:使用并发队列形式,将日志缓存起来,然后开启单独的线程异步将日志写入本地磁盘。
通常这个队列通常会使用 ArrayBlockingQueue(log4j2和logback都有使用)。
例如 log4j2 中的 AsyncAppender:

Log4j2中的AsyncLogger的内部使用了Disruptor框架:
