什么是线程池

顾名思义,管理线程的池子,有什么好处呢?

  • 管理线程,避免增加线程和销毁线程的资源损耗。
  • 提高效率,创建好预设的线程数量放入池子,随用随取。
  • 扩展了功能,提供定时,定期,单线程,并发控制等功能。

线程池创建

创建的线程方式Executors静态方法,实例ThreadPoolExecutor类,这里不推荐使用Executors方式,原因可以看阿里的Java开发手册
我们看下ThreadPoolExecutor方法的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

几个核心参数:

  • corePoolSize: 核心线程的数量
  • maximumPoolSize: 最大线程数。看代码maximumPoolSize不能小于corePoolSize
  • keepAliveTime: 线程最大空闲时间。
  • unit: 时间单位。
  • workQueue: 线程等待队列。
  • threadFactory: 线程创建工厂。
  • handler: 拒绝策略

我们来分析一下这些参数对应背后的逻辑,一张图解释逻辑

  • 判断corePoolSize是否已满,未满执行线程任务。
  • 如果corePoolSize已满,判断workQueue是否已满,未满加入队列等待。
  • 如果workQueue已满,判断maximumPoolSize是否超出,未超出创建非核心线程。
  • 如果超出maximumPoolSize设置,就会执行handler。

还有有个ScheduledThreadPoolExecutor类来支持周期性任务的调度,继承ThreadPoolExecutor类。

  • 将任务封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响。
  • ScheduledFutureTask实现了java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有两个重要的方法: compareTogetDelaycompareTo方法用于比较任务之间的优先级关系,如果距离下次执行的时间间隔较短,则优先级高。getDelay方法用于返回距离下次任务执行时间的时间间隔。
  • ScheduledThreadPoolExecutor定义了一个DelayedWorkQueue,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序。
  • ScheduledFutureTask继承自FutureTask,可以通过返回Future对象来获取执行的结果。

常用的线程池

通过Executors创建线程池有3种,基于ThreadPoolExecutor类封装:

1. Executors.newCachedThreadPool

创建可缓存的线程池。

特点:

  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是SynchronousQueue
  • 非核心线程空闲存活时间为60秒

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

工作流程:

使用场景

用于并发执行大量短期的小任务。

2. Executors.newSingleThreadExecutor

创建单线程的线程池。

特点

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0

工作流程:

使用场景

适用于串行执行任务的场景,一个任务一个任务地执行。

3. Executors.newFixedThreadPool

创建固定长度的线程池。

特点:

  • 核心线程数和最大线程数大小一样
  • 没有所谓的非空闲时间,即keepAliveTime为0
  • 阻塞队列为无界队列LinkedBlockingQueue

工作流程:

使用场景

FixedThreadPool适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

  • FixedThreadPool和SingleThreadExecutor: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常。
  • CachedThreadPool: 允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常。

通过Executors创建OOM问题解决方案:
可以通过使用Semaphore对任务的执行进行限流也可以避免出现OOM异常。

线程池的参数如何定义?

  • CPU密集型: 线程池的大小推荐为CPU数量 + 1,CPU数量可以根据Runtime.availableProcessors方法获取
  • IO密集型: CPU数量 * CPU利用率 * (1 + 线程等待时间/线程CPU时间)
  • 混合型: 将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整。
  • 阻塞队列: 推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生, 尽量不要Integer.MAX_VALUE。
  • 拒绝策略: 默认采用的是AbortPolicy拒绝策略,直接在程序中抛出RejectedExecutionException异常【因为是运行时异常,不强制catch】,这种处理方式不够优雅。处理拒绝策略有以下几种比较推荐:
  1. 在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略。
  2. 使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低。
  3. 自定义拒绝策略,只需要实现RejectedExecutionHandler接口。
  4. 如果任务不是特别重要,使用DiscardPolicyDiscardOldestPolicy拒绝策略将任务丢弃。

任务执行

拒绝策略

  • AbortPolicy: 抛出一个异常,默认的。
  • DiscardPolicy: 直接丢弃任务。
  • DiscardOldestPolicy: 丢弃队列里最老的任务,将当前这个任务继续提交给线程池。
  • CallerRunsPolicy: 交给线程池调用所在的线程进行处理。

工作队列

ArrayBlockingQueue

ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

LinkedBlockingQueue

LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene,newFixedThreadPool线程池使用了这个队列。

DelayQueue

DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue

PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列。

SynchronousQueue

SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。