当前位置 博文首页 > 努力的小雨:源码剖析ThreadPoolExecutor线程池及阻塞队列
本文章对ThreadPoolExecutor线程池的底层源码进行分析,线程池如何起到了线程复用、又是如何进行维护我们的线程任务的呢?我们直接进入正题:
首先我们看一下ThreadPoolExecutor类的源码
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { //拒绝策略 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.acc = System.getSecurityManager() == null ? 16 null : 17 AccessController.getContext(); 18 //核心线程 19 this.corePoolSize = corePoolSize; 20 //最大线程数 21 this.maximumPoolSize = maximumPoolSize; 22 //阻塞队列,即今天主题 23 this.workQueue = workQueue; 24 //超时时间 25 this.keepAliveTime = unit.toNanos(keepAliveTime); 26 this.threadFactory = threadFactory; 27 //拒绝策略 28 this.handler = handler; 29 }
这是我们线程池实例化的时候的参数,其实最大的实用性来说,就是核心线程与最大线程数的设定,这个完全靠个人经验,并没有一个真正意义上的公式可以适用所有的业务场景,这里博主为大家找了一篇关于设定线程数的文章:
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
我们的线程池初始化好后,我们自己会调用excute方法来让线程池运行我们的线程任务,那我们就先来看看这个方法的实现:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * 第一步:工作线程是否小于核心线程数量,如果是添加work中,worker其实也是一个线程,只不过它内部操作的是我们的上传的任务 6 * 第二步:如果大于核心线程数量,添加到worker队列中,每一个不同的队列offer的实现方法也是不一样的,今天我们主要探讨这个 7 * 第三步:阻塞队列被塞满了,需要创建新的非核心线程数量worker线程去处理我们的任务,创建worker线程失败了会触发拒绝策略,默认抛异常 8 */ 9 int c = ctl.get(); 10 if (workerCountOf(c) < corePoolSize) { 11 if (addWorker(command, true)) 12 return; 13 c = ctl.get(); 14 } 15 if (isRunning(c) && workQueue.offer(command)) { 16 int recheck = ctl.get(); 17 if (! isRunning(recheck) && remove(command)) 18 reject(command); 19 else if (workerCountOf(recheck) == 0) 20 addWorker(null, false); 21 } 22 else if (!addWorker(command, false)) 23 reject(command); 24 } 25
我们看到当任务调用的时候,会执行addworker,那么worker是个什么东西呢?我们来看看它的构造实例:我们看一下worker类,就发现其实worker也是一个线程
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 6 ...... 7 8 Worker(Runnable firstTask) { 9 setState(-1); // inhibit interrupts until runWorker 10 this.firstTask = firstTask; 11 this.thread = getThreadFactory().newThread(this); 12 } 13 14 /** 覆盖执行run方法 15 */ 16 public void run() { 17 runWorker(this); 18 } 19 ...... 20 21 }
这次我们来看一下addworker是怎么操作的:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 if (rs >= SHUTDOWN && 9 ! (rs == SHUTDOWN && 10 firstTask == null && 11 ! workQueue.isEmpty())) 12 return false; 13 14 for (;;) { 15 int wc = workerCountOf(c); 16 if (wc >= CAPACITY || 17 //不允许创建大于最大核心线程数的任务 18 wc >= (core ? corePoolSize : maximumPoolSize)) 19 return false; 20 if (compareAndIncrementWorkerCount(c)) 21 break retry; 22 c = ctl.get(); // Re-read ctl 23 if (runStateOf(c) != rs) 24 continue retry; 25 // else CAS failed due to workerCount change; retry inner loop 26 } 27 } 28 29 boolean workerStarted = false; 30 boolean workerAdded = false; 31 Worker w = null; 32 try { 33 //主要的创建worker过程是在这里 34 w = new Worker(firstTask); 35 final Thread t = w.thread; 36 if (t != null) { 37 final ReentrantLock mainLock = this.mainLock; 38 mainLock.lock(); 39 try { 40 // Recheck while holding lock. 41 // Back out on ThreadFactory failure or if 42 // shut down before lock acquired. 43 int rs = runStateOf(ctl.get()); 44 45 if (rs < SHUTDOWN || 46 (rs == SHUTDOWN && firstTask == null)) { 47 if (t.isAlive()) // precheck that t is startable 48 throw new IllegalThreadStateException(); 49 workers.add(w); 50 int s = workers.size(); 51 if (s > largestPoolSize) 52 largestPoolSize = s; 53 workerAdded = true; 54 } 55 } finally { 56 mainLock.unlock(); 57 } 58 if (workerAdded) { 59 //此处调用的是worker线程的start方法,并没有直接调用我们的 任务 60 //上面我们看worker的run方法了,里面调用的 是runWorker,那我们看看runWorker方法就可以了 61 t.start(); 62 workerStarted = true; 63 } 64 } 65 } finally { 66 if (! workerStarted) 67 addWorkerFailed(w); 68 } 69 return workerStarted; 70 } 71
到这里添加完毕后,我们在看看它是是如何执行我们的线程的,来看看runworker方法实现:
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); // allow interrupts 6 boolean completedAbruptly = true; 7 try { 8 //这里体现的是线程的复用,复用的是worker线程,每处理一个线程都会getTask()从队列中取一个任务进行处理 9 while (task != null || (task = getTask()) != null) { 10 w.lock(); 11 // If pool is stopping, ensure thread is interrupted; 12 // if not, ensure thread is not interrupted. This 13 // requires a recheck in second case to deal with 14 // shutdownNow race while clearing interrupt 15 if ((runStateAtLeast(ctl.get(), STOP) || 16 (Thread.interrupted() && 17 runStateAtLeast(ctl.get(), STOP))) && 18 !wt.isInterrupted()) 19 wt.interrupt(); 20 try { 21 beforeExecute(wt, task); 22 Throwable thrown = null; 23 try { 24 //直接调用我们任务的run方法,我们任务虽然是继承了runable,但是并没有调用start方法 25 //其实我们的线程放入线程池中,并不是让我们的线程运行,仅仅是定义了一个方法体, 26 //真正运行的是被线程池管理的worker线程 27 task.run(); 28 } catch (RuntimeException x) { 29 thrown = x; throw x;