Kotlin协程 - 先入个门吧

Author Avatar
Dexlind 2月 08, 2018
  • 在其它设备中阅读本文章

你们要的协程文,嗯。封面图id:66548341。

(我终于学会嵌入网易云音乐了,音量注意)

因为是入门嘛,所以本文保证不会出现任何与 kotlinx.coroutines 相关的内容。

本文写于Kotlin1.2的时代,1.3有点变化。所以修订一下本水文,增加一些内容,将原来的过时内容修改为1.3版本的内容,删减一些无关紧要的破事水。

前言

事先说明一点,本文的协程,专指Kotlin语言的协程,仅对Kotlin有效。本文完全不涉及其他编程语言的协程或类似的概念。

话说为什么我要在已经有那么多篇优秀的协程文的情况下再水一篇呢?

首先,因为我太鶸了,bennyhuo的那篇文章根本看不懂,一开始上来就抛出一堆难以理解的专业名词,比如线程、Lua、CoroutineContext 等等,再加上那一堆根本看不懂的 Lua 代码以及 UML 类图,萌新一脸懵逼,直接被劝退,根本不留情面。所以我决定自己写一篇(自己能看懂的)。

其次,在Kotlin1.3版本更新之后,Kotlin官网上的参考文档的协程部分,完全套用了 kotlinx.coroutines 这个官方协程库的教程,对基本语法、标准库内容避而不谈。个人认为这并不合适,了解协程的原理、协程的本质也是十分重要的一环,就酱紫。

本水文不保证其他读者能看懂!(逃

Kotlin 协程有什么用

Kotlin 1.1 的关键新特性是协程,它带来了 future/awaityield 以及类似的编程模式的支持。Kotlin 的设计中的关键特性是协程执行的实现是语言库的一部分,而不是语言的一部分,所以你不必绑定任何特定的编程范式或并发库。

很多人都会纳闷这Kotlin的协程到底有什么用,有什么好处。

在Kotlin官网上的参考文档里讲到「协程实际上是一个轻量级的线程,可以挂起并稍后恢复」,并且使用 kotlinx.coroutines 里的 launch 函数作为例子:

repeat(100_000) { // 启动大量的协程
   launch { ... }
}

如果你找过其他官方资料,还能发现有将 launch 与 Thread.start 对比的例子,大多是在“我轻松开10W个协程,你开1K个线程试试”云云。

然而这只是其中一个优点,并且容易使人感觉「协程只是线程池和线程调度的封装罢了」,未能感受到协程真正魅力所在,十分可惜。

针对上述问题,一个比较好的答案是:

Kotlin协程是 callback 的语法糖,它的主要好处是能让你写不需要 callback 的异步代码。换言之,把异步代码写得看起来就想同步的一样。

直接拿官方的一段代码来做例子,假设有一段回调地狱代码如下:

// 将数据从通道异步读入`buf`, 完成后运行lambda
inChannel.read(buf) {
    // 这个lambda在读取完成时执行
    bytesRead ->
    ...
    ...
    process(buf, bytesRead)

    // 从`buf`异步写入通道, 完成后运行lambda
    outChannel.write(buf) {
        // 这个lambda在写入完成时执行
        ...
        ...
        outFile.close()          
    }
}

再假设上述异步API(即read和write)有支持Kotlin协程的版本(用Kotlin协程对异步API封装为aRead和aWrite),则可以用协程将上面代码改写成如下形式:

launch {
    // 当异步读取进行时挂起协程
    val bytesRead = inChannel.aRead(buf) 
    // 只有在读取完成时才执行至这一行
    ...
    ...
    process(buf, bytesRead)
    // 当异步写入进行时挂起协程 
    outChannel.aWrite(buf)
    // 只有在写入完成时才执行至这一行
    ...
    ...
    outFile.close()
}

看起来就像是没有回调、就像是同步代码一样。然而,协程仅仅是 callback 的语法糖,上面的代码仍然是包含了两个回调的异步代码,但是看起来却无比舒服。

那么,Kotlin协程是如何做到的呢?

基本操作

先说说Kotlin标准库在1.3版本新增了一个类 Result<T>

Result<T> 是一个用于表示 Kotlin 函数执行成功和失败结果的 discriminated union(也叫 tagged union 或者是 algebraic data type),即 Success T | Failure Throwable,很简单。

不懂也没关系。

suspend 关键字

Kotlin在1.1版本新增加了 suspend 关键字,可以用来修饰函数或者 lambda 表达式的类型:

suspend fun suspendFunction(): String { …… }
// ↑ 你看这辣鸡代码高亮 ↓
val suspendLambda: suspend () -> Unit = { …… }
// 上面这个suspend lambda的类型需要显式标明
// 不然其类型会推导成普通的lambda而不是suspend lambda

从Kotlin1.2.30版本开始,可以这样声明 suspend lambda,能够自动推导类型。

// Kotlin 1.2.30 以降
val suspendLambda2 = suspend {
    "Hello world!"
}
// suspendLambda2 的类型将被自动推导为 suspend () -> String
// PS:在这里suspend是一个伪关键字

现在,你得到了 suspend 函数和 suspend lambda。被标记为 suspend 的函数/lambda 只能在 suspend 函数/lambda 中被调用。

在Kotlin1.3版本,新增了 suspend main 功能。

suspend fun main() {
   println("hello coroutine")
}

创建和启动协程

创建并启动一个协程十分简单,你只需要两件宝具:一个 suspend lambda,以及一个 Continuation

import kotlin.coroutines.*

val suspendLambda = suspend {
   "Hello world!"
}
val completion = object : Continuation<String> {
   override val context get() = EmptyCoroutineContext
   override fun resumeWith(result: Result<String>): Unit = println(result.getOrThrow())
}

使用 Kotlin 标准库中的 createCoroutine 函数来创建协程:

val coroutine: Continuation<Unit> = suspendLambda.createCoroutine(completion)

这样你就得到了一个未启动的协程,然后调用 resume 扩展方法启动这个协程:

coroutine.resume(Unit)
// 打印出 Hello world!

或者使用标准库里的 startCoroutine 函数来创建并立即启动一个协程:

suspendLambda.startCoroutine(completion)
// 打印出 Hello world!

很简单。另外对于有带接收者的 suspend lambda,有与之相对应的库函数。

// 创建协程
fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit>
fun <R, T> (suspend R.() -> T).createCoroutine(receiver: R, completion: Continuation<T>): Continuation<Unit>

// 创建并启动协程
fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>): Unit
fun <R, T> (suspend R.() -> T).startCoroutine(receiver: R, completion: Continuation<T>): Unit

