【个人技术经验及开发技巧分享】 【个人技术经验及开发技巧分享】
首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载

Louis

首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载
  • 并发编程

    • 可重入独占锁ReentrantLock
    • 共享锁Semaphore
    • 阻塞队列ArrayBlockingQueue
    • 阻塞队列LinkedBlockingQueue
    • HashMap详解
    • ThreadPoolExecutor线程池
      • 1 线程
      • 2 线程池
      • 3 Executor框架
      • 4 源码分析
  • 框架

  • 源码阅读
  • 并发编程
luoxiaofeng
2022-08-02
目录

ThreadPoolExecutor线程池

# 1 线程

线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。

线程存在以下几种生命状态:

  • NEW 新建
  • RUNNABLE 运行
  • BLOCKED 阻塞
  • WAITING 等待
  • TIMED_WAITING 超时等待
  • TERMINATED 终结
image-20220803155112647

线程的实现方式:

Thread Runnable Callable

// 实现Runnable接口的类将被Thread执行,表示一个基本的任务
public interface Runnable {
    // run方法就是它所有的内容,就是实际执行的任务
    public abstract void run();
}
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
public interface Callable<V> {
    // 相对于run方法的带有返回值的call方法
    V call() throws Exception;
}
1
2
3
4
5
6
7
8
9
10

# 2 线程池

线程池其实就是线程缓存,具有以下优势:

  • 重用存在的线程,减少线程创建,消亡的开销,提高性能。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池原理

image-20220804170634259

# 3 Executor框架

image-20220803103454193

Executor接口只有一个方法

public interface Executor {
    // 执行Ruannable类型的任务
    void execute(Runnable command);
}
1
2
3
4

ExecutorService接口重点方法

public interface ExecutorService extends Executor {
    // 在完成已提交的任务后封闭办事,不再接管新任务
    void shutdown();

    // 尝试停止所有正在执行的任务
    List<Runnable> shutdownNow();

    // 是否ExecutorService已被关闭
    boolean isShutdown();

    // 是否所有任务都执行完毕
    boolean isTerminated();

    // 提交Callable任务,并返回代表此任务的Future对象
    <T> Future<T> submit(Callable<T> task);

    // 提交Runnable任务,并返回代表此任务的Future对象,Future的get方法将在成功完成后返回给定的结果
    // result - 要返回的结果
    <T> Future<T> submit(Runnable task, T result);

    // 提交Runnable任务,并返回代表此任务的Future对象
    Future<?> submit(Runnable task);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

AbstractExecutorService抽象类实现了ExecutorService的部分方法

image-20220803105414269

ThreadPoolExecutor重点属性及方法

public class ThreadPoolExecutor extends AbstractExecutorService {

  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  private static final int COUNT_BITS = Integer.SIZE - 3;
  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  //包含池中所有工作线程的集合。仅在持有 mainLock 时访问。
  private final HashSet<Worker> workers = new HashSet<Worker>();

  // 高3位的111
  // 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
  // 线程池的初始状态是RUNNING,即线程池一创建,就处于RUNNING状态,并且线程池中的任务数为0
  private static final int RUNNING    = -1 << COUNT_BITS;

  // 高3位的000
  // 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务
  // 调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
  private static final int SHUTDOWN   =  0 << COUNT_BITS;

  // 高3位的001
  // 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且中断正在处理的任务
  // 调用线程池的shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN)-> STOP
  private static final int STOP       =  1 << COUNT_BITS;

  // 高3位的010
  // 当所有的任务已终止,ctl记录的任务数量为0,状态就会变成TIDYING。此时会执行钩子函数terminated()
  // terminated()在ThreadPoolExecutor中是空实现,可通过重写terminated()来实现用户自己的逻辑
  // 线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN -> TIDYING
  // 线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
  private static final int TIDYING    =  2 << COUNT_BITS;

  // 高3位的011
  // 线程池彻底终止,就变成TERMINATED状态
  // 线程池处于TIDING状态时,执行完terminated()后,就会由TIDYING -> TERMINATED
  private static final int TERMINATED =  3 << COUNT_BITS; 

  private static int runStateOf(int c)     { return c & ~CAPACITY; } // 获取线程池的运行状态
  private static int workerCountOf(int c)  { return c & CAPACITY; } // 获取活动线程的数量
  private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取运行状态和活动线程数

  public long getTaskCount() // 线程池已执行与未执行的任务总数
  public long getCompletedTaskCount() // 已完成的任务数
  public int getPoolSize() // 线程池当前的线程数
  public int getActiveCount() // 线程池中正在执行任务的线程数量

