阻塞队列
阻塞队列有几个实现:
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- DelayQueue
- SynchronousQueue
- LinkedTransferQueue
- LinkedBlockingDeque
他们的共同父类是AbstractQueue。我们一起看ArrayBlockingQueue的实现。
ArrayBlockingQueue,数组、有界、出入队阻塞
数据存储
数据存储在数组中,用几个变量标记下一个获取或存储的元素:
/** The queued items */ final Object[] items; // 用数组存储元素 /** items index for next take, poll, peek or remove */ int takeIndex; // 返回元素的下标 /** items index for next put, offer, or add */ int putIndex; // 插入元素的下标 /** Number of elements in the queue */ int count; // 数量
阻塞逻辑
添加、删除元素需要使用ReentrantLock加锁,满队列、空队列情况的等待与唤醒使用各自的Condition:
public ArrayBlockingQueue(int capacity, boolean fair) { ... lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
插入元素,返回是否成功
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) // 队列满,插入不成,返回false return false; else { enqueue(e); return true; // 插入成功,返回true } } finally { lock.unlock(); } }
插入元素,成功返回true,失败抛出异常
它调用offer方法,插入成功返回true,失败抛出异常:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
插入元素,队列满了则阻塞
public void put(E e) throws InterruptedException { checkNotNull(e); // e为空抛出异常 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 当队列满了 notFull.await(); // notFull的Condition等待条件成熟 enqueue(e); // 条件成熟了才插入元素 } finally { lock.unlock(); } }
插入元素,队列满了则阻塞指定超时时间
主体逻辑与put(E e)
一致,只是加了超时逻辑:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); // 将超时时间转换为Nano单位 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) // 超时了,返回false return false; nanos = notFull.awaitNanos(nanos); // Condition等待指定时间 } enqueue(e); return true; // 超时时间内插入成功,返回true } finally { lock.unlock(); } }
删除元素,返回是否删除成功
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { // 遍历到要删除的元素,删除并返回true removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; // 遍历完毕,没有找到,返回false } finally { lock.unlock(); } }
删除元素,返回删除的元素,没匹配到返回null
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
删除元素,队列为空则阻塞
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 队列为空 notEmpty.await(); // 元素出队,队列元素为空,等待非空条件成熟 return dequeue(); } finally { lock.unlock(); } }
入队逻辑,入队成功后同时非空条件成熟:
private void enqueue(E x) { // 入队 // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 元素入队后,通知非空条件已成熟 }
删除元素,队列为空阻塞指定超时时间
主体逻辑与take()
一直,但有等待超时逻辑:
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); // 转化为nano单位 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); // 等待指定超时时间 } return dequeue(); } finally { lock.unlock(); } }
LinkedBlockingQueue,链表、有界、出入队阻塞
存储结构
用链表作为存储,Node是链表的节点:
static class Node{ E item; // 元素值 /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node next; // 下一节点 Node(E x) { item = x; } // 构造方法 }
PriorityBlockingQueue,无界,出队阻塞
出队阻塞
它是无界的,所以只有出队时队列无元素才会堵塞,依赖notEmpty的Condition:
/** * Condition for blocking when empty */ private final Condition notEmpty;
优先级顺序
它的优先级依赖比较器:
/** * The comparator, or null if priority queue uses elements' * natural ordering. */ private transient Comparator comparator;
DelayQueue,无界、出队阻塞、等待指定时间才能出队
数据存储
它的数据实现依赖于PriorityQueue,所以队列的元素需实现Comparable:
private final PriorityQueueq = new PriorityQueue ();
出队
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); // 获取下一个即将的出队元素 if (first == null || first.getDelay(NANOSECONDS) > 0) // 如果无出队元素,或出队元素的时间未到 return null; else return q.poll(); // 实际的出队 } finally { lock.unlock(); } }
阻塞出队
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); // 获取将要出队的元素 if (first == null) // 为空,则等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 时间已到,出队,跳出方法 return q.poll(); first = null; // don't retain ref while waiting // 等待期间取消引用 if (leader != null) # TODO,未理解透彻 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; // 当前线程赋予leader try { available.awaitNanos(delay); // 等待剩余时间 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
SynchronousQueue,阻塞队列,不存储元素
依赖于TransferQueue和TransferStack
它可设置是否公平,分别依赖于TransferQueue和TransferStack,默认非公平
public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack (); }
阻塞入队和出队
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }