当前位置 博文首页 > 努力的小雨:源码剖析ThreadPoolExecutor线程池及阻塞队列

    努力的小雨:源码剖析ThreadPoolExecutor线程池及阻塞队列

    作者:努力的小雨 时间:2021-02-05 16:24

      本文章对ThreadPoolExecutor线程池的底层源码进行分析,线程池如何起到了线程复用、又是如何进行维护我们的线程任务的呢?我们直接进入正题:

      首先我们看一下ThreadPoolExecutor类的源码

     1 public ThreadPoolExecutor(int corePoolSize,
     2                               int maximumPoolSize,
     3                               long keepAliveTime,
     4                               TimeUnit unit,
     5                               BlockingQueue<Runnable> workQueue, 
     6                               ThreadFactory threadFactory,
     7                               RejectedExecutionHandler handler) { //拒绝策略
     8         if (corePoolSize < 0 ||
     9             maximumPoolSize <= 0 ||
    10             maximumPoolSize < corePoolSize ||
    11             keepAliveTime < 0)
    12             throw new IllegalArgumentException();
    13         if (workQueue == null || threadFactory == null || handler == null)
    14             throw new NullPointerException();
    15         this.acc = System.getSecurityManager() == null ?
    16                 null :
    17                 AccessController.getContext();
    18         //核心线程
    19         this.corePoolSize = corePoolSize;
    20         //最大线程数
    21         this.maximumPoolSize = maximumPoolSize;
    22         //阻塞队列,即今天主题
    23         this.workQueue = workQueue;
    24         //超时时间
    25         this.keepAliveTime = unit.toNanos(keepAliveTime);
    26         this.threadFactory = threadFactory;
    27         //拒绝策略
    28         this.handler = handler;
    29     }

      这是我们线程池实例化的时候的参数,其实最大的实用性来说,就是核心线程与最大线程数的设定,这个完全靠个人经验,并没有一个真正意义上的公式可以适用所有的业务场景,这里博主为大家找了一篇关于设定线程数的文章:

      https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

      我们的线程池初始化好后,我们自己会调用excute方法来让线程池运行我们的线程任务,那我们就先来看看这个方法的实现:

     1 public void execute(Runnable command) {
     2         if (command == null)
     3             throw new NullPointerException();
     4         /*
     5          * 第一步:工作线程是否小于核心线程数量,如果是添加work中,worker其实也是一个线程,只不过它内部操作的是我们的上传的任务
     6          * 第二步:如果大于核心线程数量,添加到worker队列中,每一个不同的队列offer的实现方法也是不一样的,今天我们主要探讨这个
     7          * 第三步:阻塞队列被塞满了,需要创建新的非核心线程数量worker线程去处理我们的任务,创建worker线程失败了会触发拒绝策略,默认抛异常
     8          */
     9         int c = ctl.get();
    10         if (workerCountOf(c) < corePoolSize) {
    11             if (addWorker(command, true))
    12                 return;
    13             c = ctl.get();
    14         }
    15         if (isRunning(c) && workQueue.offer(command)) {
    16             int recheck = ctl.get();
    17             if (! isRunning(recheck) && remove(command))
    18                 reject(command);
    19             else if (workerCountOf(recheck) == 0)
    20                 addWorker(null, false);
    21         }
    22         else if (!addWorker(command, false))
    23             reject(command);
    24     }
    25     

      我们看到当任务调用的时候,会执行addworker,那么worker是个什么东西呢?我们来看看它的构造实例:我们看一下worker类,就发现其实worker也是一个线程

     1 private final class Worker
     2         extends AbstractQueuedSynchronizer
     3         implements Runnable
     4     {
     5     
     6     ......
     7     
     8     Worker(Runnable firstTask) {
     9             setState(-1); // inhibit interrupts until runWorker
    10             this.firstTask = firstTask;
    11             this.thread = getThreadFactory().newThread(this);
    12         }
    13 
    14         /** 覆盖执行run方法
    15           */
    16         public void run() {
    17             runWorker(this);
    18         }
    19     ......
    20     
    21     }

      这次我们来看一下addworker是怎么操作的:

     1 private boolean addWorker(Runnable firstTask, boolean core) {
     2         retry:
     3         for (;;) {
     4             int c = ctl.get();
     5             int rs = runStateOf(c);
     6 
     7             // Check if queue empty only if necessary.
     8             if (rs >= SHUTDOWN &&
     9                 ! (rs == SHUTDOWN &&
    10                    firstTask == null &&
    11                    ! workQueue.isEmpty()))
    12                 return false;
    13 
    14             for (;;) {
    15                 int wc = workerCountOf(c);
    16                 if (wc >= CAPACITY ||
    17                     //不允许创建大于最大核心线程数的任务
    18                     wc >= (core ? corePoolSize : maximumPoolSize))
    19                     return false;
    20                 if (compareAndIncrementWorkerCount(c))
    21                     break retry;
    22                 c = ctl.get();  // Re-read ctl
    23                 if (runStateOf(c) != rs)
    24                     continue retry;
    25                 // else CAS failed due to workerCount change; retry inner loop
    26             }
    27         }
    28 
    29         boolean workerStarted = false;
    30         boolean workerAdded = false;
    31         Worker w = null;
    32         try {
    33             //主要的创建worker过程是在这里
    34             w = new Worker(firstTask);
    35             final Thread t = w.thread;
    36             if (t != null) {
    37                 final ReentrantLock mainLock = this.mainLock;
    38                 mainLock.lock();
    39                 try {
    40                     // Recheck while holding lock.
    41                     // Back out on ThreadFactory failure or if
    42                     // shut down before lock acquired.
    43                     int rs = runStateOf(ctl.get());
    44 
    45                     if (rs < SHUTDOWN ||
    46                         (rs == SHUTDOWN && firstTask == null)) {
    47                         if (t.isAlive()) // precheck that t is startable
    48                             throw new IllegalThreadStateException();
    49                         workers.add(w);
    50                         int s = workers.size();
    51                         if (s > largestPoolSize)
    52                             largestPoolSize = s;
    53                         workerAdded = true;
    54                     }
    55                 } finally {
    56                     mainLock.unlock();
    57                 }
    58                 if (workerAdded) {
    59                     //此处调用的是worker线程的start方法,并没有直接调用我们的 任务
    60                     //上面我们看worker的run方法了,里面调用的 是runWorker,那我们看看runWorker方法就可以了
    61                     t.start();
    62                     workerStarted = true;
    63                 }
    64             }
    65         } finally {
    66             if (! workerStarted)
    67                 addWorkerFailed(w);
    68         }
    69         return workerStarted;
    70     }
    71     

      到这里添加完毕后,我们在看看它是是如何执行我们的线程的,来看看runworker方法实现:

     1 final void runWorker(Worker w) {
     2         Thread wt = Thread.currentThread();
     3         Runnable task = w.firstTask;
     4         w.firstTask = null;
     5         w.unlock(); // allow interrupts
     6         boolean completedAbruptly = true;
     7         try {
     8             //这里体现的是线程的复用,复用的是worker线程,每处理一个线程都会getTask()从队列中取一个任务进行处理
     9             while (task != null || (task = getTask()) != null) {
    10                 w.lock();
    11                 // If pool is stopping, ensure thread is interrupted;
    12                 // if not, ensure thread is not interrupted.  This
    13                 // requires a recheck in second case to deal with
    14                 // shutdownNow race while clearing interrupt
    15                 if ((runStateAtLeast(ctl.get(), STOP) ||
    16                      (Thread.interrupted() &&
    17                       runStateAtLeast(ctl.get(), STOP))) &&
    18                     !wt.isInterrupted())
    19                     wt.interrupt();
    20                 try {
    21                     beforeExecute(wt, task);
    22                     Throwable thrown = null;
    23                     try {
    24                         //直接调用我们任务的run方法,我们任务虽然是继承了runable,但是并没有调用start方法
    25                         //其实我们的线程放入线程池中,并不是让我们的线程运行,仅仅是定义了一个方法体,
    26                         //真正运行的是被线程池管理的worker线程
    27                         task.run();
    28                     } catch (RuntimeException x) {
    29                         thrown = x; throw x;