LinkedBlockingQueue 源码解析
BlockingQueue 接口定义
对元素操作
抛出异常 | 返回特殊值 | 阻塞 | 超时等待 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
删除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | - | - |
LinkedBlockingQueue 源码
offer
public boolean offer(E e) {
// 如果元素为 null,抛出 NullPointerException
if (e == null) throw new NullPointerException();
// 获取当前队列总数
final AtomicInteger count = this.count;
// 如果当前队列总数达到最大值,直接返回 false (加入队列失败)
if (count.get() == capacity)
return false;
// 初始化【队列总数】,如果添加成功,c >=0 ,添加败则为 -1
int c = -1;
// 将元素包装成 Node
Node<E> node = new Node<E>(e);
// 获取锁对象(非公平锁)
final ReentrantLock putLock = this.putLock;
// 加锁
putLock.lock();
try {
// 队列总数小于最大值
if (count.get() < capacity) {
// 放入队列,将队列的【last】和 【last.next】赋值为 node
enqueue(node);
// 增加队列总数,返回操作前的总数
c = count.getAndIncrement();
// 如果队列总数 +1 没超过最大值,发送【队列没满】的信号,唤醒【写入】的等待线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
// 如果操作前队列是空的,则发送队列不为空的信号,唤醒【取出】的等待线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 获取锁的过程中允许被中断
putLock.lockInterruptibly();
try {
// 当队列满的时候,进入等待,unsafe.park()
while (count.get() == capacity) {
notFull.await();
}
// 如果没满,则插入节点
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
take
public E take() throws InterruptedException {
// 初始化返回的变量
E x;
// 初始化【队列总数】,如果取出成功,c >=0 ,取失败则为 -1
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加锁,允许被中断
takeLock.lockInterruptibly();
try {
// 当队列为空时,挂起线程
while (count.get() == 0) {
notEmpty.await();
}
// 取出队列第一个元素【head】,同时把head设为head.next
x = dequeue();
// 减少任务总数,并返回减少之前的总是
c = count.getAndDecrement();
// 如果之前的总数大于 1 ,则发送不为空的信号,唤醒【取】的线程
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 如果取出之前的总数不为最大值,则发送没满的信号,唤醒【存】的线程
if (c == capacity)
signalNotFull();
return x;
}
poll
public E poll() {
final AtomicInteger count = this.count;
// 任务总数为 0 直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
// 加锁,不允许被中断
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
offer(e,timeout,unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
// 转换超时时间纳秒
long nanos = unit.toNanos(timeout);
// 初始化【队列总数】,如果添加成功,c >=0 ,添加败则为 -1
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 加锁,允许被打断
putLock.lockInterruptibly();
try {
// 当队列满了的时候,触发等待
while (count.get() == capacity) {
// 如果剩余的等待时间小于等于0则返回false(添加失败)
if (nanos <= 0)
return false;
// 返回剩余的等待时间
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
poll(timeout,unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 初始化返回元素
E x = null;
// 初始变化【取出】之前的任务总数,如果取出成功 c > 0 ,取出失败则为 -1
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加锁,允许被打断
takeLock.lockInterruptibly();
try {
// 当任务队列为空时,进入超时等待
while (count.get() == 0) {
// 剩余等待时间 <=0 则返回 null
if (nanos <= 0)
return null;
// 返回剩余等待时间
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
本作品采用《CC 协议》,转载必须注明作者和本文链接