0%

Java集合-Queue/Deque篇

JDK提供的集合类型主要分为四种类型:

  1. List:支持重复元素
  2. Set:不支持重复元素
  3. Map:键/值对的映射集
  4. Queue/Deque(double ended queue):queue是在集合尾部添加元素,在头部删除元素的队列,deque是可在头部和尾部添加或者删除元素的双端队列,deque既可以实现队列又可以实现栈

本文基于JDK8,java version “1.8.0_251”

ArrayDeque

基于数组,循环数组,非线程安全,效率高,双端队列,即可实现队列也可实现栈

  1. 基于数组,最小容量为8,默认容量为16,最大容量Integer.MAX_VALUE-8(2^31 - 8),容量总是为2的次幂(用于判断是否需要扩容)。

    1
    2
    3
    4
    5
    transient Object[] elements; // non-private to simplify nested class access
    private static final int MIN_INITIAL_CAPACITY = 8;
    public ArrayDeque() {
    elements = new Object[16];
    }
  2. 即可实现队列也可实现栈,ArrayDeque 和 LinkedList 都是 Deque 接口的实现类,都具备既可以作为队列,又可以作为栈来使用的特性,两者主要区别在于底层数据结构的不同。ArrayDeque 底层数据结构是以循环数组为基础,而 LinkedList 底层数据结构是以循环链表为基础。理论上,链表在添加、删除方面性能高于数组结构,在查询方面数组结构性能高于链表结构,但是对于数组结构,如果不进行数组移动,在添加方面效率也很高。LinkedList的内存开销更大。对于小数据量,ArrayDeque 和 LinkedList 在效率方面相差不大,但是对于大数据量,推荐使用 ArrayDeque

  3. 循环数组,为了满足可以同时在数组两端插入或删除元素的需求,该数组还必须是循环的,即循环数组,也就是说数组的任何一点都可能被看作起点或者终点,head和tail分别代表起点和终点的索引值。因为是循环数组,所以 head 不一定总是指向下标为 0 的数组元素,tail 也不一定总是比 head 大

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * The index of the element at the head of the deque (which is the
    * element that would be removed by remove() or pop()); or an
    * arbitrary number equal to tail if the deque is empty.
    */
    transient int head;

    /**
    * The index at which the next element would be added to the tail
    * of the deque (via addLast(E), add(E), or push(E)).
    */
    transient int tail;
  4. 动态扩容,容量为原来的2倍。(tail = (tail + 1) & (elements.length - 1)) == head用与运算来计算终点位置,然后判断tail是否和head重合,如果重合就触发扩容。扩容申请一个2倍大小的数组,将原数组复制到新数组。复制数组分两次进行,第一次复制 head 头部索引至数组末端的元素到新数组,第二次复制 head 左边的元素到新数组

    1
    2
    3
    4
    5
    6
    7
    public void addLast(E e) {
    if (e == null)
    throw new NullPointerException();
    elements[tail] = e;
    if ( (tail = (tail + 1) & (elements.length - 1)) == head)
    doubleCapacity();
    }
  5. ArrayDeque不允许插入null,而 LinkedList 允许插入null

  6. 非线程安全,也没办法通过Collections类变为线程安全的类。

  7. 支持fail-fast机制,如果tail != fence || result == null,则抛出ConcurrentModificationException异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public E next() {
    if (cursor == fence)
    throw new NoSuchElementException();
    @SuppressWarnings("unchecked")
    E result = (E) elements[cursor];
    // This check doesn't catch all possible comodifications,
    // but does catch the ones that corrupt traversal
    if (tail != fence || result == null)
    throw new ConcurrentModificationException();
    lastRet = cursor;
    cursor = (cursor + 1) & (elements.length - 1);
    return result;
    }

PriorityQueue

优先队列,小顶堆,非线程安全

  1. 基于数组实现的小顶堆,根据元素的CompareTo方法顺序或自定义的comparator比较顺序。

  2. 父节点和子节点的编号是有联系的。

    1
    2
    3
    leftNo = parentNo*2+1
    rightNo = parentNo*2+2
    parentNo = (nodeNo-1)/2
  3. 添加元素。新加入的元素可能会破坏小顶堆的性质,因此需要进行必要的调整。

  4. 删除类似和添加元素类似。

  5. 动态扩容,如果原数组容量小于64,则扩容为原数组容量的2倍+2,否则扩容为原数组容量的3/2倍。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private void grow(int minCapacity) { // minCapacity等于原数组容量+1
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
    (oldCapacity + 2) :
    (oldCapacity >> 1));
    // overflow-conscious code
    if (newCapacity - MAX_ARRAY_SIZE > 0)
    newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
    }