// 常用的扩展方法
fun <T> Continuation<T>.resume(value: T) = resumeWith(Result.success(value))
fun <T> Continuation<T>.resumeWithException(exception: Throwable) = resumeWith(Result.failure(exception))

协程挂起和恢复执行

想要暂停一个协程的执行,可以使用标准库里面的 suspendCoroutine 函数:

val suspendLambda = suspend {
    println("before suspend")
    suspendCoroutine<Unit> { }
    println("after suspend")
}
suspendLambda.startCoroutine(object : Continuation<Any> { …… })

// 输出:
// before suspend

如果需要恢复协程,例如等待3秒后继续执行:

val suspendLambda = suspend {
    println("before suspend")
    val int: Int = suspendCoroutine { c ->
        Thread.sleep(3000)
        c.resume(1551)
    }
    println("after suspend, resume with $int")
}
suspendLambda.startCoroutine(object : Continuation<Any> { …… })

// 输出(两行输出间隔3秒):
// before suspend
// after suspend, resume with 1551

suspendCoroutine 函数的签名如下:

inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

如果你知道这上面的各段代码在执行的过程中究竟发生了什么,那太棒了,你不需要浪费时间阅读这篇辣鸡水文,请点击右上角的关闭按钮。

概念摘出:Continuation

Continuation(续延)究竟是一个什么概念?一般来讲,Continuation 表示的是「剩余的计算」的概念,换句话说就是「接下来要执行的代码」。举个例子来说,假设我们有这样一段代码:

println(1551.toString().length)

我们知道这段代码会先执行 1551.toString(),然后再执行 _.length,最后执行 println(_) 将结果打印出来。

1551.toString() 求值之后,需要将其结果传递至 println(_.length)

可以看到,println(_.length) 需要一个 String 类型的值才能执行。这时我们可以将 println(_.length) 改写成一个 lambda 表达式:

{ s: String -> println(s.length) }

这个 lambda 表达式(或者说是闭包)就可以是 1551.toString() 的 Continuation,即「剩余的计算」。

这样,我们就可以通过把 "1551" 传给这个 lambda 来重新构建原始的形式:

{ s: String -> println(s.length) }.invoke("1551")

也就是执行该 lambda 表达式的 invoke 方法以执行「剩余的计算」。