  // 线程池创建
  public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                            int maximumPoolSize, // 最大线程数
  // 线程池维护线程所允许的空闲时间。
  // 当线程池中的线程数量大于corePoolSize的时候,
  // 如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是等待,直到等待时间超过keepAliveTime
                            long keepAliveTime,
                            // keepAliveTime的时间单位
                            TimeUnit unit, 
                            // 阻塞队列,保存等待执行的任务,任务必须实现Runable接口
                            BlockingQueue<Runnable> workQueue, 
  // 线程工厂,用来创建新线程。默认使用Executors.defaultThreadFactory() 
  // 使用默认工厂创建线程时,线程具有相同的优先级,并且是非守护线程,同时也设置了线程名称
                            ThreadFactory threadFactory,
  // 拒绝策略,阻塞队列满了,且没有空闲的工作线程时,如果继续提交任务,则执行拒绝策略
                            RejectedExecutionHandler handler)

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)。

使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

线程池状态切换:

image-20220803143535769

线程池提供了4种拒绝策略

1.AbortPolicy:直接抛出异常,默认策略;

2.CallerRunsPolicy:用调用者所在的线程来执行任务;

3.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4.DiscardPolicy:直接丢弃任务;

可以根据应用场景实现 RejectedExecutionHandler 接口,自定义拒绝策略

# 4 源码分析

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //如果活动线程的数量小于核心线程数量
        if (workerCountOf(c) < corePoolSize) {
            //如果添加worker(核心线程)成功,则执行结束。
            if (addWorker(command, true))
                return;
            //添加失败则重新获取ctl(运行状态和活动线程数)
            c = ctl.get();
        }
        //如果线程池在RUNNING状态 并且 队列添加任务成功
        if (isRunning(c) && workQueue.offer(command)) {
            //重新获取ctl(运行状态和活动线程数)
            int recheck = ctl.get();
            //如果线程池不是RUNNING状态 并且 阻塞任务移除失败(任务不存在队列)
            if (! isRunning(recheck) && remove(command))
                //执行拒绝策略
                reject(command);
            //如果活动线程的数量等于0
            else if (workerCountOf(recheck) == 0)
                //只添加线程(任务已经在之前添加进队列)
                addWorker(null, false);
        }
        //否则如果添加worker(最大线程)失败
        else if (!addWorker(command, false))
            //执行拒绝策略
            reject(command);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //线程池运行状态
            int rs = runStateOf(c);
            //如果线程池运行状态不是RUNNING状态(>=SHUTDOWN)
            //并且 非(SHUTDOWN状态下阻塞队列不为空,且入参firstTask为空)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                //入参firstTask不为空时,则代表线程池运行状态只要不是RUNNING,都会返回false
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //如果工作线程达到上限 
                //或者(达到核心线程数(如果入参core==true)或 达到最大线程数(如果core==flase))
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    //返回false
                    return false;
                    //否则,尝试工作线程数加1
                if (compareAndIncrementWorkerCount(c))
                    //增加成功,则跳出外循环
                    break retry;
                c = ctl.get();  // Re-read ctl
                //线程池运行状态有发生变更则重新进入外循环
                if (runStateOf(c) != rs)
                    continue retry;
                //否则继续内循环
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //线程池运行状态是RUNNING 或者 (是SHUTDOWN状态 且 firstTask为null)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //新建线程如果已经是alive状态,抛异常
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //workers集合添加新建worker
                        workers.add(w); 
                        int s = workers.size();
                        if (s > largestPoolSize)
                            //记录线程池最大线程数量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动worker中的线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //添加worker失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { 
    // ... ...
    Worker(Runnable firstTask) {
      setState(-1);
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
      runWorker(this);
    }
}
1
2
3
4
5
6
7
8
9
10
11
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //从阻塞队列取出任务,取出的task为空时跳出循环
        //离开该循环,runWorker执行完,即该线程的run()方法也执行完,线程销毁
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //(线程池运行状态是非RUNNING 或者 线程是中断的(后复位)并且线程池运行状态是非RUNNING)
            // 且 当前线程不是中断的
            // 即:如果线程池正在停止,那么要保证当前线程是中断状态。
            if ((runStateAtLeast(ctl.get(), STOP) 
                    || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                && !wt.isInterrupted())
                //中断线程
                wt.interrupt();
            try {
                //空实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //线程运行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //空实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //如果线程池的状态是非RUNNING 且 (线程池正在stop 或者 阻塞队列为空)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //workerCount - 1
            decrementWorkerCount();
            //返回null
            return null;
        }

        int wc = workerCountOf(c);

        //timed = 核心线程允许超时 或 工作线程数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //(工作线程数大于最大线程数 或者 超时) 且 (工作线程数超过1 或者 阻塞队列为空)
        if ((wc > maximumPoolSize || (timed && timedOut)) 
                            && (wc > 1 || workQueue.isEmpty())) {
            //工作线程数减1 
            if (compareAndDecrementWorkerCount(c))
                //返回空
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //超时,继续下一轮循环,在下一轮循环中返回空
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
HashMap详解
Spring

← HashMap详解 Spring→

最近更新
01
SpringBoot
10-21
02
Spring
10-20
03
Sentinel
10-14
更多文章>
Copyright © 2022-2023 Louis | 粤ICP备2022060093号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式