ConcurrentLinkedQueue

线程安全的非阻塞队列,基于链表的无界线程安全队列,无锁

  1. 基于链表,不能指定最大容量

    1
    2
    3
    private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
  2. head节点表示队头,tail节点表示队尾,head节点用来出队列使用,一个tail节点用来入队列使用,初始化的时候head = tail 都是一个空节点。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private transient volatile Node<E> head;

    /**
    * A node from which the last node on list (that is, the unique
    * node with node.next == null) can be reached in O(1) time.
    * Invariants:
    * - the last node is always reachable from tail via succ()
    * - tail != null
    * Non-invariants:
    * - tail.item may or may not be null.
    * - it is permitted for tail to lag behind head, that is, for tail
    * to not be reachable from head!
    * - tail.next may or may not be self-pointing to tail.
    */
    private transient volatile Node<E> tail;

    public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
    }
  3. head 未必是队列中第一个元素(head指向的可能是一个已经被移除的元素),tail 未必是队列中最后一个元素(tail.next 可以不为 null)

    有了 head 和 tail 节点,如果按照我们平常的思维,head 节点即头节点,tail 节点即尾节点。那么入队列的时候,将 tail 的 next 节点设置为 newNode,将 newNode 设置为 tail;出队列的时候,将 head 节点元素返回,head 的 next 节点设置为 head。实现代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public boolean offer(E e) {
    if (e == null)
    throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    for (; ; ) {
    Node<E> t = tail;
    if (t.casNext(null, n) && casTail(t, n)) {
    return true;
    }
    }
    }

    这样的做法 tail 节点永远作为队列的尾节点,head 节点永远为队列的头节点。实现代码量非常少,而且逻辑清晰和易懂。但是,这么做有个缺点,每次都需要使用循环 CAS 更新 tail 节点。所以 doug lea 为了减少 CAS 更新 tail 节点的次数,提高入队的效率,使用增加循环来控制 tail 节点的更新频率,并不是每次节点入队后都将 tail 节点更新成尾节点,而是当 tail 节点和尾节点不一致时(也就是循环两次)才更新 tail 节点。

  4. size方法可能不准确,(实际上juc集合里面的size方法所返回的元素个数都是不保证准确的)

参考资料:

  1. ConcurrentLinkedQueue 源码解读
  2. JUC源码分析-集合篇(四):ConcurrentLinkedQueue

ConcurrentLinkedDeque

基于双向链表结构的无界并发队列,CAS,与 ConcurrentLinkedQueue 的区别是该阻塞队列同时支持FIFOFILO两种操作方式

  1. 基于双向链表结构的无界并发队列

参考资料:

  1. JUC源码分析-集合篇(五):ConcurrentLinkedDeque

BlockingQueue

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue:一个由链表结构组成的有界(但大小默认值为Interger.MAX_VALUE)阻塞队列。
  3. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  5. SynchronousQueue:一个不存储元素的阻塞队列,也即单个元素的阻塞队列。
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  7. LinkedBlockingDeque:一个由链表结构组成的双向有界阻塞队列。

ArrayBlockingQueue

有界阻塞队列,基于循环数组,线程安全

  1. 基于循环数组,关键成员变量如下:

    1. lock的作用:在多线程下操作的,所有修改items、takeIndex、putIndex和count这些成员变量时,必须要考虑多线程安全问题,这里使用lock独占锁,来保证并发操作的安全
    2. notEmpty与notFull的作用:因为阻塞队列必须实现,当队列为空或队列已满的时候,队列的读取或插入操作要等待。当队列从空时,插入元素需要唤醒之前因为读取等待的线程。当队列已满时,移出元素需要唤醒之前因为插入等待的线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /** 储存队列的中元素 */
    final Object[] items;

    /** 队列头的位置 */
    int takeIndex;

    /** 队列尾的位置 */
    int putIndex;

    /** 当前队列拥有的元素个数 */
    int count;

    /** 用来保证多线程操作共享变量的安全问题 */
    final ReentrantLock lock;

    /** 当队列为空时,就会调用notEmpty的wait方法,让当前线程等待 */
    private final Condition notEmpty;

    /** 当队列为满时,就会调用notFull的wait方法,让当前线程等待 */
    private final Condition notFull;
  2. 有界队列,构造方法需要指定队列大小,并且可以指定lock使用公平锁还是非公平锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }
  3. 不允许插入null

  4. 循环数组,和ArrayDeque同理。

  5. 线程安全

  6. 不支持fail-fast机制