那么 1551.toString().length 的 Continuation 又是什么呢?很简单,是 { i: Int -> println(i) }

以上讲的就是 Continuation 的一般概念。Kotlin 里面的 Continuation 长什么样子?大概像这样:

interface Continuation<in T> {
   val context: CoroutineContext
   fun resumeWith(result: Result<T>)
}

// 常用的扩展方法
fun <T> Continuation<T>.resume(value: T) =
      resumeWith(Result.success(value))

fun <T> Continuation<T>.resumeWithException(exception: Throwable) =
      resumeWith(Result.failure(exception))

这样我们就可以将 Kotlin 的 Continuation 与上面的 lambda 表达式建立一些对应关系,Continuation的 resumeWith 就相当与 lambda 表达式的 invoke,所以当你拿到一个 Continuation 时,resumeWith 方法即是「剩余的计算」的入口。

PS:「剩余的计算」这个概念,是不是跟 callback 很像呢~

上文基础操作的简要解释

来看上文创建协程的例子:

val suspendLambda = suspend { ... }
val completion = object : Continuation<String> { ... }

val coroutine: Continuation<Unit> = suspendLambda.createCoroutine(completion)

coroutine.resume(Unit) // 执行已创建好的协程

例中 createCoroutine 函数接受一个 Contiuation 类型的参数 completion,表示「协程执行完之后要执行的代码」。而 suspendLambda 则是协程的主体。

createCoroutine 函数负责把这两个东西整合成一个表示「整个协程所有需要执行的代码」的 Contiuation,称为初始 Continuation(initial continuation)。对其调用 resume 则开始运行。

当 suspendLambda 执行完毕后,将其结果传至 completion 的 resume 扩展方法;若 suspendLambda 的执行过程中抛出了异常,则执行 completion 的 resumeWithException 扩展方法。

再看上文 suspendCoroutine 函数的例子:

val suspendLambda = suspend {
   println("before suspend")
   val int: Int = suspendCoroutine { cont ->
      Thread.sleep(3000)
      cont.resume(1551)
   }
   println("after suspend, resume with $int")
}

suspendCoroutine 会将协程挂起(suspend),并且 suspendCoroutine 这个函数接收一个lambda表达式作为参数,这个 lambda 的 cont 参数即是表示「协程挂起后剩下的还没执行的代码」。

针对这个例子,如果要用lambda表达式来表示 cont 这个 Continuation,则是:

{ i: Int ->
   val int = i
   println("after suspend, resume with $int")
}

在线程睡了3000毫秒后,调用 resume 方法将这个协程继续执行下去,suspendCoroutine 的返回值即是通过 resume 方法传入的值(本例中为1551),于是变量int得到值1551,并在之后打印出来。

超简单~!

编译器的魔术

「别逗我了。那种东西怎么会是魔法!」

那么 Kotlin 的协程是怎么实现的呢?

协程完全通过编译技术实现(不需要来自 VM 或 OS 端的支持),挂起通过代码来生效。(本句话抄自 Kotlin 中文网)

CPS 转换

「CPST 就是 Gödel–Gentzen 变换的 Curry–Howard 像而已,这有什么难理解的?」

在编译时,每一个 suspend 函数会被编译器加上一个 Continuation 参数:

// 编译前
suspend fun <T, U> suspendFunction(arg: U): T { …… }

// 编译后
fun <T, U> suspendFunction(arg: U, c: Continuation<in T>): Any? { …… }

这叫做 CPS 转换(Continuation-Passing-Style Transformation)。

因此我们可以认为每个 suspend 函数都有一个 Continuation 类型的隐式参数,每个 suspend 函数都能通过这个参数拿到一个 Continuation,代表着「该函数之后将要执行的代码」。

PS:suspendFunction 经过CPS转换后,返回值的那个 Any? 其实是个类似于 union type(并集类型)的玩意。它其实是 T | COROUTINE_SUSPENDED,意为返回值可能为 T 类型的值,也可能是一个 COROUTINE_SUSPENDED。但是辣鸡 Kotlin 没有 union type,所以只能用 Any? ,使用的时候再做类型强转。(看看人家 ScalaCeylon ,做得多好)

PPS:如果你喜欢翻看源码,你会发现在 Kotlin 标准库的协程部分以及 kotlinx.coroutines 里面能经常见到这种用 Any? 表示的 union types。

状态机

