线程池学习

ThreadPoolExecutor 使用。

Executor

ExecutorService

ThreadPoolExecutor

ExecutorService 接口定义了任务执行器完整的生命周期。

Runnable Callable

Callable 和 Runnable 类似,但是 Callable 有返回接口,返回接口存放在 Future 中。

FutureTask == Future + Runnable

JDK 提供的线程池

ThreadPoolExecutor

ForkJoinPool

ThreadPoolExecutor 7 个重要参数

ThreadPoolExecutor 几个核心方法

ThreadPoolExecutor源码分析
1、常用变量的解释
// 1. ctl,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl=new AtomicInteger(ctlOf(RUNNING,0));
// 2. COUNT_BITS,Integer.SIZE 为32,所以 COUNT_BITS 为29
private static final int COUNT_BITS=Integer.SIZE-3;
// 3. CAPACITY,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY=(1<<COUNT_BITS)-1;

// runState is stored in the high-order bits
// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING=-1<<COUNT_BITS;
private static final int SHUTDOWN=0<<COUNT_BITS;
private static final int STOP=1<<COUNT_BITS;
private static final int TIDYING=2<<COUNT_BITS;
private static final int TERMINATED=3<<COUNT_BITS;

// Packing and unpacking ctl
// 5. runStateOf(),获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c){return c&~CAPACITY;}
// 6. workerCountOf(),获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c){return c&CAPACITY;}
// 7. ctlOf(),根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs,int wc){return rs|wc;}

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
// 8. runStateLessThan(),线程池状态小于xx
private static boolean runStateLessThan(int c,int s){
        return c<s;
}
// 9. runStateAtLeast(),线程池状态大于等于xx
private static boolean runStateAtLeast(int c,int s){
        return c>=s;
        }
2、构造方法
public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler){
        // 基本类型参数校验
        if(corePoolSize< 0||
        maximumPoolSize<=0||
        maximumPoolSize<corePoolSize ||
        keepAliveTime< 0)
        throw new IllegalArgumentException();
        // 空指针校验
        if(workQueue==null||threadFactory==null||handler==null)
        throw new NullPointerException();
        this.corePoolSize=corePoolSize;
        this.maximumPoolSize=maximumPoolSize;
        this.workQueue=workQueue;
        // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
        this.keepAliveTime=unit.toNanos(keepAliveTime);
        this.threadFactory=threadFactory;
        this.handler=handler;
        }
3、提交执行task的过程
public void execute(Runnable command){
        if(command==null)
        throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c=ctl.get();
        // worker数量比核心线程数小,直接创建worker执行任务
        if(workerCountOf(c)<corePoolSize){
        if(addWorker(command,true))
        return;
        c=ctl.get();
        }
        // worker数量超过核心线程数,任务直接进入队列
        if(isRunning(c)&&workQueue.offer(command)){
        int recheck=ctl.get();
        // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
        // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
        if(!isRunning(recheck)&&remove(command))
        reject(command);
        // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
        else if(workerCountOf(recheck)==0)
        addWorker(null,false);
        }
        // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
        // 这儿有3点需要注意:
        // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
        // 2. addWorker第2个参数表示是否创建核心线程
        // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
        else if(!addWorker(command,false))
        reject(command);
        }
4、addworker源码解析
private boolean addWorker(Runnable firstTask,boolean core){
        retry:
        // 外层自旋
        for(;;){
        int c=ctl.get();
        int rs=runStateOf(c);

        // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 线程池状态大于SHUTDOWN时,直接返回false
        // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
        // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
        // Check if queue empty only if necessary.
        if(rs>=SHUTDOWN&&
        !(rs==SHUTDOWN&&
        firstTask==null&&
        !workQueue.isEmpty()))
        return false;

        // 内层自旋
        for(;;){
        int wc=workerCountOf(c);
        // worker数量超过容量,直接返回false
        if(wc>=CAPACITY||
        wc>=(core?corePoolSize:maximumPoolSize))
        return false;
        // 使用CAS的方式增加worker数量。
        // 若增加成功,则直接跳出外层循环进入到第二部分
        if(compareAndIncrementWorkerCount(c))
        break retry;
        c=ctl.get();  // Re-read ctl
        // 线程池状态发生变化,对外层循环进行自旋
        if(runStateOf(c)!=rs)
        continue retry;
        // 其他情况,直接内层循环进行自旋即可
        // else CAS failed due to workerCount change; retry inner loop
        }
        }
        boolean workerStarted=false;
        boolean workerAdded=false;
        Worker w=null;
        try{
        w=new Worker(firstTask);
final Thread t=w.thread;
        if(t!=null){
final ReentrantLock mainLock=this.mainLock;
        // worker的添加必须是串行的,因此需要加锁
        mainLock.lock();
        try{
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        // 这儿需要重新检查线程池状态
        int rs=runStateOf(ctl.get());

        if(rs<SHUTDOWN ||
        (rs==SHUTDOWN&&firstTask==null)){
        // worker已经调用过了start()方法,则不再创建worker
        if(t.isAlive()) // precheck that t is startable
        throw new IllegalThreadStateException();
        // worker创建并添加到workers成功
        workers.add(w);
        // 更新`largestPoolSize`变量
        int s=workers.size();
        if(s>largestPoolSize)
        largestPoolSize=s;
        workerAdded=true;
        }
        }finally{
        mainLock.unlock();
        }
        // 启动worker线程
        if(workerAdded){
        t.start();
        workerStarted=true;
        }
        }
        }finally{
        // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
        if(!workerStarted)
        addWorkerFailed(w);
        }
        return workerStarted;
        }
5、线程池worker任务单元
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // 省略代码...
}
6、核心线程执行逻辑 - runWorker
final void runWorker(Worker w){
        Thread wt=Thread.currentThread();
        Runnable task=w.firstTask;
        w.firstTask=null;
        // 调用unlock()是为了让外部可以中断
        w.unlock(); // allow interrupts
        // 这个变量用于判断是否进入过自旋(while循环)
        boolean completedAbruptly=true;
        try{
        // 这儿是自旋
        // 1. 如果firstTask不为null,则执行firstTask;
        // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
        while(task!=null||(task=getTask())!=null){
        // 这儿对worker进行加锁,是为了达到下面的目的
        // 1. 降低锁范围,提升性能
        // 2. 保证每个worker执行的任务是串行的
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted.  This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        // 如果线程池正在停止,则对当前线程进行中断操作
        if((runStateAtLeast(ctl.get(),STOP)||
        (Thread.interrupted()&&
        runStateAtLeast(ctl.get(),STOP)))&&
        !wt.isInterrupted())
        wt.interrupt();
        // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
        // 这两个方法在当前类里面为空实现。
        try{
        beforeExecute(wt,task);
        Throwable thrown=null;
        try{
        task.run();
        }catch(RuntimeException x){
        thrown=x;throw x;
        }catch(Error x){
        thrown=x;throw x;
        }catch(Throwable x){
        thrown=x;throw new Error(x);
        }finally{
        afterExecute(task,thrown);
        }
        }finally{
        // 帮助gc
        task=null;
        // 已完成任务数加一 
        w.completedTasks++;
        w.unlock();
        }
        }
        completedAbruptly=false;
        }finally{
        // 自旋操作被退出,说明线程池正在结束
        processWorkerExit(w,completedAbruptly);
        }
        }

Executors - 线程池工厂

Executors 提供了几种线程池,这几种线程池都是基于 ThreadPoolExecutor 的封装。

并发 VS 并行

并发指的是提交,并行指的是执行。

并行是并发的子集。并行只有在多 CPU 的情况下才能实现。