源码说明:

lock.lock()是保证同一时间只有一个线程修改成员变量,防止出现并发操作问题。虽然它也会阻塞当前线程,但是它并不是条件等待,只是因为锁被其他线程持有,而ArrayBlockingQueue中方法操作时间都不长,这里相当于不阻塞线程。

add(E e)与offer(E e)方法

向队尾新添加元素,如果队列已满,返回false,否则添加成功返回true。

add方法调用父类的方法,父类的方法会调用offer方法,最终add方法实际就是调用offer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

enqueue(E x)方法

向队尾新添加元素,向队列末尾新添加元素,添加成功则将putIndex+1,如果达到数组长度,putIndex=0(因为循环数组),然后唤醒因为数组为空时获取元素阻塞的线程。

1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

疑问:调用notEmpty.signal()之前是不是应该判断一下当前的长度是否为1?

put方法

向队尾新添加元素。向队列末尾新添加元素,如果队列已满,则调用notFull.await(),这里判断需要用while而不能用if,目的是防止虚假唤醒。线程被唤醒之后,检查是否已满,如果队列还是满的,则继续等待,如果不是满的,则向队尾添加元素。

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

offer(E e, long timeout, TimeUnit unit)方法

向队尾新添加元素,可设置最大的阻塞时间。

如果队列中没有可用空间,当前线程就等待, 如果等待时间超过timeout了,那么返回false,表示添加失败。如果被唤醒,则检查是否有可用空间,如果有可用空间,则继续添加元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

remove()和poll()方法

删除队头元素,如果队列为空,返回null,不为空删除成功返回元素。

remove方法调用父类的方法,父类的方法会调用poll方法,最终remove方法实际就是调用poll方法。

1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

dequeue方法

删除队头元素,先将队头元素删除,然后将takeIndex的值+1,如果达到数组长度,takeIndex=0(因为循环数组),然后唤醒因为数组已满时添加元素阻塞的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

疑问:调用notFull.signal();之前是否应该先判断容量为数组长度-1?

take()方法

删除队头元素,如果当前容量为0,则等待。否则删除元素。这里判断当前容量不能用if,目的是防止虚假唤醒。如果被唤醒,则检查队里是否有元素,如果有元素,则继续删除元素。

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

poll(long timeout, TimeUnit unit)方法

删除队头元素,可设置最大的阻塞时间。

如果当前容量为0,则等待,等待超过最大等待时间则返回null。如果被唤醒,则检查队里是否有元素,如果有元,则继续删除元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

element()与peek() 方法

查看元素

element方法调用父类的方法,父类的方法会调用peek方法,最终element方法实际就是调用peek方法。

1
2
3
4
5
6
7
8
9
10
11
12
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}

remove(Object o)方法

删除指定元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

从队列中删除指定对象o,那么就要遍历队列,删除第一个与对象o相同的元素,如果队列中没有对象o元素,那么返回false删除失败。这里有两点需要注意:

  1. 如何遍历队列,就是从队列头遍历到队列尾。就要靠takeIndex和putIndex两个变量了。
  2. 为什么Object[] items = this.items;这句代码没有放到同步锁lock代码块内。items是成员变量,那么多线程操作的时候,不会有并发问题么?
    这个是因为items是个引用变量,不是基本数据类型,而且我们对队列的插入和删除操作,都是针对这一个items数组,没有改变数组的引用,所以在lock代码中,items会得到其他线程对它最新的修改。但是如果这里将int putIndex = this.putIndex;方法lock代码块外面,就会产生问题。

removeAt(final int removeIndex)方法

删除指定位置的元素,需要注意的是删除之后的数组还能保持队列形式,分为两种情况:

  1. 如果删除位置是队列头,那么简单,只需要将队列头的位置元素设置为null,将将队列头位置+1
  2. 如果删除位置不是队列头,那么麻烦了,这个时候,我们就要将从removeIndex位置后的元素全部左移一位,覆盖前一个元素。最后将原来队列尾的元素置位null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}

疑问:为什么构造方法要加锁?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

