kotlin coroutine源码解析之suspend挂起函数原理
创始人
2024-01-28 10:41:50
0

目录

  • suspend挂起函数
    • join原理
    • Await原理
    • Suspend函数
    • 总结

suspend挂起函数

在idea中写某些协程函数的时候,会有一个绿色箭头图标的出现,如下图:
在这里插入图片描述
而且这些方法不放在协程里面写的话,idea编辑器还会报错,如下图:
在这里插入图片描述
上面所说的这些方法就是挂起函数,挂起函数必须要在协程中调用,或者在挂起函数中调用;放在挂起函数中调用挂起函数调用,那么说明还是间接在协程中被调用,也就是挂起函数调用需要协程环境。
在说挂起函数原理之前,先复习下 launch启动过程 所说的三类continuation:

  1. DispatchedContinuation用于分发continuation到指定的线程池中;
  2. ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类;
  3. StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。

join原理

启动一个launch的时候,返回一个Job对象,这个Job对象对应着上面三个continuation实例,其中我们的业务代码在ContinuationImpl中,例如下面的代码:

  val myScope = CoroutineScope(CoroutineName("name") + IO + Job())val job = myScope.launch(CoroutineName("job")) {var job2 : Job? = launch(CoroutineName("job2")) {Thread.sleep(20000)}job2?.join()printlnM("job has run to end")}printlnM("job has launch")

这段代码编译后的代码如下:

