JDK线程池源码研究

主要构成

workers: 工作组
queue: 任务队列
threadFactory: 线程生产工厂
handler: 异常处理

线程池状态

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

1. RUNNING:线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。

2. SHUTDOWN:不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。

3. STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。

4. TIDYING:

  • SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态,会执行 terminated() 方法。线程池中的 terminated() 方法是空实现,可以重写该方法进行相应的处理。
  • 线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。
  • 线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。

5. TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。

参考链接

主要流程

创建线程池 -> 创建 worker 或放入任务队列 -> 线程开始执行任务
源码流程:

execute

JDK线程池源码研究

        // 获取线程池句柄
        int c = ctl.get();
        // 当前线程数量是否小于设置的核心线程池大小
        if (workerCountOf(c) < corePoolSize) {
            // 添加一个 woker
            if (addWorker(command, true))
               // 添加成功后推出
                return;
            // 刷新线程池句柄
            c = ctl.get();
        }
        // 以下是线程池数量达到上限后的处理
        // 如果当前线程池是 RUNNING,且添加任务队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            // 刷新线程池句柄
            int recheck = ctl.get();
            // 如果当前线程池非 RUNNING状态,则移除任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果是RUNNING状态或移除任务失败,且当前工作线程为0
            else if (workerCountOf(recheck) == 0)
                // 添加一个没有任务的 worker 去执行任务队列中剩余的任务
                addWorker(null, false);
        }
        // 如果线程池非RUNNING或,添加任务队列失败,则新增一个 woker 执行任务
        // 如果增加 worker 失败则抛出异常拒绝任务
        else if (!addWorker(command, false))
            reject(command);

addWorker

JDK线程池源码研究

  • 判断线程状态,增加 woker 数量

           retry:
          for (;;) {
              // 获取线程池句柄
              int c = ctl.get();
              // 当前线程池状态
              int rs = runStateOf(c);
    
              // 1.如果当前线程池状态为 STOP 及以上,返回false
              // 2.如果当前线程状态为 SHUTDOWN,且任务和任务队列都为空,返回false
              if (rs >= SHUTDOWN &&
                  ! (rs == SHUTDOWN &&
                     firstTask == null &&
                     ! workQueue.isEmpty()))
                  return false;
    
              for (;;) {
                  // 获取线程数量
                  int wc = workerCountOf(c);
                  // 如果线程数量大于系统最大值,或大于设置的最大线程数量,返回false
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  // CAS 操作 worker 数量,操作成功后跳出最外层循环,开始执行任务
                  if (compareAndIncrementWorkerCount(c))
                      break retry;
                  // 刷新线程池句柄
                  c = ctl.get();  // Re-read ctl
                  // 如果当前线程池状态有变化,跳出第二层循环,
                  // 即重新 check 线程池、任务、任务队列状态
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
              }
          }
  • 创建 woker,线程执行任务

          // woker 是否启动标记
          boolean workerStarted = false;
          // woker 是否添加进 woker 组标记
          boolean workerAdded = false;
          Worker w = null;
          try {
              // 创建 woker,在 woker 中通过线程工厂会创建线程
              w = new Worker(firstTask);
              // 获取 woker 的线程
              final Thread t = w.thread;
              if (t != null) {
                  // 获取全局锁
                  final ReentrantLock mainLock = this.mainLock;
                  // 阻塞取锁
                  mainLock.lock();
                  try {
                      // 重新获取线程池状态
                      int rs = runStateOf(ctl.get());
                      // 如果线程池是 RUNNING 状态
                      // 或者是 (SHUTDOWN 状态 且任务为 null)-> excute 中删除失败的任务
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {
                          // 如果当前线程已经启动过了,抛出异常
                          if (t.isAlive()) // precheck that t is startable
                              throw new IllegalThreadStateException();
                          // 讲 woker 添加进 woker 组中
                          workers.add(w);
                          // 记录线程峰值
                          int s = workers.size();
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          // woker添加成功标记
                          workerAdded = true;
                      }
                  } finally {
                      // 释放全局锁
                      mainLock.unlock();
                  }
                  // 如果添加 woker 时没有异常,开始执行任务,标记 woker 已开始工作
                  if (workerAdded) {
                      t.start();
                      workerStarted = true;
                  }
              }
          } finally {
              // 如果 woker 启动失败,执行添加失败的策略 
              if (! workerStarted)
                  // woker 组中移除 woker
                  // 减少 woker 组 woker 总数
                  // 尝试结束线程池
                  addWorkerFailed(w);
          }
          // 返回 woker 是否开始
          return workerStarted;