注释中有写:锁的用处在于可见性而不是竞争。此处使用锁的意义在于避免指令重排序,避免出现对象未初始化完,就将将内存空间的地址赋值给对应的引用。保证count,putIndex,items[i]的可见性。

注意item被final修饰,只能保证final字段对应的引用是up-to-date的。

LinkedBlockingQueue

有界阻塞队列,基于链表,线程安全,双锁

  1. 基于链表,关键成员变量如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    /**
    * Linked list node class
    */
    static class Node<E> {
    E item;
    /**
    * One of:
    * - the real successor Node
    * - this Node, meaning the successor is head.next
    * - null, meaning there is no successor (this is the last node)
    */
    Node<E> next;
    Node(E x) { item = x; }
    }
    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    /**
    * Head of linked list.
    * Invariant: head.item == null
    */
    transient Node<E> head;
    /**
    * Tail of linked list.
    * Invariant: last.next == null
    */
    private transient Node<E> last;
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    双锁。一个take锁,控制消费者并发,一个put锁,控制生产者并发

  2. 有界队列,默认和最大长度为 Integer.MAX_VALUE,并且可以指定lock使用公平锁还是非公平锁

  3. 不允许插入null

  4. 线程安全

  5. 不支持fail-fast机制

add(E e)与offer(E e)方法

向队尾新添加元素,如果队列已满,返回false。如果队列没有满,添加元素,添加成功后采用CAS操作更新队列容量,如果当前容量小于最大的容量(注意这里cas拿到的c是旧值),则唤醒因为队列已满,添加元素被阻塞的线程。如果之前队列为空,则唤醒因为队列为空,删除元素被阻塞的线程。

add方法调用父类的方法,父类的方法会调用offer方法,最终add方法实际就是调用offer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

put方法

向队尾新添加元素。向队列末尾新添加元素,如果队列已满,则调用notFull.await(),这里判断需要用while而不能用if,目的是防止虚假唤醒。线程被唤醒之后,检查是否已满,如果队列还是满的,则继续等待,如果不是满的,则向队尾添加元素。添加成功后采用CAS操作更新队列容量,如果当前容量小于最大的容量(注意这里cas拿到的c是旧值),则唤醒因为队列已满,添加元素被阻塞的线程。如果之前队列为空,则唤醒因为队列为空,删除元素被阻塞的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

offer(E e, long timeout, TimeUnit unit)方法

向队尾新添加元素,可设置最大的阻塞时间。

如果队列中没有可用空间,当前线程就等待, 如果等待时间超过timeout了,那么返回false,表示添加失败。如果被唤醒,则检查队列是否已满,如果没满,则继续添加元素,添加成功后采用CAS操作更新队列容量,如果当前容量小于最大的容量(注意这里cas拿到的c是旧值),则唤醒因为队列已满,添加元素被阻塞的线程。如果之前队列为空,则唤醒因为队列为空,删除元素被阻塞的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

remove()和poll()方法

删除队头元素,如果队列为空,返回null,不为空就删除元素,删除成功后采用CAS操作更新队列容量,最后返回元素。如果当前容量小于0(注意这里cas拿到的c是旧值),则唤醒因为队列为空,删除元素被阻塞的线程。如果之前队列已满,则唤醒因为队列已满,添加元素被阻塞的线程。

remove方法调用父类的方法,父类的方法会调用poll方法,最终remove方法实际就是调用poll方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

take()方法

删除队头元素,如果当前容量为0,则等待。否则删除元素。这里判断当前容量不能用if,目的是防止虚假唤醒。如果被唤醒,则检查队里是否有元素,如果有元素,则继续删除元素。删除成功后采用CAS操作更新队列容量,最后返回元素。如果当前容量小于0(注意这里cas拿到的c是旧值),则唤醒因为队列为空,删除元素被阻塞的线程。如果之前队列已满,则唤醒因为队列已满,添加元素被阻塞的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

poll(long timeout, TimeUnit unit)方法

删除队头元素,可设置最大的阻塞时间。

如果当前容量为0,则等待,等待超过最大等待时间则返回null。如果被唤醒,则检查队里是否有元素,如果有元,则继续删除元素。删除成功后采用CAS操作更新队列容量,最后返回元素。如果当前容量小于0(注意这里cas拿到的c是旧值),则唤醒因为队列为空,删除元素被阻塞的线程。如果之前队列已满,则唤醒因为队列已满,添加元素被阻塞的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