我们知道 Continuation 就相当于一个闭包,经过 CPS 转换,每次调用 suspend 函数都需要传一个 Continuation 进去。为了避免创建过多的闭包和匿名类,Kotlin 选择使用状态机(state machines)来实现 Continuation。

由于懒,我直接把 KEEP 里面的例子抄了过来:

suspend 函数会被编译成一个状态机,例如一个 suspend 函数里有以下代码:

val a = a()
val y = suspendFunction(foo(a)) // 挂起点 1
b()
val z = suspendFunction(bar(a, y)) // 挂起点 2
c(z)

其中的2个 suspend 函数调用点(简称挂起点,suspension point)将这段代码分成3个状态:

状态0:第一个挂起点之前(初始状态)

val a = a()
foo(a)

状态1:第一个挂起点之后,至第二个挂起点之前

val y = _
b()
bar(a, y)

状态2:第二个挂起点之后

val z = _
c(z)

代码会被编译成一个匿名类,它具有一个实现状态机的方法,一个保存状态机当前状态的字段,以及各个状态的局部变量的字段,看起来像这样:

// 伪代码,简化模型,实际情况会比这个要复杂一些
class 状态机匿名类 extends CoroutineImpl implements Continuation<Object> {
    // 这个int用来保存状态机当前的状态
    int label = 0
    // 用来保存suspend方法中的局部变量
    A a = null
    Y y = null
    // 实现状态机的方法
    void resume(Object data) {
        if (label == 0) goto L0
        if (label == 1) goto L1
        if (label == 2) goto L2
        else throw IllegalStateException()
        // 英文不翻译了,懒~
      L0:
        // data is expected to be `null` at this invocation
        a = a()
        label = 1
        data = suspendFunction(foo(a), this) // 'this' is passed as a continuation 
        if (data == COROUTINE_SUSPENDED) return // return if suspendFunction had suspended execution
      L1:
        // external code has resumed this coroutine passing the result of suspendFunction() as data 
        y = (Y) data
        b()
        label = 2
        data = suspendFunction(bar(a, y), this) // 'this' is passed as a continuation
        if (data == COROUTINE_SUSPENDED) return // return if suspendFunction had suspended execution
      L2:
        // external code has resumed this coroutine passing the result of suspendFunction() as data 
        Z z = (Z) data
        c(z)
        label = -1 // No more steps are allowed
        return
    }
}

我们可以看到每次调用 suspendFunction 时,传进去的 Continuation 都是同一个对象,即状态机本身;并且通过 label 来控制状态和代码跳转,使其符合「剩下的计算」的语义。

PS:并不是所有的 suspend 函数都会编译成一个状态机,存在一种尾调用优化(tail call optimization)的机制。举个例子:

suspend fun f1() {
    println("do something before calling f2")
    f2() // <==这里
}
suspend fun f2() { …… }

f1函数内部唯一的一个 suspend 调用是在函数尾部的位置(即 tail suspension invocation),这时不会编译成状态机,而是这样:

fun f1(c: Continuation<in Unit>): Any? {
    println("do something before calling f2")
    f2(c)
}

fun f2(c: Continuation<in Unit>): Any? { …… }

即尾调用优化。(亲爱的读者可以思考一下为什么可以这么做)

另外,如果 suspend 函数里面没有调用任何 suspend 函数,那么也不会被编译成状态机。

具有Kotlin特色的Call/CC

前面提到,编译器会给每一个 suspend 函数添加一个 Continuation 类型的参数,但是我们在代码里是看不到这个参数的,我们要怎么样才能拿到这个参数呢?

于是乎,Kotlin 厚颜无耻地把 Scheme 的 Call/CC(call-with-current-continuation)抄了过来并加以魔改,放在 kotlin.coroutines.intrinsics 这个包里,就是这玩意:

inline suspend fun <T> suspendCoroutineUninterceptedOrReturn(
   crossinline block: (Continuation<T>) -> Any?
): T
// 不要问我为什么这玩意的名字这么长

这个函数是 Kotlin 协程中最为重要的函数,是一个固有函数(intrinsic function,即编译器特殊对待的函数),其实现无法用 Kotlin 代码表示,需要编译器在编译的时候进行替换。如果你去看了它的源码,你会看到类似与这样的东西:

inline suspend fun <T> suspendCoroutineUninterceptedOrReturn(
   crossinline block: (Continuation<T>) -> Any?
): T = throw NotImplementedError("Implementation is intrinsic")
// 等号后面也有可能是一个 `null!!`