runWorker

Woker 本身是一个 Runnable 对象,在新建 Thread 对象时,线程工厂会把 Woker 传给 Thread,Thread.start 时,会调用 Woker.run,Woker.run 再调用外部的 runWoker 方法

JDK线程池源码研究

        // 获取当前线程
        Thread wt = Thread.currentThread();
        // 获取当前任务
        Runnable task = w.firstTask;
        // 将 woker 的任务设置为空
        w.firstTask = null;
        // 在创建 worker 时,会将状态标记为 -1 ,执行任务前将状态改为 0
        // -1 时不允许结束这个 woker
        // 防止 worker 刚加入 worker 组还没开始执行任务就被回收了
        w.unlock(); // allow interrupts
       // 线程是否异常退出的标记
        boolean completedAbruptly = true;
        try {
            // 当任务不为空,或任务队列的任务不为空时
            while (task != null || (task = getTask()) != null) {
                // 锁住线程
                w.lock();
                //1. 线程池如果是 STOP 状态,且当前线程不是中断状态,则设置中断状态
                //2. 线程池不是 STOP 状态,刷新当前线程线程的中断状态,刷新后再次获取线程池状态,如果是 STOP 且当前线程不是中断状态,则设置中断状态
                // 第二次获取线程池状态是防止刷新线程中断状态后线程池结束,所以刷新后再 check 一次
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行任务之前的钩子
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务之后的钩子
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将任务重置为空
                    task = null;
                    // 增加 woker 的完成任务数量
                    w.completedTasks++;
                    // 释放当前 woker 的锁
                    w.unlock();
                    // 如果没有异常抛出则继续 while 循环
                }
            }
            // 异常退出走不到这里
            completedAbruptly = false;
        } finally {
            // 处理 while 循环结束后的工作
            // 如果是异常退出,则新增一个 woker 代替
            // 如果是正常退出,则清理 woker 组,尝试关闭线程池
            processWorkerExit(w, completedAbruptly);
        }

getTask

JDK线程池源码研究

        // 是否获取任务超时的标记
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            // 线程池句柄
            int c = ctl.get();
            // 线程池状态
            int rs = runStateOf(c);

            // 如果当前线程池是 STOP 及以上状态,则减少 woker 数量,返回 null
            // 如果当前线程是 SHUTDOWN 且任务队列为空,则减少 woker 数量,返回 null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            // 当前线程数量
            int wc = workerCountOf(c);

            // 是否允许超时
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 1. 线程总数大于最大线程数且任务队列为空,减少 woker 数量,减少成功后返回 null,减少失败时回到循环第一行
            // 2. 线程总数小于或等于最大线程数,允许超时且已超过时,且任务队列为空或线程数大于1,减少 woker 数量,减少成功后返回 null,减少失败时回到循环第一行
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 获取任务,如果允许超时,则使用 poll 方法并设置超时时间
               // 否则使用 take 方法,
               // 在 take 方法中,如果任务队列为空会调用 unsafe.park 将线程挂起
               // workQueue.offer 添加任务时,会调用 unsafe.unpark 唤醒一个线程
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果任务不会 null 则返回任务
                if (r != null)
                    return r;
                // 任务为 null 将超时标记为 true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!