线程池源码解析 3.excute() 方法
创始人
2024-01-28 10:59:48
0

线程池源码解析—excute()方法

execute()

  • execute 方法是线程池的核心方法,所有的方法,包括包装的 FutureTask,都是调用这个方法。

大致流程

这里只是总结了一遍大致的流程,一些细节问题见下面的流程图或者参考源码。

  • 当提交任务时,首先判断当前线程池内的线程数是否达到了核心线程,没有达到核心线程数就开线程去执行任务,如果达到了核心线程数,就尝试将任务加入阻塞队列中。
  • 如果说队列也满了,就尝试继续开线程执行任务,如果此时线程池中的存活线程已经等于了 maximumPoolSize,那么直接走拒绝策略。没有到达最大线程,则开启线程执行任务。

流程图

execute

源码解析

   /** command可以是普通的Runnable实现类,也可以是FutureTask(本质也是一个Runnable)*/   public void execute(Runnable command) {// command不能为nullif (command == null)throw new NullPointerException(); /* * 获取内部的ctl的值赋值给c* 高3位表示线程池状态,低29位表示当前线程池的线程数量*/int c = ctl.get();/** workerCountOf(c)就是获取ctl的低29位,即当前线程池中存活的线程数量* 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker(线程),对应线程池中多了一个新的线程*/if (workerCountOf(c) < corePoolSize) {/** addWorker(Runnable firstTask, boolean core) 为创建worker并执行任务的过程,会创建worker对象,并且将command作为firstTask* @param firstTask 表示当前的任务* @param core 表示此次创建的线程是否算到核心线程数的头上*        core == true 表示采用核心线程数量限制  false表示采用 maximumPoolSize*/if (addWorker(command, true))// 创建成功后,直接返回。addWorker方法内部会启动新创建的worker,执行firstTaskreturn;/** 执行到这条语句,说明addWorker()一定失败了...* 有几种可能? *   1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize条件成立时,其他线程可能也成立了,*     并且向线程池中创建了worker,如果此时线程池中的线程数已经到达了上限等,那么就会失败。*   2.当前线程池状态发生了改变*     2.1 如果当前线程池状态是非RUNNING,addWorker(firstTask != null, true|false)一定会失败*     2.2 SHUTDOWN状态下,也有可能创建成功,前提是firstTask == null 并且当前队列不为空(特殊情况)*/ // 重新获取一下ctl的值c = ctl.get();}/** 执行到这里有几种情况?*   1.当前线程数量已经达到了corePoolSize*   2.addWorker()失败*//** 条件成立:表示当前线程池处于running状态,则尝试将任务放到阻塞队列中*/if (isRunning(c) && workQueue.offer(command)) {// 执行到这里,说明当前任务已经入队// 再次获取一下ctl的值int recheck = ctl.get();/** 根据ctl的值 判断当前线程池处于非RUNNING状态,则尝试从队列中移除任务* 条件1:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()*        这种情况 需要把刚刚提交的任务删除掉。* 条件2:remove(command) 移除当前任务,有可能成功,也有可能失败*        成功:提交之后,线程池中的线程还未消费(处理)*        失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理了。*/if (!isRunning(recheck) && remove(command))// 任务移除成功,走拒绝策略reject(command); /** 有几种情况会到这里?*   1.当前线程池是running状态(这个概率最大)*   2.线程池状态是非running状态 但是remove提交的任务失败** 这里是一个担保机制,担保线程池是running状态,* 但是如果线程池中存活的线程是0的话,就会很尴尬,这里就调用addWorker()开一个线程去执行任务* 保证线程池在running状态下,最起码得有一个线程在工作。*/else if (workerCountOf(recheck) == 0)addWorker(null, false);}/** 执行到这里有几种情况?*   1.当前线程池是非running状态*   2.任务入队失败* * 1.如果线程池处于非running状态,调用addWorker()一定会失败,走拒绝策略* 2.如果说队列满了,这个时候如果当前线程数量没有达到maximumPoolSize,会创建新的worker直接执行任务(救急线程)*   如果达到了最大线程数,addWorker()就会失败,走拒绝策略*/ else if (!addWorker(command, false))// 走拒绝策略reject(command);}

addWorker()

总体流程分析

先判断当前线程池状态是否允许继续添加任务,允许添加的话,CAS操作ctl的值尝试+1,CAS 成功然后传入任务通过 new Worker() 的方式构造一个 Worker,然后加锁尝试将 worker 对象放入线程池,如果加入线程池成功,释放锁后就调用 start() 执行任务。如果任务最终没有被执行,就需要执行后续的清理逻辑。

源码解析

   /** @param firstTask可以为null,表示启动worker之后,worker自动从队列中获取任务,如果不是null,worker优先执行firstTask(内部任务)* @param core 采用的线程数限制,如果为true,采用核心线程数限制,false采用maximumPoolSize线程数限制 (传入的参数一般都是false)* @return boolean 表示任务是否已经启动* 返回值总结:true表示创建worker成功且线程启动成功,加入到set集合中*            false: ① 线程池状态大于SHUTDOWN时,一定会失败*                    ② 当前状态 = SHUTDOWN 但是队列中已经没有任务了 或 当前状态是SHUTDOWN且队列未空,但是firstTask不为null*                    ③ 当前线程池已经达到了指定指标(core或max)*                    ④ ThreadFactory创建的线程是null*/private boolean addWorker(Runnable firstTask, boolean core) {   // 自旋 判断当前线程池状态是否允许创建线程retry: for (;;) {// 获取当前的ctl值保存到c中int c = ctl.get();// rs:当前线程池的运行状态int rs = runStateOf(c);/** 这个判断主要是为了判断当前线程池是SHUTDOWN状态,但是队列里还有任务尚未处理完,这个时候是允许添加worker(线程)的,* 但是不允许再次提交task*/if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null  &&! workQueue.isEmpty()))return false;// 上面的代码,就是判断当前线程池的状态是否允许继续添加线程// 内部自旋 获取创建线程令牌的过程for (;;) {// 获取当前线程池中的线程数量int wc = workerCountOf(c);/** 条件1:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字* 条件2:根据传来的core参数判断当前线程数是否大于等于核心线程数或者是最大线程数,大于等于的话直接return false*/if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))// 执行到这里,说明当前无法添加线程了,已经达到指定限制了return false;/** 尝试使用CAS方式更新ctl,将线程数量+1,成功,相当于申请到了一块令牌* 成功:说明更新成功,结束外层自旋* 失败:说明有竞争,其他线程已经修改了ctl的值 或者外部线程可能调用了* shutdown或者shutdownNow(),导致线程池状态发生变化,CAS也会失败*/ if (compareAndIncrementWorkerCount(c))// 走到这,一定是CAS成功,申请到令牌了。结束外层自旋,继续向下执行外部代码(创建Worker)break retry;// CAS失败,获取最新的ctl值c = ctl.get();  // 判断当前线程池状态是否发生过变化(如果外部在这之前调用过shutdown/shutdownNow会导致状态变化),如果变化了,继续自旋if (runStateOf(c) != rs)// 状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态 和 是否允许创建线程continue retry;}}// 表示创建的worker是否已经启动boolean workerStarted = false;// 表示创建的worker是否添加到了workers(HashSet)中boolean workerAdded = false;// w表示后面创建worker的一个引用Worker w = null;try {// 创建Worker,执行完后,线程“应该”已经准备好了w = new Worker(firstTask); // 将新创建的worker节点的线程 赋值给tfinal Thread t = w.thread;/** 为什么要做 t != null这个判断?* 为了防止ThreadFactory实现类有bug,因为ThreadFactory是一个接口*/if (t != null) {// 将全局锁的引用保存到mainLock中final ReentrantLock mainLock = this.mainLock;/** 尝试加锁,可能会阻塞,直到获取成功为止,同一时刻操纵线程池内部的相关操作,都必须持锁*/mainLock.lock();  // 只要走到了这里,说明当前线程已经加锁成功,其他线程是无法修改当前线程池状态的try {// 获取线程池运行状态保存到rs中int rs = runStateOf(ctl.get());/** 条件1:判断当前线程池的状态是否正常(RUNNING)* 条件2:判断当前线程池为SHUTDOWN状态并且firstTask为null,其实就是判断是否是SHUTDOWN状态下的特殊情况(队列里还有任务)*/if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 这里防止程序员自定义ThreadFactory实现类时,创建线程返回给外部之前,将线程start了..if (t.isAlive()) // 抛出异常throw new IllegalThreadStateException();// 将创建的worker添加到线程集合(就是线程池)中workers.add(w);// 获取最新线程池线程的数量int s = workers.size(); // 条件成立,更新largestPoolSize if (s > largestPoolSize)largestPoolSize = s;// 更新标志位,表示线程已经追加到线程池中了workerAdded = true;}} finally {// 释放全局锁mainLock.unlock();}/** 条件成立:workerAdded为true,表示添加worker成功* 条件不成立:表示线程池在lock之前,状态发生了变化,导致添加线程失败*/if (workerAdded) {// 成功后,将线程启动t.start();// 启动标记设置为trueworkerStarted = true;}}} finally {// 判断启动标记if (!workerStarted)// worker启动失败,需要做清理工作addWorkerFailed(w);}// 返回新创建的线程是否启动 true|falsereturn workerStarted;}

addWorkerFaild()

    /** 在addWorker()方法的内部自旋中,我们使用 compareAndIncrementWorkerCount(c) 将线程数+1了,* 但执行到这个方法说明我们创建线程失败了,所以需要进行清理工作,将ctl的值-1*/private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;// 加锁mainLock.lock();try {if (w != null)// 从workers中移除失败的workerworkers.remove(w);// 使用CAS + 自旋将ctl的值-1decrementWorkerCount();// 见后续SHUTDOWN()tryTerminate();} finally {// 释放锁mainLock.unlock();}}

参考

  • 视频参考
    • b站_小刘讲源码付费课
  • 文章参考
    • shstart7_线程池源码解析3.execute()方法

相关内容

热门资讯

小本创业平民项目 小本创业平民... 小本创业:500元起家到月入20万_创业小项目发布日期:2017-03-17来源:创业小项目作者:小...
2011年小本创业流水增涨项目... 现在什么小本生意流水增涨5个“特色小吃”项目推荐!2017年08月03日来源:渠道网现在什么小本生意...
流水增涨的小本创业2011年小... 解答一、美容美发店  美容美发业女人的“美丽产业”大有流水增涨机会。女性买菜时可能会为了一两块钱而计...
关于互联网创业 ”大学生创新创... 人家是如何做呢?先就是分析需求,目前各行各业都有培训,教各种技能的。找一个自己比较刚需的领域,我拿微...
适合白领创业的六大小本创业项万... 阅读本文前,请您先点击上面的蓝色字体“创业之坛”,再点击“关注”,这样您就可以继续免费收到最新文章了...
城镇小本创业项目有哪些?小本项... 现在创业不同于之前,投资也会少很多,不少人在外打工了几年,想要回到自己的小城镇做点小本创业的买卖,那...
现在小本创业有什么好项目 现在... 为什么穷人多不敢去创业蛋糕创业蛋糕店创业30岁现在小本创业有什么好项目女人创业做什么适合女性创业的大...
五万元小本创业好项目 五万元小... 为什么穷人多不敢去创业蛋糕创业蛋糕店创业30岁女人创业做什么适合女性创业的大学生适合什么创业毕业生如...
5万元小本生意做什么好 做点什... 如今市场上有着很多的投资小项目,而导致这些项目出现的原因自然是因为当下人们生活水平以及消费水平的迅速...
小本创业好项目有哪些(月入5万... 1.铁板鱿鱼近几年来走在城市的大街小巷,随处可闻铁板鱿鱼的奇特香味,铁板鱿鱼的制作技术胜在配方上,技...