当前位置 博文首页 > RtxTitanV的博客:Java并发总结之线程池

    RtxTitanV的博客:Java并发总结之线程池

    作者:[db:作者] 时间:2021-07-07 12:30

    本文主要对线程池进行一个总结,文中的源码解析都是基于JDK1.8。

    一、线程池概述

    1.什么是线程池

    线程池是指管理一组同构工作线程的资源池。线程池是与工作队列(Work Queue)密切相关的,其中在工作队列中保存了所有等待执行的任务。工作线程(Worker Thread)的任务就是从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

    2.线程池的优点

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行,从而提高了响应性。
    • 提高线程的可管理性。使用线程池可以进行统一的管理,调优和监控,适当的调整线程池的大小,可以创建足够的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

    3.线程池体系

    与线程池相关的主要的类和接口的实现继承体系如下:
    1

    • Executor:线程池的顶级接口,但并不是一个线程池,而是任务执行(Executor)框架的核心接口,它将任务的提交和执行分离开来。
    • ExecutorService:真正的线程池接口。
    • ScheduledExecutorService:主要用来解决任务需要重复执行的问题。
    • ThreadPoolExecutor:线程池的核心实现类,也是Executor框架最核心的类,用来执行被提交的任务。
    • ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor类并实现ScheduledExecutorService接口,用于周期性的执行任务。

    4.线程池的创建

    创建线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor类中提供的四个构造方法可以创建自定义的线程池,推荐使用这种方式创建线程池。除了通过ThreadPoolExecutor构造方法实现外,还可以通过Executor框架的工具类Executors来实现,可以创建newFixedThreadPoolnewCachedThreadPoolnewSingleThreadExecutornewScheduledThreadPool这四种常见的线程池。

    《阿里巴巴Java开发手册》中强制线程池不允许使用Executors创建,而是通过ThreadPoolExecutor构造函数的方式,这样的处理方式让创建者更加明确线程池的运行规则,规避资源耗尽的风险。
    Executors创建线程池对象的弊端如下:

    • FixedThreadPool和SingleThreadExecutor:堆积的请求处理队列可能会耗费非常大的内存,从而导致OOM。
    • CachedThreadPool和ScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM。

    5.线程池的工作原理

    当一个任务提交给线程池时,任务的执行过程如下:
    2

    1. 线程池首先判断核心线程池是否已满,没有满则新创建一个工作线程执行任务,满了则判断工作队列是否已满。
    2. 如果工作队列没有满,则将任务加入队列,如果工作队列已满,则判断线程池是否达到最大线程数。
    3. 如果线程池没有达到最大线程数,则新创建一个工作线程执行任务,如果达到最大线程数,则执行饱和策略。

    6.线程池的关闭

    可以通过调用线程池的shutdown()shutdownNow()方法来关闭线程池。它们的原理都是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,而无法响应中断的任务可能无法终止。shutdown()shutdownNow()的区别

    • shutdown()只是关闭了提交通道,用submit()是无效的,但是已经提交的任务得执行完毕。
    • shutdownNow()能立即停止线程池,会终止当前正在运行的任务,并停止等待执行的任务并返回正在等待执行的任务列表。

    只要调用了这两个方法的任意一个,isShutdown方法都会返回true,当所有的任务都成功关闭后,才表示线程池关闭成功,这时调用isTerminated方法会返回true。isShutdown()isTerminated()的区别

    • isShutDown当调用shutdown()方法后返回为true。
    • isTerminated当调用shutdown()方法后并且所有提交的任务都完成后返回为true。

    通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

    7.线程池的合理配置

    想要合理的配置线程池,可以从任务的性质、优先级、执行时长、依赖性几个角度考虑。任务的性质分为两种:

    • CPU密集型任务(N+1):这种任务消耗的主要是CPU资源,可以将线程数设置为N+1,N为CPU 核心数,比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间。
    • I/O密集型任务(2N):这种任务应用起来,系统会用大部分的时间来处理I/O交互,而线程在处理I/O的时间段内不会占用CPU来处理,这时就可以将CPU交出给其它线程使用。因此在I/O密集型任务的应用中,可以多配置一些线程,具体的计算方法是2N。

    而优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行。注意,如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时长不同的任务也可以使用优先级队列,让执行时间短的任务先执行。具有依赖性的任务可能需要等待其他资源,等待时间越长,CPU空闲时间就越长,这种情况应该设置较大线程数,充分利用CPU。

    工作队列建议使用有界队列,有界队列能增加系统的稳定性和预警能力。使用无界队列时,队列中堆积的大量任务可能会导致OOM

    8.线程池的监控

    可以通过线程池提供的参数可以对线程池进行监控,可以使用ThreadPoolExecutor中的以下方法进行监控:

    • getTaskCount():线程池需要执行的任务数量。
    • getCompletedTaskCount():已完成的任务数量。
    • getLargestPoolSize():线程池里曾经创建过的最大线程数量。
    • getPoolSize():线程池的线程数量。
    • getActiveCount():活动的线程数量。

    ThreadPoolExecutor是可扩展的,通过扩展线程池进行监控。它提供了几个可以在子类中重写的方法:beforelExecuteafterExecuteterminated,这些方法可以用于扩展ThreadPoolExecutor的行为。

    在执行任务的线程中将调用beforeExecuteafterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。

    在线程池完成关闭操作时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

    二、ThreadPoolExecutor

    创建线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor类中提供的四个构造方法,参数最多的这个构造方法是最通用的构造方法,其他三个构造方法都是在这个构造方法的基础上产生的,比较简单。

    1.通用构造方法

    /**
     * @param corePoolSize 线程池的核心线程数量
     * @param maximumPoolSize 线程池的最大线程数量
     * @param keepAliveTime 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
     * @param unit keepAliveTime的时间单位
     * @param workQueue 任务队列(工作队列),用来储存等待执行任务的队列
     * @param threadFactory 线程工厂
     * @param handler 饱和策略
     * @throws 如果出现以下之一 抛出IllegalArgumentException :<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws 如果 {@code workQueue} 或 {@code threadFactory} 
     * 或 {@code handler} 为 null 抛出NullPointerException 
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    构造方法的参数如下:

    • corePoolSize:线程池的核心线程数量(核心线程池的大小,线程池的基本大小),当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前有线程处于空闲状态,直到当前线程数达到核心线程数。 只有在工作队列满了的情况下才会创建超出这个数量的线程。如果调用了prestartCoreThread()或者prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。
    • maximumPoolSize:线程池的最大线程数量,如果工作队列已满并且当前线程池线程个数小于最大线程数,就会创建新的线程来执行任务。注意,如果工作队列使用无界队列,这个参数就没有什么效果。
    • keepAliveTime:空闲线程存活时间,如果某个线程的空闲时间超过了存活时间,将被标记为可回收,并且当线程池的当前线程数超过了核心线程数时,这个线程将被终止。
    • unitkeepAliveTime参数的时间单位。
    • workQueue:用于保存等待执行的任务工作队列(阻塞队列,任务队列)。
    • threadFactory:线程工厂,可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。
    • handler:饱和策略,当工作队列已满并且线程数已达到最大线程数,说明当前线程池已处于饱和状态,如果再提交任务,则会触发饱和策略。

    2.ThreadPoolExecutor再定制

    在调用完ThreadPoolExecutor的构造器后,仍然可以通过设置方法(Setter)来修改大多数传递给它的构造器的参数,例如线程池的核心线程数、最大线程数、存活时间、线程工厂以及饱和策略(拒绝执行处理器,Rejected Execution Handler)。如果Executor是通过Executors中的某个工厂方法创建的(newSingleThreadExecutor除外),那么可以将结果的类型转换为ThreadPoolExecutor以访问设置方法。

    Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor

    3.基础属性

    ThreadPoolExecutor中基础属性和相关方法解释如下:

    /**
     * 主池控制状态ctl是包含两个概念字段的原子整数
     *   workerCount:有效线程数量(工作线程数量)
     *   runState:运行状态
     * 为了将workerCount和runState用一个int表示, 我们将workerCount限制为 (2^29)-1而不是(2^31)-1
     * 即用int的低29位用来表示workerCount,用int的高3位用来表示runState
     * workerCount是已被允许启动但不允许停止的工作线程数,该值可能与活动线程的实际数量暂时不同
     *
     * runState提供主要的生命周期控制,并具有以下值:
     *   RUNNING:接受新任务并处理排队任务
     *   SHUTDOWN:不接受新任务,但处理排队任务
     *   STOP:不接受新任务,不处理排队任务,并中断正在进行的任务
     *   TIDYING:所有任务已经终止,workerCount为零,线程转换到状态TIDYING将运行terminate()钩子方法;
     *   TERMINATED:terminated()已经完成,该方法执行完毕代表线程池已经完全终止
     *
     * 运行状态之间并不是随意转换的,大多数状态都只能由固定的状态转换而来,转换关系:
     * RUNNING -> SHUTDOWN:在调用shutdown()时,可能隐含在finalize()
     * (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
     * SHUTDOWN -> TIDYING:当队列和线程池都为空时
     * STOP -> TIDYING:当线程池为空时
     * TIDYING -> TERMINATED:当terminate()方法完成时
     *
     * 当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回
     * 检测从SHUTDOWN到TIDYING的转换并不像想像的那样简单,因为在SHUTDOWN状态期间,队列在非空之后可能变为空
     * 反之亦然,但是只有在看到它为空之后才能看到workerCount为0(有时需要重新检查)
     */
    // 初始化时的有效线程数为0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 高3位表示运行状态,此值用于运行状态向左移动的位数,即29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 有效线程容量,低29位表示有效的线程数,00011111 11111111 11111111 11111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 高3位表示线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS; //11100000 00000000 00000000 00000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //00000000 00000000 00000000 00000000
    private static final int STOP       =  1 << COUNT_BITS; //00100000 00000000 00000000 00000000
    private static final int TIDYING    =  2 << COUNT_BITS; //01000000 00000000 00000000 00000000
    private static final int TERMINATED =  3 << COUNT_BITS; //01100000 00000000 00000000 00000000
    
    /**
     * 获取运行状态,c位ctl值,~CAPACITY高3位为1,低29位为0,运算结果为ctl的高3位,即运行状态
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    /**
     * 获取有效线程数,c位ctl值,CAPACITY高3位为0,低29位为1,运算结果为ctl的低29位,即有效线程数
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    /**
     * 获取ctl值,高3位的运行状态和低29位的有效线程数进行或运算
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    /*
     * 判断状态c是否小于s
     */
    private static boolean runStateLessThan(int c, int s) { return c < s; }
    /*
     * 判断状态c是否大于等于s
     */
    private static boolean runStateAtLeast(int c, int s) { return c >= s; }
    /*
     * 判断状态c是否为RUNNING
     */
    private static boolean isRunning(int c) { return c < SHUTDOWN; }
    
    /**
     * 使用CAS增加一个有效的线程
     */
    private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }
    
    /**
     * 使用CAS减少一个有效的线程
     */
    private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); }
    
    /**
     * 减少一个有效的线程
     */
    private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
    
    // 工作队列
    private final BlockingQueue<Runnable> workQueue;
    
    // 锁
    private final ReentrantLock mainLock = new ReentrantLock();
    
    
    // 包含线程池中的所有工作线程的Set,只有在持有mainLock的情况下才能访问
    private final HashSet<Worker> workers = new HashSet<Worker>();
    
    
    // 等待条件以支持awaitTermination
    private final Condition termination = mainLock.newCondition();
    
    // 跟踪线程池达到的最大大小,仅在mainLock下访问
    private int largestPoolSize;
    
    
    // 完成的任务总数,仅在终止工作线程时更新,仅在mainLock下访问。
    private long completedTaskCount;
    
    // 线程工厂,用于创建线程
    private volatile ThreadFactory threadFactory;
    	
    // 饱和策略