BuildersKt.launch$default(myScope,(CoroutineContext)(new CoroutineName("job")),(CoroutineStart)null,(Function2)(new Function2((Continuation)null) {// $FF: synthetic fieldprivate Object L$0;int label;public final Object invokeSuspend(@NotNull Object $result) {Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch(this.label) {case 0:ResultKt.throwOnFailure($result);CoroutineScope $this$launch = (CoroutineScope)this.L$0;//创建job2Job job2 = BuildersKt.launch$default(/*省略。。。省略部分后面贴出*/)this.label = 1;//调用join方法if (job2.join(this) == var5) {return var5;}break;case 1:ResultKt.throwOnFailure($result);break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}MainActivity.Companion.printlnM("job has run to end");return Unit.INSTANCE;}//省略。。。}), 2, (Object)null);

Job job2 = BuildersKt.launch$default(/*省略。。。省略部分后面贴出*/)job2的省略代码后面贴出,不影响代码流程。

可以看到,我们的业务代码被编译到invokeSuspend()函数中了,是通过switch case语句来运行对应的代码逻辑,
第一步:case 0执行的是Job job2 = BuildersKt.launch$default(/*省略。。。*/) 也就是逻辑代码里面的创建job2那句代码,然后this.label = 1; ,label设置为1,接着调用job2.join()方法,join方法一般都是返回COROUTINE_SUSPENDED的,那么在这里就返回结束了,除了立马就有返回值的挂起函数返回值,这个if判断就不成立了,直接走下面第三步;
第二步:label为1了,那么case 1的情况成立,入参设置一下异常,这个入参是上一步传递进来的,没有异常的话这里是空的,break;
第三步:调用printlnM("job has run to end");打印语句。

我们可以看到,遇到有函数返回COROUTINE_SUSPENDED的,那么invokeSuspend函数会立刻return结束掉。按照之前 launch启动过程 分析的,launch的lambda逻辑代码被调用的调用链是:

DispatchedContinuation -> ContinuationImpl(内部调用了invokeSuspend) -> StandAloneCoroutine

如果invokeSuspend函数因为返回了COROUTINE_SUSPENDED直接结束掉的话,岂不是这个Job2协程的代码还没有跑完,就结束了?显然是不行的,仔细观察发现,只需要多次调用invokeSuspend方法,label会随着每次调用都会递增或者变动,那么对应的case一定会让所有的代码都执行一遍的。
可以把这种switch case语句当成是人们常说的状态机模式,在安卓开发中,常用的handler.sendMessage就很类似状态机模式,根据不同的what去处理不同的逻辑,只是协程会将顺序代码,根据case分成一个接着一个连续的代码段。

这里我们直接去跟踪join源码看是怎么实现的:

job2.join(this)public final override suspend fun join() {if (!joinInternal()) { // fast-path no waitcoroutineContext.checkCompletion()return // do not suspend}return joinSuspend() // slow-path wait
}private suspend fun joinSuspend() = suspendCancellableCoroutine { cont ->// We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlerscont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}

主要看joinSuspend函数,里面有几个重要的点:

  1. suspendCancellableCoroutine { cont -> 让当前协程在调用该函数处挂起,给当前协程的lambda代码块提供一个CancellableContinuation。
    suspendCoroutineUninterceptedOrReturn { uCont ->val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)/** For non-atomic cancellation we setup parent-child relationship immediately* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but* properly supports cancellation.*/cancellable.initCancellability()block(cancellable)cancellable.getResult()}

首先创建一个CancellableContinuationImpl类型的continuation,入参是uCont.intercepted(),这个uCont是调用的协程,那么就是val job这个协程,uCont.intercepted()也就是 launch启动过程 那章分析的DispatchedContinuation对象,由于val job = myScope.launch(CoroutineName("job")) 这句话已经将DispatchedContinuation生成了,所有会复用之前的对象,代码如下:

    public fun intercepted(): Continuation =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }

那么cancellable对象持有的就是val job的DispatchedContinuation,

接着在调用 cancellable.initCancellability()

private fun setupCancellation() {//省略val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parentparent.start() // make sure the parent is startedval handle = parent.invokeOnCompletion(onCancelling = true,handler = ChildContinuation(parent, this).asHandler)parentHandle = handle//省略
}

看代码,val handle = parent.invokeOnCompletion 之前分析过,是parent和当前job组成父子关系的作用,parent代表val job,当前job就是cancellable对象,意思就是父子Job可以相互取消对方,和之前父子Job关联的分析是差不多的,那么val jobcancellable组成了父子关系。

在调用 block(cancellable),这个就进入下面2了。

  1. invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler),这个方法是将入参ResumeOnCompletion和当前的JobSupport关联起来,加入到当前Jobsupport.state.list中,asHandler返回一个DisposableHandler然后将这个handler加入到cont的state.list中去,cont也是上面1中变量cancellable,是CancellableContinuationImpl类型的继承自continuation,这样val Job2和新创建的cancellable也组成了父子关系了。

  2. ResumeOnCompletion类型,传入的this,和cont参数,用来恢复cont协程的作用:

private class ResumeOnCompletion(job: Job,private val continuation: Continuation
) : JobNode(job)  {override fun invoke(cause: Throwable?) = continuation.resume(Unit)override fun toString() = "ResumeOnCompletion[$continuation]"
}

调用了invoke -> continuation.resume(Unit),这样continuation就会继续执行了。

经过上面的分析之后,可以看出整个Job树的结构如下:
在这里插入图片描述
看图:其中cancellable是可以被job取消的,取消之后,会将cancellable移除掉,cancellable被取消后,会遍历自己的state.list列表调用invoke方法,那么就会调用DIsposableHandle的invoke方法,将自己从job2中移除掉,cancellable就失去了作用。

结合一下job2的被编译后的代码,Job2编译后的代码如下:

BuildersKt.launch$default($this$launch, (CoroutineContext)(new CoroutineName("job2")),(CoroutineStart)null, (Function2)(new Function2((Continuation)null) {int label;@Nullablepublic final Object invokeSuspend(@NotNull Object var1) {Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch(this.label) {case 0:ResultKt.throwOnFailure(var1);Thread.sleep(1000L);return Unit.INSTANCE;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}}//省略。。。}), 2, (Object)null);

invokeSuspend方法被调用后,休眠了1s中,之后结束了,就会继续调用completion的resume方法,这样就会调用到job2对应的

AbstractCoroutine.resumeWith -> makeCompletingOnce -> 
notifyHandlers(list: NodeList, cause: Throwable?) -> 
(ResumeOnCompletion) node.invoke(cause) -> continuation.resume(Unit) ->

进入到CancellableContinuationImpl类中,

CancellableContinuationImpl.resumeWith -> resumeImpl -> 
dispatchResume -> dispatch -> dispatcher.dispatch(context, this)

上面已经分析了dispatcher.dispatch(context, this) dispatcher这个对象是根val job的dispatcher,在val joblaucnh的时候已经创建好了的,此时正在挂起中,所以这里调用dispatch就有下面的调用链:

BaseContinuationImpl..resumeWith() ->   val outcome = invokeSuspend(param)  ->
completion.resumeWith(outcome)

job之前在挂起点结束了,现在又再一次被CancellableContinuationImpl类型的continuation调用,那么job就从这个挂起点被唤醒了。这样就从case 0,运行到case 1了,然后job就结束了。

Await原理

将上面代码的Join换成await方法,代码如下:

  val myScope = CoroutineScope(CoroutineName("name") + IO + Job())val job = myScope.launch(CoroutineName("job")) {var deferred = async (CoroutineName("job2")) {Thread.sleep(20000)}job2?.await()printlnM("job has run to end")}printlnM("job has launch")

原理其实和Join方法差不多,流程是一样的,只是其中的某些节点的类型不一样,代码如下:

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->val cont = AwaitContinuation(uCont.intercepted(), this)cont.initCancellability()cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))cont.getResult()
}
  1. 创建一个AwaitContinuation类型的continuation,
  2. 然后调用initCancellability方法,这个在上一节讲过,是cont节点和自己的父节点产生父子关系关联,这个父节点也就是uCont,intercepted() 对象,也就是val job 的协程,
  3. 接着
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))

这句话在join方法里面看到了,是job2和AwaitContinuation产生父子关系的功能。
那么我们看下ResumeAwaitOnCompletion类型的节点是怎么实现的吧,

private class ResumeAwaitOnCompletion(private val continuation: CancellableContinuationImpl
) : JobNode() {override fun invoke(cause: Throwable?) {val state = job.stateif (state is CompletedExceptionally) {continuation.resumeWithException(state.cause)} else {continuation.resume(state.unboxState() as T)}}
}

和ResumeOnCompletion节点其实差不多,只不过调用resume的时候参数值是await的返回值,所以在挂起点恢复的时候,还带有挂起函数执行完成的返回值;
而且在出现异常的时候,还可以将异常抛出去给父Job处理,这一点好像比Job功能更完善。
可以画个图描述一下await的Job树:
在这里插入图片描述
job2是通过AwaitContinuation完成挂起点恢复的,这个类也是继承子 CancellableContinuationImpl的,只是覆盖fun getContinuationCancellationCause(parent: Job): Throwable用于获取异常信息的接口。
其他流程完全和join方法一模一样、

Suspend函数

通过上面的Join和Await的分析,感觉挂起协程之后,似乎需要恢复协程才可以让协程执行完成未完成的代码的。如果Join和Await方法不创建CancellableContinuationImpl这个continuation节点的话,其实val job协程挂起后就结束了,剩下的代码是不会完成,后面的打印语句是不会执行的。这个直觉是正确的,Suspend函数就是有这个作用的,挂起协程之后,需要逻辑代码在挂起函数内部,主动去调用resume和resumeWithException()方法,用于恢复协程。

suspend函数挂起代码实例:

fun SuspendFunc() {val myScope = CoroutineScope(Dispatchers.IO)myScope.launch {printlnM("suspend func run before")testRemoteRequest()printlnM("suspend func run after")}
}suspend fun testRemoteRequest():String {return suspendCancellableCoroutine { cancellableContinuation ->getUserInfo(object : Callback {override fun success() {printlnM("成功请求用户信息")cancellableContinuation.resume("成功")}override fun failed() {printlnM("请求用户信息失败")cancellableContinuation.resumeWithException(Exception("失败"))}})}
}interface Callback{fun success()fun failed()
}fun getUserInfo(callback: Callback?) {try {printlnM("getUserInfo")Thread.sleep(3000)callback?.success()} catch (e:Exception) {callback?.failed()e.printStackTrace()}
}

调用挂起函数,创建一个挂起点suspendCancellableCoroutine ,我们的逻辑代码写在suspendCancellableCoroutine 的lambda代码块中,内部调用模拟的网络请求,三秒后,主动调用resume方法,让挂起协程恢复执行。

打印结果如下:

2022-11-17 02:59:44.336  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run before
2022-11-17 02:59:44.337  E/MainActivity: DefaultDispatcher-worker-1 : getUserInfo
2022-11-17 02:59:47.338  E/MainActivity: DefaultDispatcher-worker-1 : 成功请求用户信息
2022-11-17 02:59:47.339  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run after

可以看到协程运行到getUserInfo,中间过了三秒后才继续执行,说明Job被挂起后恢复执行了。

现在修改代码,让网络请求失败,看看异常是谁来处理的:

val myScope = CoroutineScope(Dispatchers.IO + CoroutineExceptionHandler { c,e ->printlnM("scopeExceptionHandler : " + e.message)
})getUserInfo(object : Callback {override fun success() {printlnM("成功请求用户信息")cancellableContinuation.resumeWithException(Exception("失败"))}override fun failed() {printlnM("请求用户信息失败")cancellableContinuation.resumeWithException(Exception("失败"))}
})

打印日志如下:

2022-11-17 03:10:07.952  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run before
2022-11-17 03:10:07.953  E/MainActivity: DefaultDispatcher-worker-1 : getUserInfo
2022-11-17 03:10:10.954  E/MainActivity: DefaultDispatcher-worker-1 : 成功请求用户信息
2022-11-17 03:10:10.955  E/MainActivity: DefaultDispatcher-worker-1 : scopeExceptionHandler : 失败

可以看到,协程挂起后,三秒后,网路请求失败,挂起点抛出的异常,被根部的CoroutineExceptionHandler处理了,也就是说挂起函数的异常,也是遵循Job异常处理所说的链路传播。

根据上面的分析,可以给挂起流程画个时序图,用于展示线程是怎么切换的,怎么恢复协程的,如下图所示:
在这里插入图片描述

其中带颜色的方框,代表的是DisptacherContinuation,它持有的线程池执行它持有的的协程,所以可以知道挂起点恢复之后,协程所执行的线程池是保持一致的,对于不同的协程之间,由它继承的Disptahcer决定,或者通过launch传递disptahcer参数覆盖,这样就可以修改协程运行所在的线程了。

总结

1. 父JobA的lambda表达式中有挂起函数,协程会在父JobA的挂起点处创建一个CancellableContinuationImpl类型的continuation,这个Cancellable会和父JobA进行父子关联;
如果挂起函数本身是某个JobB的挂起函数,那么Cancellable还会和JobB组成父子关系,JobB在结束自己的时候,会通知Cancellable自己完成了,Cancellable又会继续通知JobA继续执行lambda的代码块,这样JobA就从挂起点恢复过来了。
如果挂起函数是由suspendCancellableCoroutine函数完成的,那么需要在lambda代码块中,收到调用resume函数去主动唤起协程。

2. suspendCancellableCoroutine是挂起函数的重点,这个函数才是挂起函数的实现成为可能。

3. 由于JobA恢复执行的dispatcher不变,所以JobA的lambda代码在挂起点前后所执行的线程池是一样的,如果是单线程的话,那么线程前后是一样的。而挂起点continuation本身的逻辑代码执行线程由挂起点自己决定。

相关内容

热门资讯

5万元小本生意做什么好 做点什... 如今市场上有着很多的投资小项目,而导致这些项目出现的原因自然是因为当下人们生活水平以及消费水平的迅速...
小本创业好项目有哪些(月入5万... 1.铁板鱿鱼近几年来走在城市的大街小巷,随处可闻铁板鱿鱼的奇特香味,铁板鱿鱼的制作技术胜在配方上,技...
加盟小本创业好项目有哪些? 加... 创业对于很多人来说,是一件需要勇气去实现的事,由于其不只有资金方面的风险,对投资者的精力方面也有着一...
2020小本创业做什么好必看 ... 创业并不是一件简单的事情,稍有偏差,就有可能迎来失败的结局,因此有一个好的创业项目至关重要,那么20...
小本创业点子项目 小本创业点子... 一个好的创业点子往往能够在创业中起到意想不到的作用,那么有哪些创业点子适合致富呢?下面由小编与大家分...
2016小本创业点子推荐 20... 新的一年马上就要过去了,很多想创业的朋友准备大干一场。关于创业,先要对当下的发展有清楚的认识,了解发...
小本创业点子推荐 热门生意小成... 1开家少儿书店现在很多图书都可以在网上阅览,但是孩子看纸质书好一点,可以开办一家专门服务于少儿的书店...
千元投资创业项目 千元创业 1... 社会永远是最穷的人和最富有的人,那么穷人应该永远是穷人吗?不,有些人选择学习以寻找出路,有些人选择经...
创业金点子大全 创业金点子大全...   最能够赚钱的创业好点子:美甲店手有女人“第二张脸”之称,如今越来越多的女性开始关心起指甲美容。随...
恭喜! 南京胖哥第二个孩子出生   恭喜!南京胖哥第二个孩子出生,儿子7斤8两,一出生嗷嗷哭嗓门还大,“他跟姐姐真的长得好像好像”(...