element()与peek() 方法

查看元素,需要放置元素被删除,增加不影响

element方法调用父类的方法,父类的方法会调用peek方法,最终element方法实际就是调用peek方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

remove(Object o)方法

删除指定元素,删除和添加都需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

疑问:为什么ArrayBlockingQueue只用单锁?

目前没有找到确切的答案,但可以确定的是,ArrayBlockingQueue可以使用双锁实现,并且有明显的性能提升。

参考资料:

  1. ABQ双锁实现代码
  2. ABQ为什么只用单锁,以及双锁改造和性能测试

PriorityBlockingQueue

无界阻塞队列,基于数组实现的小顶堆,线程安全,CAS

  1. 基于数组实现的小顶堆,根据元素的CompareTo方法顺序或自定义的comparator比较顺序。关键成员变量如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 默认初始化容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    // 阻塞队列容量最大值
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    // 阻塞队列
    private transient Object[] queue;
    // 阻塞队列中元素数量
    private transient int size;
    // 比较器,元素没有实现comparable接口时,需提供比较器
    private transient Comparator<? super E> comparator;
    // 独占锁,读写线程共用这一把锁
    private final ReentrantLock lock;
    // 读线程等待队列,写线程永远不会阻塞
    private final Condition notEmpty;
    // 写线程扩容锁,通过CAS控制,只有一个写线程会将此变量从0变成1
    private transient volatile int allocationSpinLock;
  2. 无界阻塞队列,可以声明初始容量,但是会超过初始容量会自动扩容,最大容量为Integer.MAX_VALUE-8。

  3. 写线程不阻塞,读线程在队列为空时阻塞

    当队列为满时,写线程不会阻塞,而会尝试去扩容,扩容成功就继续向阻塞队列写入数据。当队列为空时,读线程会阻塞等待,直到队列不为空,被写线程唤醒。因此该阻塞队列适用于读多于写的场景,不然,写线程过多,会导致内存消耗过大,影响性能。读写线程共用同一把独占锁。

  4. 不允许插入null

  5. 线程安全

  6. 不支持fail-fast机制

  7. 动态扩容,如果原数组容量小于64,则扩容为原数组容量的2倍+2,否则扩容为原数组容量的3/2倍。

阻塞队列PriorityBlockingQueue从不阻塞写线程,当队列满时,写线程会尝试扩容阻塞队列,扩容成功后再向阻塞队列中新增元素,而当队列元素为空时,会阻塞读线程的读取,当然也有非阻塞的方法(poll)。该阻塞队列适用于读多于写的场景,不然,写线程过多,会导致内存消耗过大,影响性能。阻塞队列采用堆存储结构,因此每次冲阻塞队列取出的元素总是最小元素(或最大元素)。而堆存储需要提供比较器或者元素实现了阻塞接口,否则程序会抛出ClassCastException。

DelayQueue

延迟队列,基于优先队列,线程安全,无界

  1. DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。本质上即:DelayQueue = BlockingQueue +PriorityQueue + Delayed。

    1
    2
    3
    4
    5
    6
    public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
    }
    public class DelayQueue<E extends Delayed> implements BlockingQueue<E> {
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    }

    队列中只能存放实现Delayed接口的对象,而此接口有两个需要实现的方法。最重要的就是getDelay,这个方法需要返回对象过期前的时间。简单说,队列在某些方法处理前,会调用此方法来判断对象有没有超时。

  2. 关键成员变量如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 阻塞等待使用了可重入锁,只有一把
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先队列,用来对不同延迟任务的排序
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // 这个Leader 有意思,解决了队列头的数据和线程的关联
    // 同时解决了其他线程由谁唤醒
    private Thread leader = null;
    // 与Leader Thread配合 唤醒等待的Leader或者新Leader替换
    private final Condition available = lock.newCondition();
  3. 重入锁是非公平的,且不支持设置。

  4. 不允许插入null

  5. 线程安全

  6. 不支持fail-fast机制

  7. 动态扩容,依赖PriorityQueue动态扩容。

take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

