0%

深入理解Disruptor

Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单。

Java 内置队列

队列 有界性 数据结构
ArrayBlockingQueue 有界 加锁 arraylist
LinkedBlockingQueue 有界 加锁 linkedlist
ConcurrentLinkedQueue unbounded CAS linkedlist
LinkedTransferQueue unbounded CAS linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全。

基于链表实现的无锁队列 LinkedTransferQueue 是通过CAS 这种不加锁的方式来实现的。(LinkedTransferQueue 性能强)

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。

在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array / heap格式的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue。(针对 linkedlist格式的数据结构,每个element都要另new一个wrapper object,所以object数量要高)

但 ArrayBlockingQueue 在实际使用过程中,会因为加锁和伪共享等出现严重的性能问题。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Demo {

static class LongEvent {
long value;
}

static class LongEventConsumer implements EventHandler<LongEvent> {

@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("event = " + event.value);
System.out.println("sequence = " + sequence);
System.out.println("endOfBatch = " + endOfBatch);
}
}

static class LongEventProducer {

private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer byteBuffer) {
event.value = byteBuffer.getLong(0);
}
};

public void onData(ByteBuffer byteBuffer) {
ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
}
}

public static void main(String[] args) {
// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<LongEvent> factory = LongEvent::new;
// RingBuffer 大小,必须是2的次幂
int bufferSize = 1024;
// 创建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

// 设置消费者 处理Event
disruptor.handleEventsWith(new LongEventConsumer());

// 启动Disruptor,开启所有的线程
disruptor.start();

// 获取RingBuffer用于生产数据
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0; ; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}
}
}

Disruptor

LMAX Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发,性能和非阻塞算法的研究,如今已成为其Exchange基础结构的核心部分。

官方文档

高性能的原因

  • 环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
  • 无锁设计:每个生产者或者消费者线程,会通过CAS申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
  • 高效的元素位置定位:数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
  • 缓存行对齐:解决伪共享问题。

读写流程

写入流程

单个生产者和多个生产者的写入流程不同

单生产者
  1. 申请写入m个元素
  2. 若是有m个元素可以入,则返回最大的序列号(主要判断是否会覆盖未读的元素)
  3. 若是返回的正确,则生产者开始写入元素
多生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。

Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。

Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:Available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

多个生产者写入的时候:

  1. 申请写入m个元素
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的

读取流程

  1. 申请读取到序号n
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置
  3. 消费者读取元素

等待策略

生产者等待策略

暂时只有休眠1ns

1
LockSupport.parkNanos(1);

消费者等待策略

默认是:BlockingWaitStrategy

策略 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

log4j2

日志框架的核心思想是:使用并发队列形式,将日志缓存起来,然后开启单独的线程异步将日志写入本地磁盘。

通常这个队列通常会使用 ArrayBlockingQueue(log4j2和logback都有使用)。

例如 log4j2 中的 AsyncAppender:

Log4j2中的AsyncLogger的内部使用了Disruptor框架:

坚持原创技术分享,您的支持将鼓励我继续创作!