LinkedBlockingQueue 源码解析

BlockingQueue 接口定义

LinkedBlockingQueue 源码解析

对元素操作

抛出异常 返回特殊值 阻塞 超时等待
插入 add(e) offer(e) put(e) offer(e,time,unit)
删除 remove() poll() take() poll(time,unit)
检查 element() peek() - -

LinkedBlockingQueue 源码

offer

    public boolean offer(E e) {
        // 如果元素为 null,抛出 NullPointerException
        if (e == null) throw new NullPointerException();
        // 获取当前队列总数
        final AtomicInteger count = this.count;
        // 如果当前队列总数达到最大值,直接返回 false (加入队列失败)
        if (count.get() == capacity)
            return false;
        // 初始化【队列总数】,如果添加成功,c >=0 ,添加败则为 -1
        int c = -1;
        // 将元素包装成 Node
        Node<E> node = new Node<E>(e);
        // 获取锁对象(非公平锁)
        final ReentrantLock putLock = this.putLock;
        // 加锁
        putLock.lock();
        try {
            // 队列总数小于最大值
            if (count.get() < capacity) {
               // 放入队列,将队列的【last】和 【last.next】赋值为 node
                enqueue(node);
                // 增加队列总数,返回操作前的总数
                c = count.getAndIncrement();
                // 如果队列总数 +1 没超过最大值,发送【队列没满】的信号,唤醒【写入】的等待线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }
        // 如果操作前队列是空的,则发送队列不为空的信号,唤醒【取出】的等待线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

put

 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 获取锁的过程中允许被中断
        putLock.lockInterruptibly();
        try {
              // 当队列满的时候,进入等待,unsafe.park()
            while (count.get() == capacity) {
                notFull.await();
            }
            // 如果没满,则插入节点
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

take

 public E take() throws InterruptedException {
        // 初始化返回的变量
        E x;
        // 初始化【队列总数】,如果取出成功,c >=0 ,取失败则为 -1
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 加锁,允许被中断
        takeLock.lockInterruptibly();
        try {
            // 当队列为空时,挂起线程
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 取出队列第一个元素【head】,同时把head设为head.next
            x = dequeue();
            // 减少任务总数,并返回减少之前的总是
            c = count.getAndDecrement();
            // 如果之前的总数大于 1 ,则发送不为空的信号,唤醒【取】的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 释放锁
            takeLock.unlock();
        }
        // 如果取出之前的总数不为最大值,则发送没满的信号,唤醒【存】的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

poll

    public E poll() {
        final AtomicInteger count = this.count;
        // 任务总数为 0 直接返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        // 加锁,不允许被中断
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

offer(e,timeout,unit)

     public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        // 转换超时时间纳秒
        long nanos = unit.toNanos(timeout);
        // 初始化【队列总数】,如果添加成功,c >=0 ,添加败则为 -1
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 加锁,允许被打断
        putLock.lockInterruptibly();
        try {
            // 当队列满了的时候,触发等待
            while (count.get() == capacity) {
                // 如果剩余的等待时间小于等于0则返回false(添加失败)
                if (nanos <= 0)
                    return false;
                // 返回剩余的等待时间
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

poll(timeout,unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // 初始化返回元素
        E x = null;
        // 初始变化【取出】之前的任务总数,如果取出成功 c > 0 ,取出失败则为 -1
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 加锁,允许被打断
        takeLock.lockInterruptibly();
        try {
            // 当任务队列为空时,进入超时等待
            while (count.get() == 0) {
                // 剩余等待时间 <=0 则返回 null
                if (nanos <= 0)
                    return null;
                // 返回剩余等待时间
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!