可以看出延迟的实现原理就是用到了 Condition.awaitNanos(delay) 方法。先 peek 看看有没有元素,再看看元素有没有过期,过期就 poll 取出,还没过期就是 await 等待。
这里有两点需要注意:

  1. leader线程的作用,下面是官方的说明

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * Thread designated to wait for the element at the head of
    * the queue. This variant of the Leader-Follower pattern
    * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
    * minimize unnecessary timed waiting. When a thread becomes
    * the leader, it waits only for the next delay to elapse, but
    * other threads await indefinitely. The leader thread must
    * signal some other thread before returning from take() or
    * poll(...), unless some other thread becomes leader in the
    * interim. Whenever the head of the queue is replaced with
    * an element with an earlier expiration time, the leader
    * field is invalidated by being reset to null, and some
    * waiting thread, but not necessarily the current leader, is
    * signalled. So waiting threads must be prepared to acquire
    * and lose leadership while waiting.
    */
    private Thread leader = null;

    说了是用到 Leader-Follower 模式。如果一个线程是 leader 线程,那么它只会等待available.awaitNanos(delay) 这么多时间,其他后来的 follower 线程只能干等。意思就是一定是 leader 线程先取到头元素,其他线程需要等待 leader 线程的唤醒。这样就可以简化竞争的操作,直接让后面的线程等待,把竞争交给 Condition 来做。

  2. first == null

    目的是为了做 GC。假设没有这一句,那么这里很有可能是 follower 线程在等待的过程中一直持有 first 的引用,而 leader 线程已经完成任务了,都把 first 都释放了,原来希望被回收的 first 却一直没有被回收。在极端的情况下,在一瞬间高并发,会有大量的 follower 线程持有 first,而需要等这些线程都会唤醒后,first 才会被释放回收。

offer方法

offer 方法,add 和 put 最终还是调到 offer 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

放入元素,如果插入的元素是放在了头部的话:

  1. 把 leader 线程置为 null。因为 leader 的意义就是想要取头元素的那个线程,那么旧的 leader 将没有意义。
  2. 唤醒在等待的线程。原本线程都在等待头元素,但是头元素改变了,就唤醒一个线程让它重新取出头元素,并成为新的 leader (看 take 方法里面是一个 for 的死循环)。

SynchronousQueue

内部没有容器,线程安全,无锁,CAS,配对通信机制

  1. 内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。使用实例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import java.util.concurrent.SynchronousQueue;

    public class SynchronousQueueDemo {

    public static void main(String[] args) throws InterruptedException {
    final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

    Thread putThread = new Thread(() -> {
    System.out.println("put thread start");
    try {
    queue.put(1);
    } catch (InterruptedException e) {
    }
    System.out.println("put thread end");
    });

    Thread takeThread = new Thread(() -> {
    System.out.println("take thread start");
    try {
    System.out.println("take from putThread: " + queue.take());
    } catch (InterruptedException e) {}
    System.out.println("take thread end");
    });

    putThread.start();
    Thread.sleep(1000);
    takeThread.start();
    }
    }

    一种输出结果:

    1
    2
    3
    4
    5
    put thread start
    take thread start
    take from putThread: 1
    put thread end
    take thread end
  2. 默认非公平模型(基于栈所以是后进先出),支持公平模型(基于队列,先进先出)。

    1
    2
    3
    4
    5
    6
    public SynchronousQueue() {
    this(false);
    }
    public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。公平策略总结下来就是:队尾匹配队头出队

    非公平模式下,底层实现使用的是TransferStack这个内部栈,有一个head指针指向栈顶。

  3. 不允许插入null

  4. 线程安全

  5. 不支持fail-fast机制

参考资料:

  1. SynchronousQueue实现原理
  2. BlockingQueue 实现之 SynchronousQueue

LinkedTransferQueue

