线程池 ThreadPoolExecutor 源码详细分析
2022/2/10 17:12:59
本文主要是介绍线程池 ThreadPoolExecutor 源码详细分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1、线程池的作用
一方面当执行大量一步任务的时候线程池能够提供较好的性能,在不使用线程池的时候,每当需要执行异步的时候都是直接 new 一线程进行运行,而线程的创建和销毁都是需要开销的。使用线程池的时候,线程池里面的线程是可复用的,不会每次执行异步任务的时候都重新创建和销毁线程。
另一方面线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等,每个 ThreadPoolExecutor
也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
2、ThreadPoolExecutor 继承体系
- Executor:只定义了一个方法
execute()
,用于执行提交的任务 - ExecutorService:定义了一些线程池管理、任务提交、线程检测的方法
- AbstractExecutorService:提供了
ExecutorService
接口执行方法的默认实现,用于同一处理Callable
任务和Runnable
任务。
下面介绍一下线程池的拒绝策略
当线程池的任务缓存队列已经满了并且线程池中的线程数目达到 maximumPoolSize
的时候,如果还有任务到来就会采取任务拒绝策略,通常有以下四种拒绝策略:
- AbortPolicy:直接抛出一个
RejectedExecutionException
的异常,让你感知到任务被拒绝了,下面你可以根据业务逻辑选择重试或者放弃提交。 - DisCardPolicy:当新任务被提交后直接被丢弃,也不会给你任何的通知,可能造成数据丢失的现象。
- DisCardOldestPolicy:如果线程池没有关闭且没有能力执行,则会丢弃任务队列中的头结点,然后重新提交被拒绝的任务。通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的任务不是最新提交的,而是队列中存活时间最长的,这样就可以腾出资源给新提交的任务,但是同理也存在一定的数据丢失风险。
- CallerRunsPolicy:当有新任务提交的时候,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处:
- 提交的新任务不会被丢弃,这样也就不会造成业务损失。
- 由于谁提交任务谁就要负责执行,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时得,在这段期间,提交任务得线程被占用,也就不会提交新的任务,减缓了任务提交得速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。
阻塞队列
BlockingQueue
BlockingQueue 在 java.util.concurrent 包下,其他阻塞类都实现自 BlockingQueue 接口,BlockingQueue 提供了线程安全的队列访问方式,当向队列中插入数据的时候,如果队列已经满了,线程则会阻塞等待队列中的元素被取出后再插入;当从队列中取数据的时候,如果队列为空,则线程会阻塞等待队列中有新元素再获取。
LinkedBlockingQueue
LinkedBlockingQueue 是一个由链表实现的线程安全的有界阻塞队列,容量默认值为 Integer.MAX_VALUE,也可以自定义容量,建议指定容量大小,默认大小在添加速度大于删除速度的情况下有造成内存溢出的风险,LinkedBlockingQueue 是先进先出的方式存储元素。
ArrayBlockingQueue
ArrayBlockingQueue 是一个有边界的阻塞队列,它的内部实现是一个数组。它的容量是有限的,我们在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue 也是先进先出的方式存储数据,ArrayBlockingQueue 内部的阻塞队列通过 ReenterLock 和 Condition 条件队列实现的,因此 ArrayBlockingQueue 中的元素存在公平访问和非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用的时候,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现 Delayed 接口,在创建元素的时候可以指定延迟时间,只有到达了延迟的时间之后,才能获取到该元素。实现了 Delayed 接口必须重写两个方法,getDelay(TimeUnit) 和 compareTo(Delayed)。
PriorityQueue
PriorityQueue 是一个基于优先级堆的无界优先级队列
。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列的时候提供的 Comparator 进行排序,具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。
PriorityQueue 需要注意的点:
- PriorityQueue 是非线程安全的,在多线程情况下可使用
PriorityBlockingQueue
类替代。 - PriorityQueue 不允许插入 null 元素。
LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素的时候,如果队列不为空,则直接取走数据,若是队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队的时候发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为 匹配 方式。
总结
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
五种常用线程池
- newCachedThreadPool:可缓存的线程池
- newCachedThreadPool 用于创建一个可缓存的线程池,之所以叫可缓存线程池,是因为它在创建新线程的时候如果有可重用的线程,则重用它们,否则创建一个新线程并将其添加到线程池中。
- 在线程池的 keepAliveTime 时间超过默认的 60秒后,该线程会被终止并从缓存中移除,因此在没有线程任务运行的时候,newCachedThreadPool 将不会占用系统的线程资源
- newFixedThreadPool:固定大小的线程池
- newFixedThreadPool 可用于创建一个固定的线程数量的线程池
- 如果任务数量大于等于指定线程池中线程的数量,则新提交的任务将在阻塞队列中排队,直到有可用的线程资源。
- newScheduledThreadPool:可做任务调度的线程池
- newScheduledThreadPool 用于创建可定时调度的线程池,可设置在给定延迟时间后执行或者定期执行某个线程任务。
- newSingleThreadPool :单个线程的线程池
- newSingleThreadPool:创建的线程池会确保池中永远有且只有一个可用的线程
- 在该线程停止或者发生异常的时候,newSingleThreadPool 线程池会启动一个新的线程代替该线程继续执行任务
- newWorkStealingPool:足够大小的线程池
- newWorkStealingPool 用于创建持有足够多线程的线程池来达到快速运算的目的。
- 在内部通过使用多个队列来减少各个线程调度产生的竞争
- 足够的线程指的是 JDK 根据当前线程的运行需求向操作系统中申请足够多的线程,以保障线程的快速执行
3、ThreadPoolExecutor 源码分析
3.1、成员属性
// 高三位:表示当前线程池运行状态,除去高三位的低位29位:表示当前线程池中所拥有的线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 表示在ctl 中,低 COUNT_BITS 位是用于存放当前线程数量的位数 // 这里为什么不直接使用29,而是使用 Integer.SIZE - 3 表示? // 小概率情况,防止 Integer在JDK版本变更中所占的字节位数不是4个字节了 private static final int COUNT_BITS = Integer.SIZE - 3; // 29 // 低 COUNT_BITS 位 所能表达的最大数值 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits 在ctl中的高三位所代表的运行状态 // -1 负数在计算机中以补码的形式存在 1110 0000 0000 0000 0000 0000 0000 0000 转换成整数是一个负数 private static final int RUNNING = -1 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000 private static final int STOP = 1 << COUNT_BITS; // 0100 0000 0000 0000 0000 0000 0000 0000 private static final int TIDYING = 2 << COUNT_BITS; // 0110 0000 0000 0000 0000 0000 0000 0000 private static final int TERMINATED = 3 << COUNT_BITS; // 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue // workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列 private final BlockingQueue<Runnable> workQueue; // 线程池的全局锁,增加worker,减少worker 的时候需要持有mainLock,修改线程池运行状态的时候也需要 private final ReentrantLock mainLock = new ReentrantLock(); // 线程池真正存放worker -> thread的地方 private final HashSet<Worker> workers = new HashSet<Worker>(); // 当外部线程调用 awaitTermination() 方法的时候,外部线程会等待当前线程池状态为 Terminated 为止 // 等待是如何实现的?就是将外部线程封装成waitNode放入到 Condition 队列中了,waitNode.thread 就是外部线程,会被park掉(处于WAITING 状态) // 当线程池状态变为 Termination的时候,会去唤醒这些线程,通过 termination.signAll() ,唤醒之后这些线程会进入到阻塞队列,头结点会去抢占mainLock // 抢到锁的线程会继续执行 awaitTermin() 后面的程序,这些线程最后都会正常执行 // 简单理解,termination.await() 会将线程阻塞在这里 // termination.signAll() 会将阻塞在这里的线程依次唤醒 private final Condition termination = mainLock.newCondition(); // 记录线程池生命周期内线程数最大值 private int largestPoolSize; // 记录线程池所完成的任务总数,当worker 退出的时候会将 worker 完成的任务累计到completedTaskCount private long completedTaskCount; // 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory // 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory? // 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" // 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的 private volatile ThreadFactory threadFactory; // 拒绝策略:juc包中默认提供了四种方式,默认采用 AbortPolicy,直接抛出异常的方式 private volatile RejectedExecutionHandler handler; // 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时 // allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收 private volatile long keepAliveTime; // 控制核心线程数量内的线程是否可以被回收 true,可以,false 不可以 private volatile boolean allowCoreThreadTimeOut; // 核心线程数量限制 private volatile int corePoolSize; // 线程池最大线程数量限制 private volatile int maximumPoolSize; // 缺省的拒绝策略:采用的是 AbortPolicy 直接抛出异常的方式 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
构造方法
// 构造方法 public ThreadPoolExecutor(int corePoolSize, // 核心线程数量限制 int maximumPoolSize, // 最大线程数限制 // 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时 // allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收 long keepAliveTime, TimeUnit unit, // 时间单位 seconds nano.. // 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue // workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列 BlockingQueue<Runnable> workQueue, // 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory // 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory? // 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" // 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的 ThreadFactory threadFactory, // 四种拒绝策略 RejectedExecutionHandler handler) { // 判断参数是否越界 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 工作队列和线程工厂和拒绝策略都不能为空 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
3.2、成员预热方法
// Packing and unpacking ctl // 获取当前线程池运行状态 // ~CAPACITY & c = ctl => ~0001 1111 1111 1111 1111 1111 1111 1111 & 1110 0000 0000 0000 0000 0000 0000 0011(假设是RUNNING状态) // ~CAPACITY => 1110 0000 0000 0000 0000 0000 0000 0011 // ctl => 1110 0000 0000 0000 0000 0000 0000 0000 // ~CAPACITY & c => 1110 0000 0000 0000 0000 0000 0000 0000 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取当前线程池线程数量 // c == ctl => 1110 0000 0000 0000 0000 0000 0000 0111 // CAPACITY => 0001 1111 1111 1111 1111 1111 1111 1111 // c & CAPACITY => 0000 0000 0000 0000 0000 0000 0000 0111 private static int workerCountOf(int c) { return c & CAPACITY; } // 用在重置当前线程池 ctl 值的时候会用到 // rs:代表线程池状态 wc:代表当前线程池 worker(线程)数量 // rs: 1110 0000 0000 0000 0000 0000 0000 0000 (假设位RUNNING状态) // wc: 0000 0000 0000 0000 0000 0000 0000 0111 // rs | wc:1110 0000 0000 0000 0000 0000 0000 0111 == 重置为ctl的值 private static int ctlOf(int rs, int wc) { return rs | wc; } // 比较当前线程池ctl所表示的状态,是否小于某个状态s // c = 1110 0000 0000 0000 0000 0000 0000 0111 < 0000 0000 0000 0000 0000 0000 0000 0000 == true // 所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static boolean runStateLessThan(int c, int s) { return c < s; } // 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态 private static boolean runStateAtLeast(int c, int s) { return c >= s; } // 小于SHUTDOWN 一定是 RUNNING 状态 SGUTDOWN == 0 private static boolean isRunning(int c) { return c < SHUTDOWN; } // 使用 CAS 的方式让 ctl 的值 + 1,成功返回true,失败返回false private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } // 使用 CAS 的方式让 ctl 的值 -1,成功返回true,失败返回false private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } // 将ctl 的值减-1,这个方法一定成功 private void decrementWorkerCount() { // 这里会一直进行重试,直到成功为止 do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
3.3、Worker 内部类分析
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // Worker 采用了AQS 的独占模式 // 独占模式:两个重要属性 state 和 ExclusiveOwnerThread // state:0的时候,表示未被占用,>0 的时候表示被占用,<0 的时候,表示初始状态,这种情况下不能被抢锁 // ExclusiveOwnerThread:表示独占锁线程 private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ // Worker 内部封装的工作线程 final Thread thread; /** Initial task to run. Possibly null. */ // 假设 firstTask 不为空,那么当 worker 启动后(内部的线程启动)会优先执行 firstTask,当执行完firstTask后,会当workQueue中去获取下一个任务 // 初始化任务,只在worker 第一次执行任务的时候执行,之后都是从workqueue中获取任务执行 Runnable firstTask; /** Per-thread task counter */ // 记录当前所完成的任务数量 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ // firstTask 可以为 null,为null启动后会到 queue中获取 Worker(Runnable firstTask) { // 设置 AQS 独占模式为初始化状态,不能被抢占锁 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 使用线程工厂创建了一个线程,并且将当前 worker 指定为 Runnable,也就是说当 thread启动的时候,会以worker.run()为入口 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ // 当worker 启动的时候,会执行run()方法 public void run() { // ThreadPoolExector -> runWork(Worker w) 这个是核心方法,等后面分析worker启动后逻辑的时候会以这里为切入点 runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // 判断当前worker的独占锁是否被独占 // 0 表示未被占用 // 1 表示已经占用 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试去占用当前 worker 的独占锁 // protected boolean tryAcquire(int unused) { // 使用 CAS 修改 AQS 中的state,期望值为0(0的时候表示未被占用),修改成功表示当前线程修改成功 // 那么设置 ExclusiveOwnerThread 为当前线程 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 尝试释放当前 worker 的独占锁 // 外部不会直接调用该方法,这个方法是 AQS的,外部调用 unlock的时候,unlock -> AQS.release -> tryRelease protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 加锁,加锁失败的时候,会阻塞当前线程,直到获取到锁位置 public void lock() { acquire(1); } // 尝试去加锁,如果当前锁是未被持有状态,加锁成功后会返回true,否则不会阻塞当前线程,直接返回false public boolean tryLock() { return tryAcquire(1); } // 一般情况下,咱们调用unlock 要保证当前线程是持有锁的 // 特殊情况,当 worker 的 state == -1的时候,调用 unlock 表示初始化state,设置 state == 0 // 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0 public void unlock() { release(1); } // 就是返回当前 worker 的lock 是否被占用 public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
3.4、execute(Runnable command) 方法分析
// 线程池提交方法 // Runnable command:可以是普通的 Runnable 实现类,也可以是 FutureTask public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取ctl 最新值 ctl:高三位 表示线程池状态,低位表示当前线程池线程数量 int c = ctl.get(); // workerCountOf(c):获取当前线程池数量 // 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新线程 if (workerCountOf(c) < corePoolSize) { // addWorker 即为创建线程的过程,会创建worker对象,并且将command作为fistTask // core == true:表示采用核心线程数量限制,false:表示采用 maximumPoolSize if (addWorker(command, true)) // 创建成功后直接返回,addWorker 方法里面会启动新创建的worker,将firstTask执行 return; // 执行到这条语句,说明addWorker 一定是失败了 // 有几种可能? // 1.存在并发现象,execute 方法是有可能有多个线程同时调用的,当workerCountOf(c)<corePoolSize成立后 // 其他线程可能也成立了,并且向线程池中创建了worker,这个时候线程池中线程数量已经达到了核心线程数,所以当前线程失败了 // 2.当前线程池状态发生改变了,RUNNING SHUTDOWN STOP TIDYING TERMINATION // 当线程池状态是非 RUNNING 状态的时候,addWorker(firstWorker != null,true | false) 一定会失败 // SHUTDOWN 状态下,也有可能创建成功,前提 firstTask == null 而且当前queue不为空,特殊情况,在addWorker方法中有说明 c = ctl.get(); } // 执行到这里有几种情况? // 1.当前线程数量已经达到了 corePoolSize // 2.addWorker 失败,并发导致 // 条件成立:说明当前线程池处于RUNNING 状态,则尝试将 task 放入到workqueue中 if (isRunning(c) && workQueue.offer(command)) { // 进入到内部的前提条件:当前线程池中的线程数量大于等于corePoolSize,并且当前线程池处于RUNNING状态,而且当前线程将command放入到workqueue成功 // 再次获取ctl的值 int recheck = ctl.get(); // 条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改了,比如:shutdown() shutdownNow() // 这种情况成立的话,说明线程池状态被外部线程给修改了,需要把刚刚提交的任务给删除掉 // 条件二:remove(command) :有可能成功,也有可能失败 // 成功:提交之后,线程池中的线程还未消费(处理) // 失败:提交之后,在shutdown() shutdownNow() 之前,就被线程池中的线程给处理了 if (! isRunning(recheck) && remove(command)) // 提交之后,线程状态为非RUNNING了且移除任务队列成功,走个拒绝策略 reject(command); // 有几种情况会走到这里? // 1.当前线程池是RUNNING 状态 // 2.线程池状态是非 RUNNING 状态,但是 remove 提交的任务失败 // 担心当前线程池是 RUNNING 状态,但是线程池中的存活数量是0,这个时候,会很尴尬,任务没线程去跑了 // 这里其实是一个担保机制,保证线程池在RUNNING 状态下,最起码得有一个线程在工作 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 执行到这里,有几种情况? // 1.workqueue.offer(command)失败,说明当前队列满了 // 2.当前线程是非RUNNING 状态 // 1.offer失败,需要做什么?说明当前queue满了,这个时候,如果当前线程数量尚未达到maximumPoolSize得话,尝试创建新得worker,直接执行command // 假设线程数量达到 maximumPoolSize 得话,这里会也会失败,也会走拒绝策略 // 2.线程池状态为非RUNNING 状态,这个时候因为 command != null addWorker 一定是返回false else if (!addWorker(command, false)) reject(command); }
3.4、addWorker(Runnable firstTask, boolean core) 方法分析(重点)
// 返回值总结: // true:表示worker 创建成功,且线程启动成功 // false:表示创建失败 // 1.线程池状态 rs > SHUTDOWN (STOP TIDYING TERMINATION) // 2.rs == shutdown 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN 且队列未空,但是当前任务的firstTask不为null // 3. 当前线程池已经达到指定指标(corePoolSize / maximumPoolSize) // 4.threadFactory 实现类创建的线程为null // Runnable firstTask:可以为null,表示启动worker之后,worker 自动到queue中去获取任务,如果不是null,则worker优先执行 firstTask // boolean core:表示采用的线程池的线程数限制,true:采用核心线程数限制,false:采用maximumPoolSize线程数限制 private boolean addWorker(Runnable firstTask, boolean core) { // 自旋操作:判断当前线程池状态是否允许创建线程 retry: for (;;) { // 获取当前ctl值保存到c中 int c = ctl.get(); // 获取当前线程池运行状态保存到rs中 int rs = runStateOf(c); // Check if queue empty only if necessary. // 条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是RUNNING 状态 // 条件二:前置条件:当前线程池状态不是RUNNING 状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() // 表示:当前线程池状态是SHUTDOWN状态 && 当前提交的任务是null addWorker这个方法可能不是execute去调用的 && 当前任务队列不是空 // 排除掉这种情况,当前线程池状态是SHUTDOWN状态,但是队列里面还有任务尚未处理,这个时候是允许添加worker的,但是不允许再次提交task if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 什么情况下会返回false? // 线程池状态 rs >= SHUTDOWN // rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null return false; // 上面这些代码,就是判断当前线程池状态是否允许添加worker // 内部自旋操作:获取创建线程令牌的过程 for (;;) { // 获取当前线程池中的线程数量,保存到wc中 int wc = workerCountOf(c); // 条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多的数字 // 条件二:wc >= (core ? corePoolSize : maximumPoolSize) // core == true,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制 // core == false,判断当前线程数量是否 >= maximumPoolSize,会拿最大线程数量做限制 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 执行到这里,说明当前已经无法添加线程了,已经达到指定限制了 return false; // 条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌 // 条件失败:说明可能有其他线程,修改过ctl这个值了,CAS 发生了冲突 // 可能发生过什么冲突? // 1.其他线程execute() 申请过令牌了,在这之前,改变了ctl的值,期望值c与内存中的ctl的值不符合,导致CAS 失败 // 2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生了变化了,ctl高三位表示线程池状态 // 线程池状态改变后,CAS 操作也会失败 if (compareAndIncrementWorkerCount(c)) // 进入到这里,一定是CAS 失败,申请到令牌了,跳出外部自旋操作 break retry; // CAS 失败,没有成功的申请到令牌 // 获取最新的 ctl 值 c = ctl.get(); // Re-read ctl // 判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown 或者shurdownNow会导致线程池状态发生变化 if (runStateOf(c) != rs) // 线程池状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // CAS 成功,跳出外部自旋操作来到了这里 // workerStarted:表示当前创建的worker是否已经启动 false:未启动 true:已经启动 boolean workerStarted = false; // workerAdded:表示创建的worker是否添加到线程池中 false:未添加 true:已经添加 boolean workerAdded = false; // w:表示后面创建worker 的一个引用 Worker w = null; try { // 创建worker w = new Worker(firstTask); // 将新创建的worker 节点的线程赋值给t final Thread t = w.thread; // 为什么这里还要做 t != null 这个判断? // 为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现 // 防止程序员自己实现的ThreadFactory 实现类有bug,导致创建出来的Thread为null if (t != null) { // 将全局锁的引用保存到mainLock变量中 final ReentrantLock mainLock = this.mainLock; // 持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操作线程池内部相关的操作都必须持有锁 mainLock.lock(); // 从这里加锁之后,其他线程是无法修改线程池状态的 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取最新的线程池运行状态保存到rs中 int rs = runStateOf(ctl.get()); // 条件一:rs < SHUTDOWN 成立:当前线程池处于RUNNING 状态,最正常的状态 // 条件二: 前置条件:当前线程池状态不是RUNNING 状态 // (rs == SHUTDOWN && firstTask == null):当前状态为SHUTDOWN 状态且firstTask为null // 其实判断的就是SHUTDOWN 状态下的特殊情况,只不过这里不再判断队列是否为空了 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // t.isAlive() 当前线程start 后,线程isAlive 会返回true // 防止程序员在ThreadFactory实现类创建线程返回给外部之前,将线程给start了 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将咱们创建的worker 添加到线程池中 workers.add(w); // 获取最新当前线程池的线程数量 int s = workers.size(); // 条件成立:说明当前线程数量是一个新高,更新lagestPoolSize线程池中的线程最大数量 if (s > largestPoolSize) largestPoolSize = s; // 表示线程已经加入到线程池中了 workerAdded = true; } } finally { // 释放线程池全局锁 mainLock.unlock(); } // 条件成立:说明当前添加worker成功 // 条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败 if (workerAdded) { // 成功后则将创建的worker启动 t.start(); // 启动标记设置为true workerStarted = true; } } } finally { // 条件成立:说明添加当前线程到线程池失败或者启动线程失败,需要做清理工作 // 1.释放令牌 // 2.将当前worker 清理出workers集合 if (! workerStarted) addWorkerFailed(w); } // 返回新创建的线程是否启动 return workerStarted; }
addWorkerFailed(Worker w) 方法
// addWorker添加线程到线程池中失败或者线程启动失败的后续工作,两个操作 // 1.释放令牌 // 2.将当前worker 清理出workers集合 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 持有线程池全局锁,因为操作的是线程池相关的东西 mainLock.lock(); try { // 条件成立:需要将worker 在workers 中清理除去 if (w != null) workers.remove(w); // 线程池计数恢复-1,前面+1获取令牌了,这里因为失败,所以要-1,相当于归还令牌 decrementWorkerCount(); // 回头讲,shutdown shutdownNow再说 tryTerminate(); } finally { // 释放线程池全局锁 mainLock.unlock(); } }
3.5、runWorker(Worker w) 方法分析
// w:就是启动worker final void runWorker(Worker w) { // wt == w.thread Thread wt = Thread.currentThread(); // 将初始执行的task赋值给task Runnable task = w.firstTask; // 清空当前w.firstTask的引用 w.firstTask = null; // 这里为什么先调用unlock? 就是为了初始化worker state = 0 和 ExclusiveOwnerThread = null // 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0 w.unlock(); // allow interrupts // 是否是突然退出?如果为true,表示发生异常了,当前线程突然退出的,回头需要做一些处理 false:表示正常退出的 boolean completedAbruptly = true; try { // 条件一:task != null 指的就是firstTask 是否为null 如果是null,直接执行循环体里面 // 条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask是会阻塞线程的方法 // getTask()如果返回null,说明当前线程需要执行退出逻辑 while (task != null || (task = getTask()) != null) { // worker 设置独占锁为当前线程 // 为什么要设置独占锁呢?就是怕shutdown()的时候会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池状态大于等于STOP,线程池目前处于STOP/TIDYING/TERMINATION,此时线程一定要给它一个中断信号 // 条件一成立:(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted() // 上面条件如果成立,说明当前线程池状态是大于等于STOP且当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断 // 假设:(runStateAtLeast(ctl.get(), STOP) == false // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) // Thread.interrupted() 获取当前中断状态,且设置中断位为false,连续调用两次interrupted()方法,第二次一定是返回false // runStateAtLeast(ctl.get(), STOP):大概率这里还是false // 其实它在强制刷新当前线程的中断标记为false,因为有可能上一次执行task的时候,业务代码里面将线程的中断标记位设置为了true且没有做处理 // 所以这里一定要强制刷新一下,不会再影响到后面的task了 // 假如:Thread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true // 这种情况有可能发生嘛? // 有可能,因为外部线程再当前线程第一次(runStateAtLeast(ctl.get(), STOP) == false后,有机会调用shutdown shutdownNow方法将线程池状态修改 // 这个时候也会将当前线程的中断标记位再次设置为中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,留给子类实现的 beforeExecute(wt, task); // 表示异常情况,如果thrown不为空,表示task运行过程中发生了异常,向上层抛出了异常 Throwable thrown = null; try { // task可能是firstTask,也可能是普通的runnable接口实现类 // 如果前面是通过submit()提交的runnable/callable会被封装成FutureTask对象 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子方法,留给子类实现的 afterExecute(task, thrown); } } finally { // 将局部变量task设置为 null task = null; // 更新worker完成任务数量 w.completedTasks++; // worker 处理完一个任务后会释放独占锁,然后再次到queue中获取任务 // 1.正常情况下会再次回到getTask()那里获取任务 while(getTask()) // 2.task.run()内部抛出异常了 w.unlock(); } } // while // 什么情况下会来到这里? // getTask()返回null的时候,说明当前线程应该执行退出逻辑了 completedAbruptly = false; } finally { // task.run() 内部抛出异常的时候,当上面的finally执行完以后跳到这里 // 正常提出 completedAbruptly = false 异常退出:completedAbruptly = true // 这个方法以后再说 processWorkerExit(w, completedAbruptly); } }
3.6、getTask() 方法分析(重点)
// 什么情况下会返回null? // 1.rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了 // 2.说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null // 3.线程池中的线程数量超过最大限制的时候,会有一部分线程返回为null // 4.线程池中的线程数超过corePoolSize的时候,会有一部分线程超时后返回null // 到当前的任务队列queue中去获取任务 private Runnable getTask() { // 表示当前线程获取任务是否超时 默认false true表示已经超时 boolean timedOut = false; // Did the last poll() time out? // 自旋 for (;;) { // 获取最新ctl的值保存到c中 int c = ctl.get(); // 获取当前线程池的运行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING 状态,可能是SHUTDOWN/STOP/TIDYING/TERMINATION// // 条件二:rs >= STOP || workQueue.isEmpty() // 2.1:rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了 // 2.2:前置条件:状态是SHUTDOWN,workqueue.isEmpty() 条件成立:说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null // 返回null,runworker()方法就会将返回null的线程执行退出线程池的逻辑 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 使用CAS + 死循环的方式将ctl的值-1 decrementWorkerCount(); return null; } // 执行到这里,有几种情况: // 1.线程池是RUNNING 状态 // 2.线程池是SHUTDOWN 状态,但是队列queue还未为空,此时可以创建线程(添加worker) // 获取线程池中的线程数量 int wc = workerCountOf(c); // timed == true:表示当前这个线程获取task的时候是支持超时机制的,使用queue.poll(时间单位,数量) // 当获取task超时的情况下,下一次自旋就可能返回null了 // timed == false:表示当前这个线程获取task的时候是不支持超时机制的,当前线程会使用 queue.take() // 情况一:allowCoreThreadTimeOut == true 表示核心线程数量内的空闲线程可以被回收,所有线程都是使用queue.poll()超时机制这种方式获取task // 情况二:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程 // wc > corePoolSize:条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程都使用poll,超时的方式去获取任务 // 这样就可能会有一部分线程获取不到任务,返回null,然后runworker会执行线程退出逻辑 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 条件一:(wc > maximumPoolSize || (timed && timedOut) // 1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize() 方法,可能外部线程将线程池最大线程数设置为比初始化时的要小 // 1.2:timed && timedOut 条件成立:前置条件,当前线程使用poll()的方式从队列queue中获取worker执行,上一次循环的时候,使用poll的方式获取任务的时候,超时了 // 条件一为true:表示线程可以被回收,达到回收的标准,当确实需要回收的时候再回收 // 条件二:wc > 1 || workQueue.isEmpty() // 2.1:wc > 1:条件成立,说明当前线程池中还有其他线程,当前线程不需要从队列中获取任务可以直接回收 // 2.2:workQueue.isEmpty():前置条件,条件成立:wc == 1 ,说明当前任务队列已经空了,最后一个线程,也可以放心的退出 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 使用 CAS 机制,将ctl的值减去1,减一成功的线程返回null // CAS 失败?为什么会CAS 失败? // 1.其他线程先你一步推出了 // 2.线程池状态发生变化了 if (compareAndDecrementWorkerCount(c)) return null; // 再次自旋的时候,timed有可能就是false了,因为当前线程CAS失败,很有可能是因为其他线程成功退出导致的,再次查询的时候 // 检查发现,当前线程就可能属于不需要回收范围内了 continue; } // 获取任务的逻辑 try { // poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内, // 队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // take 方法:获取队列中的第一个元素,如果被阻塞,则等待 workQueue.take(); // 条件成立:返回任务 if (r != null) return r; // 说明当前线程超时了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
3.7、processWorkerExit(Worker w, boolean completedAbruptly) 方法分析
// 线程退出的逻辑 // w:线程执行的worker要获取任务 // completedAbruptly:线程是否发生异常 true:表示线程发生了异常 false:表示线程没有发生异常 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 条件成立:代表当前w这个worker 是发生异常退出的 task任务执行过程中向上抛出了异常 // 异常退出的时候,ctl的线程数量并没有-1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); // 获取线程池的全局锁引用 final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // 将当前worker完成的task数量,汇总到线程池的completedTaskCount completedTaskCount += w.completedTasks; // 将worker 从任务中移除 workers.remove(w); } finally { mainLock.unlock(); } // 回头再说 tryTerminate(); // 获取最新的ctl值 int c = ctl.get(); // 条件成立:说明当前线程池状态为RUNNING 或者 SHUTDOWN 状态 if (runStateLessThan(c, STOP)) { // 条件成立:说明当前线程是正常退出的 if (!completedAbruptly) { // min:表示线程池最低持有的线程数量 // allowCoreThreadTimeOut == true:说明核心线程数内的线程,也会超时被回收 min = 0 // allowCoreThreadTimeOut == false:说明核心线程数内的线程,不会超时被回收 min = corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 前提状态:线程池状态为RUNNING SHUTDOWN // 条件一: min == 0成立 // 条件二:! workQueue.isEmpty() 说明任务队列中还有任务,最起码留一个线程 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 条件成立:线程池中还拥有足够的线程 // 考虑一个问题:workerCountOf(c) >= min => (0 >= 0) // 有可能! // 什么情况下,当线程池中的核心线程数是可以被回收的,会出现这种情况,这种情况下,当前线程池中的线程数会变为0 // 下次再提交任务的时候,会再次创建线程 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.当前线程再执行task的时候发生异常,这里一定要创建一个新的worker顶上去 // 2.! workQueue.isEmpty() 说明任务队列中还有任务,最起码要留一个线程,当前状态为 RUNNING | SHUTDOWN // 3.线程数量小于corePoolSize值,此时会创建线程,维护线程池中的线程数量在corePollSize左右 addWorker(null, false); } }
3.8、shutdown() 方法分析
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 获取线程池的全局锁 mainLock.lock(); try { // 判断是否有权限 checkShutdownAccess(); // 设置线程池的状态为 SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); // 空方法,子类可以扩展 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { // 释放线程池全局锁 mainLock.unlock(); } // 后面再说 tryTerminate(); } // 中断空闲线程 private void interruptIdleWorkers() { interruptIdleWorkers(false); } // 通过自旋操作设置线程池的状态为SHUTDWON private void advanceRunState(int targetState) { // 自旋 for (;;) { int c = ctl.get(); // 条件一成立:假设targetState == SHUTDOWN,说明当前线程池状态是 >= SHUTDOWN // 条件一不成立:假设targetState == SHUTDOWN,说明当前线程池状态是 RUNNING if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
3.9、tryTerminate() 方法分析
final void tryTerminate() { // 自旋操作 for (;;) { // 获取最新ctl的值 int c = ctl.get(); // 条件一:isRunning(c) 成立:直接返回就行了,线程池很正常 // 条件二:runStateAtLeast(c, TIDYING) 说明已经有其他线程在执行 TIDYING 转到 TERMIATION 状态了,当前线程直接回去 // 条件三:runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty() // SHUTDOWN 特殊情况,如果是这种情况,直接回去,得等队列中得任务处理完后再转化状态 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 什么情况会执行到这里? // 1.线程池状态为 STOP 得时候 // 2.线程池状态为 SHUTDWON并且队列已经空了 // 条件成立:当前线程池中得线程数量 > 0 if (workerCountOf(c) != 0) { // Eligible to terminate // 中断一个空闲线程 // 空闲线程在哪空闲呢?queue.take() 或者 queue.poll() // 1.唤醒后的线程会在getTask方法返回null // 2.执行退出逻辑得时候,会再次调用 tryTerminate 方法,唤醒下一个空闲线程 // 3.因为线程池状态是(线程池状态为 STOP 得时候 | 线程池状态为 SHUTDWON并且队列已经空了) 最终调用addWorker得时候会失败得 // 最终空闲线程都会在这里退出,非空闲线程当执行完当前task得时候,也会调用tryTerminate方法,有可能会走到这里 interruptIdleWorkers(ONLY_ONE); return; } // 执行到这里得线程是谁? // workerCountOf(c) == 0得时候,会来到这里 // 最后一个退出得线程,咱们知道,在(线程池状态 == STOP | 线程池状态为 SHUTDWON并且队列已经空了) // 线程唤醒后,都会执行退出逻辑,退出过程中会先将 workerCount 计数 -1 => ctl中得计数-1 // 调用 tryTerminate方法之前,已经减过了,所以 0 得时候,表示这是最后一个退出得线程了 final ReentrantLock mainLock = this.mainLock; // 获取全局锁 mainLock.lock(); try { // 设置线程池状态为 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 调用钩子方法 terminated(); } finally { // 设置线程池状态为 TERMINATED 状态 ctl.set(ctlOf(TERMINATED, 0)); // 唤醒调用 awaitTermination() 方法得线程 termination.signalAll(); } return; } } finally { // 释放全局锁 mainLock.unlock(); } // else retry on failed CAS } }
3.10、shutdownNow() 方法分析
public List<Runnable> shutdownNow() { // 返回值引用 List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; // 获取线程池的全局锁 mainLock.lock(); try { // 校验权限 checkShutdownAccess(); // 设置线程池状态为STOP advanceRunState(STOP); // 中断线程池中的所有线程 interruptWorkers(); // 导出未处理的task tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // 返回当前任务队列中未处理的任务 return tasks; }
至此,线程池 ThreadPoolExecutor 类源码就告一段落了,下面会开始更新 AQS 部分的内容,如果有错误,请指正!
这篇关于线程池 ThreadPoolExecutor 源码详细分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-01巧用 TiCDC Syncpoint 构建银行实时交易和准实时计算一体化架构
- 2024-05-01银行核心背后的落地工程体系丨Oracle - TiDB 数据迁移详解
- 2024-04-26高性能表格工具VTable总体构成-icode9专业技术文章分享
- 2024-04-16软路由代理问题, tg 无法代理问题-icode9专业技术文章分享
- 2024-04-16程序猿用什么锅-icode9专业技术文章分享
- 2024-04-16自建 NAS 的方案-icode9专业技术文章分享
- 2024-04-14ansible 在远程主机上执行脚本,并传入参数-icode9专业技术文章分享
- 2024-04-14ansible 在远程主机上执行脚本,并传入参数, 加上remote_src: yes 配置-icode9专业技术文章分享
- 2024-04-14ansible 检测远程主机的8080端口,如果关闭,则echo 进程已关闭-icode9专业技术文章分享
- 2024-04-14result 成功怎么写-icode9专业技术文章分享