在经过 CPS 转换后,我们来看一下这个函数的真面目:

inline fun <T> suspendCoroutineUninterceptedOrReturn(
   crossinline block: (Continuation<T>) -> Any?, c: Continuation<T>
): Any? = block(c)

简单明了,直接将这个新增的 Continuation 参数传给了 lambda,这样我们就可以通过这个 lambda 来操纵由 CPS 转换得来的 Continuation。

上文讲过,这里所有的 Any? 其实都是 T | COROUTINE_SUSPENDEDCOROUTINE_SUSPENDED 的定义也在这个包里:

val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

@PublishedApi
internal enum class CoroutineSingletons {
   COROUTINE_SUSPENDED, UNDECIDED, RESUMED 
}

从上文的那个状态机伪代码里面可以看到,对于每个 suspend 函数的调用,都会检查其返回值是不是 COROUTINE_SUSPENDED。如果不是,那么状态机就开始执行下一个状态的代码;如果是 COROUTINE_SUSPENDED,就直接返回,停止执行代码,即协程挂起。可以写个demo来验证一下:

suspend fun f3(): Unit = suspendCoroutineUninterceptedOrReturn { c ->
    COROUTINE_SUSPENDED
}

val suspendLambda: suspend () -> Unit = {
    println("before suspend")
    f3()
    println("after suspend")
}
suspendLambda.startCoroutine(object : Continuation<Any> { …… })

结果只有 before suspend 被打印了出来。如果要继续执行下去,则需要通过 Continuation,调用其 resume 扩展方法:

suspend fun f3(): Unit = suspendCoroutineUninterceptedOrReturn { c ->
    thread {
        Thread.sleep(5_000)
        c.resume(Unit)
    }
    COROUTINE_SUSPENDED
}

这时候我们可以看到,before suspend 先被打印出来,5秒种后,after suspend 再被打印出来。

f3函数经过 CPS 转换、suspend 函数的尾调用优化以及 suspendCoroutineUninterceptedOrReturn 的内联,最终会变成如下的样子:

fun f3(c: Continuation<Unit>): Any? {
    thread {                //  ↑ 这个Any?其实是 ↓
        Thread.sleep(5_000) //       Unit | COROUTINE_SUSPENDED
        c.resume(Unit)
    }
    return COROUTINE_SUSPENDED
}

看起来就像是在直接操纵编译时添加的 Continuation 参数。

本节比较长,给个小结:suspendCoroutineUninterceptedOrReturn 能够让你直接操纵通过CPS转换得来的 Continuation 参数。

suspendCoroutineUninterceptedOrReturn { cont -> ... }

这个函数接收一个 lambda 参数,在这个 lambda 里面,你将面临两种选择:直接返回需要的结果(不挂起协程)或者返回 COROUTINE_SUSPENDED(挂起协程)。如果你选择挂起协程,你需要在合适的地方与时机调用(从lambda的参数得到的)Continuation 的 resume 扩展方法将需要的结果传入以便继续执行协程。

Kotlin 标准库里面的 suspendCoroutine 即是对 suspendCoroutineUninterceptedOrReturn 的封装,使其更易于使用。读者可以对比一下两者的函数签名以及文档上的注意事项。一般情况下使用 suspendCoroutine 即可满足需求。

PS:不知道从什么时候开始的,kotlin.coroutines.intrinsics 这个包以及里面的所有东西,都不会出现在 Intellij IDEA 的自动补全的列表里了。你需要手动 import 这个包,才能享受到自动补全的便利。

PPS:本文完。

你学到了什么

  • Kotlin协程的一些(不常用的)标准库函数的使用方法
  • 编译器都做了什么事(的一部分)
  • suspendCoroutineUninterceptedOrReturn 有什么用

下一篇协程文?不存在的。

课后作业:我在 Codewars 上面出了一道题,要求实现简单的控制流,很简单。如果你能看懂这篇文章,那么这道题对于你来说应该是十分简单的。(千里冰封julao仅用了不到十分钟的时间就做出来了)

反编译Tip:Intellij IDEA的Kotlin插件有将Kotlin代码的字节码反编译至Java的功能,但是在面对协程相关的代码时大多数情况下都不好用。请不要想太多,老老实实用其他反编译工具,我用的是这个jadx,版本0.6.1。

知识共享许可协议
本作品采用知识共享 署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可,转载请注明出处。

本文链接:https://aisia.moe/2018/02/08/kotlin-coroutine-kepa/