无界,基于单链表,无锁,线程安全,双重队列

  1. 实现了TransferQueue接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    public interface TransferQueue<E> extends BlockingQueue<E> {
    //该方法放入元素后,一定要被消费者消费后,线程才释放,否则会一直堵塞
    void transfer(E e) throws InterruptedException;
    /**
    * tryTransfer 和上面的 transfer 方法相比,
    * 该方入队元素后,无论是否消费都立即返回
    * 如果没有消费者接收元素,则元素不入队,返回的是 false
    */
    boolean tryTransfer(E e);
    /**
    * 该方法加入了时间等待,假设超过时间没有消费者线程接收
    * 则元素不会入队,并返回false
    */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;
    /**
    * 判断是否有等待中的客户端线程
    */
    boolean hasWaitingConsumer();
    /**
    * 获取等待接收元素的消费者数量
    */
    int getWaitingConsumerCount();
  2. 关键成员变量如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //获取处理器数量,判断是否是多个
    private static final boolean MP =
    Runtime.getRuntime().availableProcessors() > 1;
    //自旋次数,阻塞前的自旋次数(这里向左偏移,一定是 2 的 n 次方)
    private static final int FRONT_SPINS = 1 << 7;
    //自旋次数,一样是为 2 的 n 次方
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
    //达到该阈值时关闭
    static final int SWEEP_THRESHOLD = 32;
  3. 双重队列,放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。

    放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。

    取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。

    不管是放元素还是取元素,都先跟头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。

  4. 基于单链表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    static final class Node {
    // 是否是数据节点(也就标识了是生产者还是消费者)
    final boolean isData; // false if this is a request node
    // 元素的值
    volatile Object item; // initially non-null if isData; CASed to match
    // 下一个节点
    volatile Node next;
    // 持有元素的线程
    volatile Thread waiter; // null until waiting
    }

    典型的单链表结构,内部除了存储元素的值和下一个节点的指针外,还包含了是否为数据节点和持有元素的线程。内部通过isData区分是生产者还是消费者。

  5. 无界的一个阻塞队列

    1
    2
    3
    4
    5
    6
    public LinkedTransferQueue() {
    }
    public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
    }

    只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。

参考资料

  1. 死磕 java集合之LinkedTransferQueue源码分析
  2. LinkedTransferQueue详解
  3. J.U.C之collections框架:LinkedTransferQueue

LinkedBlockingDeque

基于双向链表,双端阻塞队列,线程安全,有界队列

  1. LinkedBlockingDeque有 LinkedBlockingQueue的所有方法,并且还提供了双端队列的一些其他方法。可以向队尾添加元素,也可以向队头添加元素,删除同理。

  2. 有界队列,默认和最大长度为 Integer.MAX_VALUE,并且可以指定lock使用公平锁还是非公平锁

  3. 不允许插入null

  4. 线程安全

  5. 不支持fail-fast机制

总结:

LinkedBlockingDeque和LinkedBlockingQueue的相同点在于:

  1. 基于链表
  2. 容量可选,不设置的话,就是Int的最大值

和LinkedBlockingQueue的不同点在于:

  1. 双端链表和单链表
  2. LinkedBlockingDeque不存在头节点
  3. LinkedBlockingDeque是基于单锁实现

参考资料

  1. 说说LinkedBlockingDeque

疑问:为什么LinkedBlockingDeque不能像LinkedBlockingQueue使用双锁?

关键在于:LinkedBlockingDeque使用双锁无法像LinkedBlockingQueue细化并发粒度

  • 先从现有的实现分析:

    现有实现基于单锁,2个条件notEmply,notFull

    1. 假设队列已满,A线程调用putLast方法,发现队列已满,调用notFull.await方法进入阻塞状态
    2. 然后,B线程调用putFirst方法,发现队列已满,调用notFull.await方法进入阻塞状态
    3. 然后,C线程调用takeLast方法,然后再调用notFull.signal方法唤醒A、B线程。
  • 然后分析双锁如何实现LinkedBlockingDeque:

    我们参考LinkedBlockingQueue使用双锁:

    putLock,takeLock,notEmply = putLock.newCondition(),notFull = takeLock.newCondition()

    但是很遗憾,这样是不行的。因为如果根据操作类型区分锁的话,队头和队尾就可以同时删除,同时添加(因为是两把不同的独占锁)。

    那就换个思路,根据头尾节点区分锁,分别叫做lastlock,firstlock,因为头尾节点既可以添加元素也可以删除元素,所以需要四个条件,假设分别是lastNotEmply,lastNotFull,firstNotEmply,firstNotFull。

    这样看起来似乎是可行的(实际也是可实现的,但是跟单锁+2个条件效果一样),依然用上面的例子:

    1. 假设队列已满,A线程调用putLast方法,发现队列已满,调用lastNotFull.await方法进入阻塞状态
    2. 然后,B线程调用putFirst方法,发现队列已满,调用firstNotFull.await方法进入阻塞状态
    3. 然后,C线程调用takeLast方法,这个时候问题来了,因为我们需要同时唤醒AB,所以需要分别调用lastNotFull.signal,firstNotFull.signal。

    仔细一想,这和单锁+2个条件是一个意思啊,所以干脆直接用单锁实现了。

坚持原创技术分享,您的支持将鼓励我继续创作!