JDK提供的集合类型主要分为四种类型:
- List:支持重复元素
- Set:不支持重复元素
- Map:键/值对的映射集
- Queue/Deque(double ended queue):queue是在集合尾部添加元素,在头部删除元素的队列,deque是可在头部和尾部添加或者删除元素的双端队列,deque既可以实现队列又可以实现栈。
本文基于JDK8,java version “1.8.0_251”
ArrayDeque
基于数组,循环数组,非线程安全,效率高,双端队列,即可实现队列也可实现栈
基于数组,最小容量为8,默认容量为16,最大容量Integer.MAX_VALUE-8(2^31 - 8),容量总是为2的次幂(用于判断是否需要扩容)。
1
2
3
4
5transient Object[] elements; // non-private to simplify nested class access
private static final int MIN_INITIAL_CAPACITY = 8;
public ArrayDeque() {
elements = new Object[16];
}即可实现队列也可实现栈,ArrayDeque 和 LinkedList 都是 Deque 接口的实现类,都具备既可以作为队列,又可以作为栈来使用的特性,两者主要区别在于底层数据结构的不同。ArrayDeque 底层数据结构是以循环数组为基础,而 LinkedList 底层数据结构是以循环链表为基础。理论上,链表在添加、删除方面性能高于数组结构,在查询方面数组结构性能高于链表结构,但是对于数组结构,如果不进行数组移动,在添加方面效率也很高。LinkedList的内存开销更大。对于小数据量,ArrayDeque 和 LinkedList 在效率方面相差不大,但是对于大数据量,推荐使用 ArrayDeque。
循环数组,为了满足可以同时在数组两端插入或删除元素的需求,该数组还必须是循环的,即循环数组,也就是说数组的任何一点都可能被看作起点或者终点,head和tail分别代表起点和终点的索引值。因为是循环数组,所以 head 不一定总是指向下标为 0 的数组元素,tail 也不一定总是比 head 大
1
2
3
4
5
6
7
8
9
10
11
12/**
* The index of the element at the head of the deque (which is the
* element that would be removed by remove() or pop()); or an
* arbitrary number equal to tail if the deque is empty.
*/
transient int head;
/**
* The index at which the next element would be added to the tail
* of the deque (via addLast(E), add(E), or push(E)).
*/
transient int tail;动态扩容,容量为原来的2倍。
(tail = (tail + 1) & (elements.length - 1)) == head
用与运算来计算终点位置,然后判断tail是否和head重合,如果重合就触发扩容。扩容申请一个2倍大小的数组,将原数组复制到新数组。复制数组分两次进行,第一次复制 head 头部索引至数组末端的元素到新数组,第二次复制 head 左边的元素到新数组。1
2
3
4
5
6
7public void addLast(E e) {
if (e == null)
throw new NullPointerException();
elements[tail] = e;
if ( (tail = (tail + 1) & (elements.length - 1)) == head)
doubleCapacity();
}ArrayDeque不允许插入null,而 LinkedList 允许插入null。
非线程安全,也没办法通过Collections类变为线程安全的类。
支持fail-fast机制,如果tail != fence || result == null,则抛出ConcurrentModificationException异常
1
2
3
4
5
6
7
8
9
10
11
12
13public E next() {
if (cursor == fence)
throw new NoSuchElementException();
"unchecked") (
E result = (E) elements[cursor];
// This check doesn't catch all possible comodifications,
// but does catch the ones that corrupt traversal
if (tail != fence || result == null)
throw new ConcurrentModificationException();
lastRet = cursor;
cursor = (cursor + 1) & (elements.length - 1);
return result;
}
PriorityQueue
优先队列,小顶堆,非线程安全
基于数组实现的小顶堆,根据元素的CompareTo方法顺序或自定义的comparator比较顺序。
父节点和子节点的编号是有联系的。
1
2
3leftNo = parentNo*2+1
rightNo = parentNo*2+2
parentNo = (nodeNo-1)/2添加元素。新加入的元素可能会破坏小顶堆的性质,因此需要进行必要的调整。
删除类似和添加元素类似。
动态扩容,如果原数组容量小于64,则扩容为原数组容量的2倍+2,否则扩容为原数组容量的3/2倍。
1
2
3
4
5
6
7
8
9
10
11private void grow(int minCapacity) { // minCapacity等于原数组容量+1
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
queue = Arrays.copyOf(queue, newCapacity);
}
ConcurrentLinkedQueue
线程安全的非阻塞队列,基于链表的无界线程安全队列,无锁
基于链表,不能指定最大容量
1
2
3private static class Node<E> {
volatile E item;
volatile Node<E> next;head节点表示队头,tail节点表示队尾,head节点用来出队列使用,一个tail节点用来入队列使用,初始化的时候head = tail 都是一个空节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ()
* - tail != null
* Non-invariants:
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
*/
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}head 未必是队列中第一个元素(head指向的可能是一个已经被移除的元素),tail 未必是队列中最后一个元素(tail.next 可以不为 null)
有了 head 和 tail 节点,如果按照我们平常的思维,head 节点即头节点,tail 节点即尾节点。那么入队列的时候,将 tail 的 next 节点设置为 newNode,将 newNode 设置为 tail;出队列的时候,将 head 节点元素返回,head 的 next 节点设置为 head。实现代码如下:
1
2
3
4
5
6
7
8
9
10
11public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (; ; ) {
Node<E> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
}这样的做法 tail 节点永远作为队列的尾节点,head 节点永远为队列的头节点。实现代码量非常少,而且逻辑清晰和易懂。但是,这么做有个缺点,每次都需要使用循环 CAS 更新 tail 节点。所以 doug lea 为了减少 CAS 更新 tail 节点的次数,提高入队的效率,使用增加循环来控制 tail 节点的更新频率,并不是每次节点入队后都将 tail 节点更新成尾节点,而是当 tail 节点和尾节点不一致时(也就是循环两次)才更新 tail 节点。
size方法可能不准确,(实际上juc集合里面的size方法所返回的元素个数都是不保证准确的)
参考资料:
ConcurrentLinkedDeque
基于双向链表结构的无界并发队列,CAS,与 ConcurrentLinkedQueue 的区别是该阻塞队列同时支持FIFO和FILO两种操作方式
- 基于双向链表结构的无界并发队列
参考资料:
BlockingQueue
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界(但大小默认值为Interger.MAX_VALUE)阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列,也即单个元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向有界阻塞队列。
ArrayBlockingQueue
有界阻塞队列,基于循环数组,线程安全
基于循环数组,关键成员变量如下:
- lock的作用:在多线程下操作的,所有修改items、takeIndex、putIndex和count这些成员变量时,必须要考虑多线程安全问题,这里使用lock独占锁,来保证并发操作的安全。
- notEmpty与notFull的作用:因为阻塞队列必须实现,当队列为空或队列已满的时候,队列的读取或插入操作要等待。当队列从空时,插入元素需要唤醒之前因为读取等待的线程。当队列已满时,移出元素需要唤醒之前因为插入等待的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/** 储存队列的中元素 */
final Object[] items;
/** 队列头的位置 */
int takeIndex;
/** 队列尾的位置 */
int putIndex;
/** 当前队列拥有的元素个数 */
int count;
/** 用来保证多线程操作共享变量的安全问题 */
final ReentrantLock lock;
/** 当队列为空时,就会调用notEmpty的wait方法,让当前线程等待 */
private final Condition notEmpty;
/** 当队列为满时,就会调用notFull的wait方法,让当前线程等待 */
private final Condition notFull;有界队列,构造方法需要指定队列大小,并且可以指定lock使用公平锁还是非公平锁。
1
2
3
4
5
6
7
8
9
10
11public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}不允许插入null
循环数组,和ArrayDeque同理。
线程安全
不支持fail-fast机制
源码说明:
lock.lock()是保证同一时间只有一个线程修改成员变量,防止出现并发操作问题。虽然它也会阻塞当前线程,但是它并不是条件等待,只是因为锁被其他线程持有,而ArrayBlockingQueue中方法操作时间都不长,这里相当于不阻塞线程。
add(E e)与offer(E e)方法
向队尾新添加元素,如果队列已满,返回false,否则添加成功返回true。
add方法调用父类的方法,父类的方法会调用offer方法,最终add方法实际就是调用offer方法。
1 | public boolean offer(E e) { |
enqueue(E x)方法
向队尾新添加元素,向队列末尾新添加元素,添加成功则将putIndex+1,如果达到数组长度,putIndex=0(因为循环数组),然后唤醒因为数组为空时获取元素阻塞的线程。
1 | private void enqueue(E x) { |
疑问:调用notEmpty.signal()之前是不是应该判断一下当前的长度是否为1?
put方法
向队尾新添加元素。向队列末尾新添加元素,如果队列已满,则调用notFull.await(),这里判断需要用while而不能用if,目的是防止虚假唤醒。线程被唤醒之后,检查是否已满,如果队列还是满的,则继续等待,如果不是满的,则向队尾添加元素。
1 | public void put(E e) throws InterruptedException { |
offer(E e, long timeout, TimeUnit unit)方法
向队尾新添加元素,可设置最大的阻塞时间。
如果队列中没有可用空间,当前线程就等待, 如果等待时间超过timeout了,那么返回false,表示添加失败。如果被唤醒,则检查是否有可用空间,如果有可用空间,则继续添加元素。
1 | public boolean offer(E e, long timeout, TimeUnit unit) |
remove()和poll()方法
删除队头元素,如果队列为空,返回null,不为空删除成功返回元素。
remove方法调用父类的方法,父类的方法会调用poll方法,最终remove方法实际就是调用poll方法。
1 | public E poll() { |
dequeue方法
删除队头元素,先将队头元素删除,然后将takeIndex的值+1,如果达到数组长度,takeIndex=0(因为循环数组),然后唤醒因为数组已满时添加元素阻塞的线程。
1 | private E dequeue() { |
疑问:调用notFull.signal();之前是否应该先判断容量为数组长度-1?
take()方法
删除队头元素,如果当前容量为0,则等待。否则删除元素。这里判断当前容量不能用if,目的是防止虚假唤醒。如果被唤醒,则检查队里是否有元素,如果有元素,则继续删除元素。
1 | public E take() throws InterruptedException { |
poll(long timeout, TimeUnit unit)方法
删除队头元素,可设置最大的阻塞时间。
如果当前容量为0,则等待,等待超过最大等待时间则返回null。如果被唤醒,则检查队里是否有元素,如果有元,则继续删除元素。
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
element()与peek() 方法
查看元素
element方法调用父类的方法,父类的方法会调用peek方法,最终element方法实际就是调用peek方法。
1 | public E peek() { |
remove(Object o)方法
删除指定元素
1 | public boolean remove(Object o) { |
从队列中删除指定对象o,那么就要遍历队列,删除第一个与对象o相同的元素,如果队列中没有对象o元素,那么返回false删除失败。这里有两点需要注意:
- 如何遍历队列,就是从队列头遍历到队列尾。就要靠takeIndex和putIndex两个变量了。
- 为什么Object[] items = this.items;这句代码没有放到同步锁lock代码块内。items是成员变量,那么多线程操作的时候,不会有并发问题么?
这个是因为items是个引用变量,不是基本数据类型,而且我们对队列的插入和删除操作,都是针对这一个items数组,没有改变数组的引用,所以在lock代码中,items会得到其他线程对它最新的修改。但是如果这里将int putIndex = this.putIndex;方法lock代码块外面,就会产生问题。
removeAt(final int removeIndex)方法
删除指定位置的元素,需要注意的是删除之后的数组还能保持队列形式,分为两种情况:
- 如果删除位置是队列头,那么简单,只需要将队列头的位置元素设置为null,将将队列头位置+1
- 如果删除位置不是队列头,那么麻烦了,这个时候,我们就要将从removeIndex位置后的元素全部左移一位,覆盖前一个元素。最后将原来队列尾的元素置位null
1 | void removeAt(final int removeIndex) { |
疑问:为什么构造方法要加锁?
1 | /** |
注释中有写:锁的用处在于可见性而不是竞争。此处使用锁的意义在于避免指令重排序,避免出现对象未初始化完,就将将内存空间的地址赋值给对应的引用。保证count,putIndex,items[i]的可见性。
注意item被final修饰,只能保证final字段对应的引用是up-to-date的。
LinkedBlockingQueue
有界阻塞队列,基于链表,线程安全,双锁
基于链表,关键成员变量如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();双锁。一个take锁,控制消费者并发,一个put锁,控制生产者并发
有界队列,默认和最大长度为 Integer.MAX_VALUE,并且可以指定lock使用公平锁还是非公平锁。
不允许插入null
线程安全
不支持fail-fast机制
add(E e)与offer(E e)方法
向队尾新添加元素,如果队列已满,返回false。如果队列没有满,添加元素,添加成功后采用CAS操作更新队列容量,如果当前容量小于最大的容量(注意这里cas拿到的c是旧值),则唤醒因为队列已满,添加元素被阻塞的线程。如果之前队列为空,则唤醒因为队列为空,删除元素被阻塞的线程。
add方法调用父类的方法,父类的方法会调用offer方法,最终add方法实际就是调用offer方法。
1 | public boolean offer(E e) { |
put方法
向队尾新添加元素。向队列末尾新添加元素,如果队列已满,则调用notFull.await(),这里判断需要用while而不能用if,目的是防止虚假唤醒。线程被唤醒之后,检查是否已满,如果队列还是满的,则继续等待,如果不是满的,则向队尾添加元素。添加成功后采用CAS操作更新队列容量,如果当前容量小于最大的容量(注意这里cas拿到的c是旧值),则唤醒因为队列已满,添加元素被阻塞的线程。如果之前队列为空,则唤醒因为队列为空,删除元素被阻塞的线程。
1 | /** |
offer(E e, long timeout, TimeUnit unit)方法
向队尾新添加元素,可设置最大的阻塞时间。
如果队列中没有可用空间,当前线程就等待, 如果等待时间超过timeout了,那么返回false,表示添加失败。如果被唤醒,则检查队列是否已满,如果没满,则继续添加元素,添加成功后采用CAS操作更新队列容量,如果当前容量小于最大的容量(注意这里cas拿到的c是旧值),则唤醒因为队列已满,添加元素被阻塞的线程。如果之前队列为空,则唤醒因为队列为空,删除元素被阻塞的线程。
1 | /** |
remove()和poll()方法
删除队头元素,如果队列为空,返回null,不为空就删除元素,删除成功后采用CAS操作更新队列容量,最后返回元素。如果当前容量小于0(注意这里cas拿到的c是旧值),则唤醒因为队列为空,删除元素被阻塞的线程。如果之前队列已满,则唤醒因为队列已满,添加元素被阻塞的线程。
remove方法调用父类的方法,父类的方法会调用poll方法,最终remove方法实际就是调用poll方法。
1 | public E poll() { |
take()方法
删除队头元素,如果当前容量为0,则等待。否则删除元素。这里判断当前容量不能用if,目的是防止虚假唤醒。如果被唤醒,则检查队里是否有元素,如果有元素,则继续删除元素。删除成功后采用CAS操作更新队列容量,最后返回元素。如果当前容量小于0(注意这里cas拿到的c是旧值),则唤醒因为队列为空,删除元素被阻塞的线程。如果之前队列已满,则唤醒因为队列已满,添加元素被阻塞的线程。
1 | public E take() throws InterruptedException { |
poll(long timeout, TimeUnit unit)方法
删除队头元素,可设置最大的阻塞时间。
如果当前容量为0,则等待,等待超过最大等待时间则返回null。如果被唤醒,则检查队里是否有元素,如果有元,则继续删除元素。删除成功后采用CAS操作更新队列容量,最后返回元素。如果当前容量小于0(注意这里cas拿到的c是旧值),则唤醒因为队列为空,删除元素被阻塞的线程。如果之前队列已满,则唤醒因为队列已满,添加元素被阻塞的线程。
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
element()与peek() 方法
查看元素,需要放置元素被删除,增加不影响
element方法调用父类的方法,父类的方法会调用peek方法,最终element方法实际就是调用peek方法。
1 | public E peek() { |
remove(Object o)方法
删除指定元素,删除和添加都需要加锁。
1 | public boolean remove(Object o) { |
疑问:为什么ArrayBlockingQueue只用单锁?
目前没有找到确切的答案,但可以确定的是,ArrayBlockingQueue可以使用双锁实现,并且有明显的性能提升。
参考资料:
PriorityBlockingQueue
无界阻塞队列,基于数组实现的小顶堆,线程安全,CAS
基于数组实现的小顶堆,根据元素的CompareTo方法顺序或自定义的comparator比较顺序。关键成员变量如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 默认初始化容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 阻塞队列容量最大值
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 阻塞队列
private transient Object[] queue;
// 阻塞队列中元素数量
private transient int size;
// 比较器,元素没有实现comparable接口时,需提供比较器
private transient Comparator<? super E> comparator;
// 独占锁,读写线程共用这一把锁
private final ReentrantLock lock;
// 读线程等待队列,写线程永远不会阻塞
private final Condition notEmpty;
// 写线程扩容锁,通过CAS控制,只有一个写线程会将此变量从0变成1
private transient volatile int allocationSpinLock;无界阻塞队列,可以声明初始容量,但是会超过初始容量会自动扩容,最大容量为Integer.MAX_VALUE-8。
写线程不阻塞,读线程在队列为空时阻塞。
当队列为满时,写线程不会阻塞,而会尝试去扩容,扩容成功就继续向阻塞队列写入数据。当队列为空时,读线程会阻塞等待,直到队列不为空,被写线程唤醒。因此该阻塞队列适用于读多于写的场景,不然,写线程过多,会导致内存消耗过大,影响性能。读写线程共用同一把独占锁。
不允许插入null
线程安全
不支持fail-fast机制
动态扩容,如果原数组容量小于64,则扩容为原数组容量的2倍+2,否则扩容为原数组容量的3/2倍。
阻塞队列PriorityBlockingQueue从不阻塞写线程,当队列满时,写线程会尝试扩容阻塞队列,扩容成功后再向阻塞队列中新增元素,而当队列元素为空时,会阻塞读线程的读取,当然也有非阻塞的方法(poll)。该阻塞队列适用于读多于写的场景,不然,写线程过多,会导致内存消耗过大,影响性能。阻塞队列采用堆存储结构,因此每次冲阻塞队列取出的元素总是最小元素(或最大元素)。而堆存储需要提供比较器或者元素实现了阻塞接口,否则程序会抛出ClassCastException。
DelayQueue
延迟队列,基于优先队列,线程安全,无界
DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。本质上即:DelayQueue = BlockingQueue +PriorityQueue + Delayed。
1
2
3
4
5
6public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>();
}队列中只能存放实现Delayed接口的对象,而此接口有两个需要实现的方法。最重要的就是getDelay,这个方法需要返回对象过期前的时间。简单说,队列在某些方法处理前,会调用此方法来判断对象有没有超时。
关键成员变量如下:
1
2
3
4
5
6
7
8
9// 阻塞等待使用了可重入锁,只有一把
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列,用来对不同延迟任务的排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 这个Leader 有意思,解决了队列头的数据和线程的关联
// 同时解决了其他线程由谁唤醒
private Thread leader = null;
// 与Leader Thread配合 唤醒等待的Leader或者新Leader替换
private final Condition available = lock.newCondition();重入锁是非公平的,且不支持设置。
不允许插入null
线程安全
不支持fail-fast机制
动态扩容,依赖PriorityQueue动态扩容。
take方法
1 | /** |
可以看出延迟的实现原理就是用到了 Condition.awaitNanos(delay) 方法。先 peek 看看有没有元素,再看看元素有没有过期,过期就 poll 取出,还没过期就是 await 等待。
这里有两点需要注意:
leader线程的作用,下面是官方的说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null;说了是用到 Leader-Follower 模式。如果一个线程是 leader 线程,那么它只会等待available.awaitNanos(delay) 这么多时间,其他后来的 follower 线程只能干等。意思就是一定是 leader 线程先取到头元素,其他线程需要等待 leader 线程的唤醒。这样就可以简化竞争的操作,直接让后面的线程等待,把竞争交给 Condition 来做。
first == null
目的是为了做 GC。假设没有这一句,那么这里很有可能是 follower 线程在等待的过程中一直持有 first 的引用,而 leader 线程已经完成任务了,都把 first 都释放了,原来希望被回收的 first 却一直没有被回收。在极端的情况下,在一瞬间高并发,会有大量的 follower 线程持有 first,而需要等这些线程都会唤醒后,first 才会被释放回收。
offer方法
offer 方法,add 和 put 最终还是调到 offer 方法。
1 | /** |
放入元素,如果插入的元素是放在了头部的话:
- 把 leader 线程置为 null。因为 leader 的意义就是想要取头元素的那个线程,那么旧的 leader 将没有意义。
- 唤醒在等待的线程。原本线程都在等待头元素,但是头元素改变了,就唤醒一个线程让它重新取出头元素,并成为新的 leader (看 take 方法里面是一个 for 的死循环)。
SynchronousQueue
内部没有容器,线程安全,无锁,CAS,配对通信机制
内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。使用实例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
Thread putThread = new Thread(() -> {
System.out.println("put thread start");
try {
queue.put(1);
} catch (InterruptedException e) {
}
System.out.println("put thread end");
});
Thread takeThread = new Thread(() -> {
System.out.println("take thread start");
try {
System.out.println("take from putThread: " + queue.take());
} catch (InterruptedException e) {}
System.out.println("take thread end");
});
putThread.start();
Thread.sleep(1000);
takeThread.start();
}
}一种输出结果:
1
2
3
4
5put thread start
take thread start
take from putThread: 1
put thread end
take thread end默认非公平模型(基于栈所以是后进先出),支持公平模型(基于队列,先进先出)。
1
2
3
4
5
6public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。公平策略总结下来就是:队尾匹配队头出队。
非公平模式下,底层实现使用的是TransferStack这个内部栈,有一个head指针指向栈顶。
不允许插入null
线程安全
不支持fail-fast机制
参考资料:
LinkedTransferQueue
无界,基于单链表,无锁,线程安全,双重队列
实现了TransferQueue接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L;
public interface TransferQueue<E> extends BlockingQueue<E> {
//该方法放入元素后,一定要被消费者消费后,线程才释放,否则会一直堵塞
void transfer(E e) throws InterruptedException;
/**
* tryTransfer 和上面的 transfer 方法相比,
* 该方入队元素后,无论是否消费都立即返回
* 如果没有消费者接收元素,则元素不入队,返回的是 false
*/
boolean tryTransfer(E e);
/**
* 该方法加入了时间等待,假设超过时间没有消费者线程接收
* 则元素不会入队,并返回false
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 判断是否有等待中的客户端线程
*/
boolean hasWaitingConsumer();
/**
* 获取等待接收元素的消费者数量
*/
int getWaitingConsumerCount();关键成员变量如下:
1
2
3
4
5
6
7
8
9//获取处理器数量,判断是否是多个
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
//自旋次数,阻塞前的自旋次数(这里向左偏移,一定是 2 的 n 次方)
private static final int FRONT_SPINS = 1 << 7;
//自旋次数,一样是为 2 的 n 次方
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
//达到该阈值时关闭
static final int SWEEP_THRESHOLD = 32;双重队列,放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。
取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。
不管是放元素还是取元素,都先跟头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。
基于单链表
1
2
3
4
5
6
7
8
9
10static final class Node {
// 是否是数据节点(也就标识了是生产者还是消费者)
final boolean isData; // false if this is a request node
// 元素的值
volatile Object item; // initially non-null if isData; CASed to match
// 下一个节点
volatile Node next;
// 持有元素的线程
volatile Thread waiter; // null until waiting
}典型的单链表结构,内部除了存储元素的值和下一个节点的指针外,还包含了是否为数据节点和持有元素的线程。内部通过isData区分是生产者还是消费者。
无界的一个阻塞队列
1
2
3
4
5
6public LinkedTransferQueue() {
}
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。
参考资料:
LinkedBlockingDeque
基于双向链表,双端阻塞队列,线程安全,有界队列
LinkedBlockingDeque有 LinkedBlockingQueue的所有方法,并且还提供了双端队列的一些其他方法。可以向队尾添加元素,也可以向队头添加元素,删除同理。
有界队列,默认和最大长度为 Integer.MAX_VALUE,并且可以指定lock使用公平锁还是非公平锁。
不允许插入null
线程安全
不支持fail-fast机制
总结:
LinkedBlockingDeque和LinkedBlockingQueue的相同点在于:
- 基于链表
- 容量可选,不设置的话,就是Int的最大值
和LinkedBlockingQueue的不同点在于:
- 双端链表和单链表
- LinkedBlockingDeque不存在头节点
- LinkedBlockingDeque是基于单锁实现
参考资料:
疑问:为什么LinkedBlockingDeque不能像LinkedBlockingQueue使用双锁?
关键在于:LinkedBlockingDeque使用双锁无法像LinkedBlockingQueue细化并发粒度。
先从现有的实现分析:
现有实现基于单锁,2个条件notEmply,notFull
- 假设队列已满,A线程调用putLast方法,发现队列已满,调用notFull.await方法进入阻塞状态
- 然后,B线程调用putFirst方法,发现队列已满,调用notFull.await方法进入阻塞状态
- 然后,C线程调用takeLast方法,然后再调用notFull.signal方法唤醒A、B线程。
然后分析双锁如何实现LinkedBlockingDeque:
我们参考LinkedBlockingQueue使用双锁:
putLock,takeLock,notEmply = putLock.newCondition(),notFull = takeLock.newCondition()
但是很遗憾,这样是不行的。因为如果根据操作类型区分锁的话,队头和队尾就可以同时删除,同时添加(因为是两把不同的独占锁)。
那就换个思路,根据头尾节点区分锁,分别叫做lastlock,firstlock,因为头尾节点既可以添加元素也可以删除元素,所以需要四个条件,假设分别是lastNotEmply,lastNotFull,firstNotEmply,firstNotFull。
这样看起来似乎是可行的(实际也是可实现的,但是跟单锁+2个条件效果一样),依然用上面的例子:
- 假设队列已满,A线程调用putLast方法,发现队列已满,调用lastNotFull.await方法进入阻塞状态
- 然后,B线程调用putFirst方法,发现队列已满,调用firstNotFull.await方法进入阻塞状态
- 然后,C线程调用takeLast方法,这个时候问题来了,因为我们需要同时唤醒AB,所以需要分别调用lastNotFull.signal,firstNotFull.signal。
仔细一想,这和单锁+2个条件是一个意思啊,所以干脆直接用单锁实现了。