博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java】Java Queue的简介
阅读量:6347 次
发布时间:2019-06-22

本文共 7828 字,大约阅读时间需要 26 分钟。

阻塞队列

阻塞队列有几个实现:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • DelayQueue
  • SynchronousQueue
  • LinkedTransferQueue
  • LinkedBlockingDeque

他们的共同父类是AbstractQueue。我们一起看ArrayBlockingQueue的实现。

ArrayBlockingQueue,数组、有界、出入队阻塞

数据存储

数据存储在数组中,用几个变量标记下一个获取或存储的元素:

/** The queued items */    final Object[] items; // 用数组存储元素    /** items index for next take, poll, peek or remove */    int takeIndex; // 返回元素的下标    /** items index for next put, offer, or add */    int putIndex; // 插入元素的下标    /** Number of elements in the queue */    int count; // 数量

阻塞逻辑

添加、删除元素需要使用ReentrantLock加锁,满队列、空队列情况的等待与唤醒使用各自的Condition:

public ArrayBlockingQueue(int capacity, boolean fair) {        ...        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }

插入元素,返回是否成功

public boolean offer(E e) {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lock();        try {            if (count == items.length) // 队列满,插入不成,返回false                return false;            else {                enqueue(e);                return true; // 插入成功,返回true            }        } finally {            lock.unlock();        }    }

插入元素,成功返回true,失败抛出异常

它调用offer方法,插入成功返回true,失败抛出异常:

public boolean add(E e) {        if (offer(e))            return true;        else            throw new IllegalStateException("Queue full");    }

插入元素,队列满了则阻塞

public void put(E e) throws InterruptedException {        checkNotNull(e); // e为空抛出异常        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length) // 当队列满了                notFull.await(); // notFull的Condition等待条件成熟            enqueue(e); // 条件成熟了才插入元素        } finally {            lock.unlock();        }    }

插入元素,队列满了则阻塞指定超时时间

主体逻辑与put(E e)一致,只是加了超时逻辑:

public boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException {        checkNotNull(e);        long nanos = unit.toNanos(timeout); // 将超时时间转换为Nano单位        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length) {                if (nanos <= 0) // 超时了,返回false                    return false;                nanos = notFull.awaitNanos(nanos); // Condition等待指定时间            }            enqueue(e);            return true; // 超时时间内插入成功,返回true        } finally {            lock.unlock();        }    }

删除元素,返回是否删除成功

public boolean remove(Object o) {        if (o == null) return false;        final Object[] items = this.items;        final ReentrantLock lock = this.lock;        lock.lock();        try {            if (count > 0) {                final int putIndex = this.putIndex;                int i = takeIndex;                do {                    if (o.equals(items[i])) { // 遍历到要删除的元素,删除并返回true                        removeAt(i);                        return true;                    }                    if (++i == items.length)                        i = 0;                } while (i != putIndex);            }            return false; // 遍历完毕,没有找到,返回false        } finally {            lock.unlock();        }    }

删除元素,返回删除的元素,没匹配到返回null

public E poll() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return (count == 0) ? null : dequeue();        } finally {            lock.unlock();        }    }

删除元素,队列为空则阻塞

public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0) // 队列为空                notEmpty.await(); // 元素出队,队列元素为空,等待非空条件成熟            return dequeue();        } finally {            lock.unlock();        }    }

入队逻辑,入队成功后同时非空条件成熟:

private void enqueue(E x) { // 入队        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal(); // 元素入队后,通知非空条件已成熟    }

删除元素,队列为空阻塞指定超时时间

主体逻辑与take()一直,但有等待超时逻辑:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {        long nanos = unit.toNanos(timeout); // 转化为nano单位        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0) {                if (nanos <= 0)                    return null;                nanos = notEmpty.awaitNanos(nanos); // 等待指定超时时间            }            return dequeue();        } finally {            lock.unlock();        }    }

LinkedBlockingQueue,链表、有界、出入队阻塞

存储结构

用链表作为存储,Node是链表的节点:

static class Node
{ E item; // 元素值 /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node
next; // 下一节点 Node(E x) { item = x; } // 构造方法 }

PriorityBlockingQueue,无界,出队阻塞

出队阻塞

它是无界的,所以只有出队时队列无元素才会堵塞,依赖notEmpty的Condition:

/**     * Condition for blocking when empty     */    private final Condition notEmpty;

优先级顺序

它的优先级依赖比较器:

/**     * The comparator, or null if priority queue uses elements'     * natural ordering.     */    private transient Comparator
comparator;

DelayQueue,无界、出队阻塞、等待指定时间才能出队

数据存储

它的数据实现依赖于PriorityQueue,所以队列的元素需实现Comparable:

private final PriorityQueue
q = new PriorityQueue
();

出队

public E poll() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            E first = q.peek(); // 获取下一个即将的出队元素            if (first == null || first.getDelay(NANOSECONDS) > 0) // 如果无出队元素,或出队元素的时间未到                return null;            else                return q.poll(); // 实际的出队        } finally {            lock.unlock();        }    }

阻塞出队

public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            for (;;) {                E first = q.peek(); // 获取将要出队的元素                if (first == null) // 为空,则等待                    available.await();                else {                    long delay = first.getDelay(NANOSECONDS);                    if (delay <= 0) // 时间已到,出队,跳出方法                        return q.poll();                    first = null; // don't retain ref while waiting // 等待期间取消引用                    if (leader != null) # TODO,未理解透彻                        available.await();                    else {                        Thread thisThread = Thread.currentThread();                        leader = thisThread; // 当前线程赋予leader                        try {                            available.awaitNanos(delay); // 等待剩余时间                        } finally {                            if (leader == thisThread)                                leader = null;                        }                    }                }            }        } finally {            if (leader == null && q.peek() != null)                available.signal();            lock.unlock();        }    }

SynchronousQueue,阻塞队列,不存储元素

依赖于TransferQueue和TransferStack

它可设置是否公平,分别依赖于TransferQueue和TransferStack,默认非公平

public SynchronousQueue(boolean fair) {        transferer = fair ? new TransferQueue
() : new TransferStack
(); }

阻塞入队和出队

public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        if (transferer.transfer(e, false, 0) == null) {            Thread.interrupted();            throw new InterruptedException();        }    }
public E take() throws InterruptedException {        E e = transferer.transfer(null, false, 0);        if (e != null)            return e;        Thread.interrupted();        throw new InterruptedException();    }

转载地址:http://qjcla.baihongyu.com/

你可能感兴趣的文章
javascript之数组的6种去重方法
查看>>
爱创课堂每日一题九十四天- 对WEB标准以及W3C的理解与认识?
查看>>
磁盘显示无法访问拒绝访问,里面的资料怎样找到
查看>>
Java之品优购课程讲义_day07(5)
查看>>
Java的新项目学成在线笔记-day3(八)
查看>>
Windows 下 Python 3.6 下安装 TensorFlow (屡败屡战)
查看>>
php中base64_decode与base64_encode加密解密函数
查看>>
Windows常用知识总结
查看>>
华为交换机hybrid配置小实验
查看>>
单位网络布线中需要注意的几个问题及心得
查看>>
Hanlp分词之CRF中文词法分析详解
查看>>
苹果宣布开源 FoundationDB 数据库
查看>>
关于CmsEasy数据库配置文件修改路径说明
查看>>
openStack 主机流量计运行状态 随笔记录
查看>>
Niushop针对商城难推广提出6大方法,一切如此简单!
查看>>
路由简单的实验
查看>>
Centos6.4 xen编译部署
查看>>
三分钟搞懂专利如何申请
查看>>
好程序员web前端教程分享js reduce方法使用教程
查看>>
零基础学习大数据Hadoop需要什么准备?Hadoop如何发展起来的?
查看>>