Kotlin协程 - 先入个门吧

Author Avatar
Dexlind 2月 08, 2018
  • 在其它设备中阅读本文章

你们要的协程文,嗯。封面图id:66548341。

因为是入门嘛,所以本文保证不会出现任何与 kotlinx.coroutines 相关的内容。

前言

话说为什么我要在已经有辣么多篇优秀的协程文的情况下再水一篇呢?因为我太鶸了,bennyhuo的那篇文章根本看不懂,一开始上来就抛出一堆难以理解的专业名词,比如线程、Lua、CoroutineContext 等等,再加上那一堆根本看不懂的 Lua 代码以及 UML 类图,萌新一脸懵逼,直接被劝退,根本不留情面。所以我决定自己写一篇(自己能看懂的),就酱。

本水文不保证其他读者能看懂!(逃

基本操作

Kotlin 1.1 的关键新特性是协程,它带来了 future/awaityield 以及类似的编程模式的支持。Kotlin 的设计中的关键特性是协程执行的实现是语言库的一部分,而不是语言的一部分,所以你不必绑定任何特定的编程范式或并发库。

为了不让内容太过单薄,抄了一段Kotlin中文网上的翻译(逃

suspend 关键字

Kotlin在1.1版本新增加了 suspend 关键字,可以用来修饰函数或者 lambda 表达式的类型:

suspend fun suspendFunction(): String { …… }
// ↑ 你看这辣鸡代码高亮 ↓
val suspendLambda: suspend () -> Unit = { …… }
// 你需要显式写出这个suspend lambda的类型,不然其类型会推导成普通的lambda而不是suspend lambda

然后你得到了一个 suspend 函数和 suspend lambda。被标记为 suspend 的函数只能在 suspend 函数或 suspend lambda 中被调用。

Kotlin 1.2.30 版本提供了一个更简单的方法来声明无参数的 suspend lambda,不用显式写出类型了:

// Kotlin 1.2.30 以后才能这样写
val suspendLambda = suspend {
    "Hello world!"
}
// suspendLambda 的类型将被自动推导为 suspend () -> String

PS:Kotlin 将在下次大更新(可能是 1.3)时让 suspend 关键字能够修饰 lambda 表达式,使之称为一个 suspend lambda,就像上面代码里的那样。但是 Kotlin 团队的那帮人根本就没有耐心,决定在 Kotlin 1.2.30 版本(小更新)就引入这个特性,但是又不想引入 breaking change,于是折中了一下,往标准库里加入了一个辅助函数,等到大版本更新正式加入这个功能时再移除这个函数:

@kotlin.internal.InlineOnly
public inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block

因为计划中这是要成为一个修饰符的,所以编译器会禁止那些看起来不像是修饰符的用法(non-modifier-like usages):

suspend { return@suspend "1551" } // 不允许 return@suspend,编译错误!
suspend poi@ { return@poi "1551" } // 这是好的!

::suspend // 禁止函数引用,编译错误!

因为是个折衷的版本嘛,所以目前只适用于声明无参 suspend lambda,等下个大版本真正成为一个 lambda 的修饰符的时候,就能适用于有参数的 lambda 了。

创建和启动协程

创建并启动一个协程十分简单,你只需要两件宝具:一个 suspend lambda,以及一个 Continuation

import kotlin.coroutines.experimental.*

val suspendLambda: suspend () -> String = {
    "Hello world!"
}
val completion = object : Continuation<String> {
    override val context get() = EmptyCoroutineContext
    override fun resume(value: String) {
        println(value)
    }
    override fun resumeWithException(exception: Throwable): Unit = throw exception
}

使用 Kotlin 标准库中的 createCoroutine 函数来创建协程:

val coroutine: Continuation<Unit> = suspendLambda.createCoroutine(completion)

然后调用 resume 方法启动这个协程:

coroutine.resume(Unit) // 打印出 Hello world!

或者使用标准库里的 startCoroutine 函数来创建并立即启动一个协程:

suspendLambda.startCoroutine(completion) // 打印出 Hello world!

很简单。另外对于有带接收者的 suspend lambda,有与之相对应的库函数。

fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit>
fun <R, T> (suspend R.() -> T).createCoroutine(receiver: R, completion: Continuation<T>): Continuation<Unit>

fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>): Unit
fun <R, T> (suspend R.() -> T).startCoroutine(receiver: R, completion: Continuation<T>): Unit

协程挂起和恢复执行

想要暂停一个协程的执行,可以使用标准库里面的 suspendCoroutine 函数:

val suspendLambda: suspend () -> Unit = {
    println("before suspend")
    suspendCoroutine<Unit> { }
    println("after suspend")
}

suspendLambda.startCoroutine(object : Continuation<Any> { …… })

// 只输出 before suspend

如果需要恢复协程,例如等待3秒后继续执行:

val suspendLambda: suspend () -> Unit = {
    println("before suspend")
    val int: Int = suspendCoroutine { c ->
        Thread.sleep(3000)
        c.resume(1551)
    }
    println("after suspend, resume with $int")
}
suspendLambda.startCoroutine(object : Continuation<Any> { …… })

// 输出(两行输出间隔3秒):
// before suspend
// after suspend, resume with 1551

suspendCoroutine 函数的签名如下:

inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

如果你知道这上面的各段代码里面究竟发生了什么,那太棒了,你不需要浪费时间阅读这篇辣鸡水文,请点击右上角的关闭按钮。

概念摘出:Continuation

Continuation(续延)究竟是一个什么概念?一般来讲,Continuation 表示的是「剩余的计算」的概念,换句话说就是「接下来要执行的代码」。举个例子来说,假设我们有这样一段代码:

println(1551.toString().length)

我们知道这段代码会先执行 1551.toString(),然后再执行 _.length,最后将结果打印出来 println(_)

1551.toString() 求值之后,需要将其结果传递至 println(_.length)。我们可以将 println(_.length) 写成一个 lambda 表达式:{ s: String -> println(s.length) }。这个 lambda 表达式(或者说是闭包)表示的就是 1551.toString() 的 Continuation,即「剩余的计算」。

这样,我们就可以通过把 1551.toString() 应用到这个lambda来重新构建原来的形式:{ s: String -> println(s.length) }.invoke(1551.toString())。换句话说,执行该 lambda 表达式的 invoke 方法以执行「剩余的计算」。

那么 1551.toString().length 的 Continuation 又是什么呢?很简单,是 { i: Int -> println(i) }

以上讲的就是 Continuation 的一般概念。Kotlin 里面的 Continuation 长什么样子?大概像这样:

interface Continuation<in T> {
   val context: CoroutineContext
   fun resume(value: T)
   fun resumeWithException(exception: Throwable)
}

这不就跟上面的 lambd 表达式很像嘛,Continuation接口的 resume 就相当与 lambda 表达式的 invoke ,所以当你拿到一个 Continuation 时,resume 方法即是「剩余的计算」的入口。

上文基本操作的例子中,createCoroutine 函数接受一个 Contiuation 参数 completion,表示「协程执行完之后要执行的代码」。而 createCoroutine 的接收者 suspendLambda 则是协程的主要部分。createCoroutine 函数把这两个东西搓成一个表示「整个协程所有需要执行的代码」的 Contiuation,称为初始 Continuation(initial continuation)。当 suspendLambda 执行完毕后,将其结果传至 completion 的 resume 方法;若 suspendLambda 的执行过程中抛出了异常,则走 completion 的 resumeWithException 方法。

再看上文 suspendCoroutine 函数的例子:

val suspendLambda: suspend () -> Unit = {
    println("before suspend")
    val int: Int = suspendCoroutine { c ->
        Thread.sleep(3000)
        c.resume(1551)
    }
    println("after suspend, resume with $int")
}

suspendCoroutine 函数接收一个lambda表达式作为参数,这个 lambda 的 Continuation 参数即是表示「协程挂起后剩下的还没执行的代码」。在这个例子中,表示的是 { i: Int -> val int = i; println("after suspend, resume with $int") }。调用resume方法将这个协程继续执行下去,suspendCoroutine 的返回值即是通过 resume 方法传入的值(本例中为1551),于是变量int得到值1551

超简单~!

编译器的魔术

「别逗我了。那种东西怎么会是魔法!」

那么 Kotlin 的协程是怎么实现的呢?

协程完全通过编译技术实现(不需要来自 VM 或 OS 端的支持),挂起通过代码来生效。(本句话抄自 Kotlin 中文网)

CPS 转换

「CPST 就是 Gödel–Gentzen 变换的 Curry–Howard 像而已,这有什么难理解的?」

在编译时,suspend 函数会被编译器加上一个 Continuation 参数:

// 编译前
suspend fun <T, U> suspendFunction(arg: U): T { …… }

// 编译后
fun <T, U> suspendFunction(arg: U, c: Continuation<in String>): Any? { …… }

这叫做 CPS 转换(Continuation-Passing-Style transformation)。

可以认为每个 suspend 函数都有一个隐式参数,每个 suspend 函数都能通过这个参数拿到一个 Continuation,代表着「该函数之后将要执行的代码」。

PS:suspendFunction 经过CPS转换后,返回值的那个 Any? 其实是个类似于 union types(并集类型)的玩意。它其实是 T | COROUTINE_SUSPENDED,表示返回值可能为 T 类型的值,也可能是个 COROUTINE_SUSPENDED。但是辣鸡 Kotlin 没有 union types,所以只能写成 Any?,使用的时候再做类型强转。(看看人家 ScalaCeylon ,做得多好)

PPS:如果你喜欢翻看源码,你会发现在 Kotlin 标准库的协程部分以及 kotlinx.coroutines 里面能经常见到这种用 Any? 表示的 union types。

状态机

我们知道 Continuation 就相当于一个闭包,经过 CPS 转换,每次调用 suspend 函数都需要传一个 Continuation 进去。为了避免创建过多的闭包和匿名类,Kotlin 选择使用状态机(state machines)来实现 Continuation。

由于懒,我直接把官方的非正式文档里面的例子抄了过来:

suspend 函数会被编译成一个状态机,例如一个 suspend 函数里有以下代码:

val a = a()
val y = suspendFunction(foo(a)) // 挂起点 1
b()
val z = suspendFunction(bar(a, y)) // 挂起点 2
c(z)

其中的2个 suspend 函数调用点(简称挂起点,suspension point)将这段代码分成3个状态:

状态0:第一个挂起点之前(初始状态)

val a = a()
foo(a)

状态1:第一个挂起点之后,至第二个挂起点之前

val y = _
b()
bar(a, y)

状态2:第二个挂起点之后

val z = _
c(z)

代码会被编译成一个匿名类,它具有一个实现状态机的方法,一个保存状态机当前状态的字段,以及各个状态的局部变量的字段,看起来像这样:

// 伪代码,简化模型,实际情况会比这个要复杂一些
class 状态机匿名类 extends CoroutineImpl implements Continuation<Object> {
    // 这个int用来保存状态机当前的状态
    int label = 0
    // 用来保存suspend方法中的局部变量
    A a = null
    Y y = null
    // 实现状态机的方法
    void resume(Object data) {
        if (label == 0) goto L0
        if (label == 1) goto L1
        if (label == 2) goto L2
        else throw IllegalStateException()
        // 英文不翻译了,懒~
      L0:
        // data is expected to be `null` at this invocation
        a = a()
        label = 1
        data = suspendFunction(foo(a), this) // 'this' is passed as a continuation 
        if (data == COROUTINE_SUSPENDED) return // return if suspendFunction had suspended execution
      L1:
        // external code has resumed this coroutine passing the result of suspendFunction() as data 
        y = (Y) data
        b()
        label = 2
        data = suspendFunction(bar(a, y), this) // 'this' is passed as a continuation
        if (data == COROUTINE_SUSPENDED) return // return if suspendFunction had suspended execution
      L2:
        // external code has resumed this coroutine passing the result of suspendFunction() as data 
        Z z = (Z) data
        c(z)
        label = -1 // No more steps are allowed
        return
    }
}

各位读者自行体会,我懒得解释了。

我们可以看到每次调用 suspendFunction 时,传进去的 Continuation 都是同一个对象,即状态机本身;并且通过 label 来控制状态和代码跳转,使其符合「剩下的计算」的语义。

PS:并不是所有的 suspend 函数都会编译成一个状态机,存在一种尾调用优化(tail call optimization)的机制。举个例子:

suspend fun f1() {
    println("do something before calling f2")
    f2() // <==这里
}
suspend fun f2() { …… }

f1函数内部唯一的一个 suspend 调用是在函数尾部的位置(即 tail suspension invocation),这时不会编译成状态机,而是这样:

fun f1(c: Continuation<in Unit>): Any? {
    println("do something before calling f2")
    f2(c)
}

fun f2(c: Continuation<in Unit>): Any? { …… }

即尾调用优化。(亲爱的读者可以思考一下为什么可以这么做)

PPS:在 Kotlin 1.2.30 之前的版本中(不包括 1.2.30),上面的代码并不会发生尾调用优化,你需要这样:

suspend fun f1() {
    println("do something before calling f2")
    return f2() // 这里必须写 return
}
suspend fun f2() { …… }

显式地写成 return f2() 才能有尾调用优化。

伪・Call/CC

前面提到,每个 suspend 函数都有一个隐式的 Continuation 参数(由编译器在编译时添加),但是我们在代码里是看不到这个参数的,我们要怎么样才能拿到这个参数呢?

Kotlin 厚颜无耻地把 Scheme 的 Call/CC(call-with-current-continuation)抄了过来并加以魔改,放在 kotlin.coroutines.experimental.intrinsics 这个包里,就是这玩意:

inline suspend fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?): T

这个函数是 Kotlin 协程库中最重要的函数。在 Kotlin1.1 版本里,这个函数是个固有函数(intrinsic function,即编译器特殊对待的函数),如果你去看了它的源码(Kotlin1.1 版本),你会看到类似与这样的东西:

inline suspend fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
        throw NotImplementedError("Implementation is intrinsic") // 或者可能是一个 `null!!`

流石固有函数。在经过 CPS 转换后,我们来看一下这个函数的真面目:

inline fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?,
                                        c: Continuation<T>): Any? = block(c)

简单明了,直接将这个 Continuation 参数传给了 lambda,我们就可以通过这个 lambda 参数来操纵由 CPS 转换得来的 Continuation 。

上文讲过,这个返回值 Any? 其实是 T | COROUTINE_SUSPENDEDCOROUTINE_SUSPENDED 的定义也在这个包里:

val COROUTINE_SUSPENDED: Any = Any()

太简单了。从上文的那个状态机伪代码里面可以看到,对于每个 suspend 函数的调用,都会检查其返回值是不是 COROUTINE_SUSPENDED。如果不是,那么状态机就开始执行下一个状态的代码;如果是 COROUTINE_SUSPENDED,就直接返回,停止执行代码,即协程挂起。可以写个demo来验证一下:

suspend fun f3(): Unit = suspendCoroutineOrReturn { c ->
    COROUTINE_SUSPENDED
}

val suspendLambda: suspend () -> Unit = {
    println("before suspend")
    f3()
    println("after suspend")
}
suspendLambda.startCoroutine(object : Continuation<Any> { …… })

结果只有 before suspend 被打印了出来。如果要继续执行下去,则需要调用其 resume 方法:

suspend fun f3(): Unit = suspendCoroutineOrReturn { c ->
    thread {
        Thread.sleep(5_000)
        c.resume(Unit)
    }
    COROUTINE_SUSPENDED
}

这时候我们可以看到,before suspend 先被打印出来,5秒种后,after suspend 再被打印出来。

f3函数经过 CPS 转换、suspend 函数的尾调用优化以及 suspendCoroutineOrReturn 的内联,最终会变成如下的样子:

fun f3(c: Continuation<Unit>): Any? {
    thread {                //  ↑ 这个Any?其实是 ↓
        Thread.sleep(5_000) //       Unit | COROUTINE_SUSPENDED
        c.resume(Unit)
    }
    return COROUTINE_SUSPENDED
}

看起来就像是在直接操纵附加的 Continuation 参数。

本节比较长,给个小结:suspendCoroutineOrReturn 能够让你直接操纵通过CPS转换得来的 Continuation 参数,这个函数接收一个 lambda 参数。在这个 lambda 里面,你将面临两种选择:直接返回需要的结果(不挂起协程)或者返回 COROUTINE_SUSPENDED(挂起协程)。如果你选择挂起协程,你需要在合适的地方与时机调用(从lambda的参数得到的)Continuation 的 resume 方法将需要的结果传入以便继续执行协程。

Kotlin 标准库里面的 suspendCoroutine 即是对 suspendCoroutineOrReturn 的封装,使其更易于使用。读者可以对比一下两者的函数签名以及文档上的注意事项。一般情况下使用 suspendCoroutine 即可满足需求。

PS:不知道从什么时候开始的,kotlin.coroutines.experimental.intrinsics 这个包以及里面的所有东西,都被Intellij IDEA的自动补全屏蔽了。你需要手动import这个包,里面的东西才能出现在自动补全的列表里。

PPS:在 Kotlin1.2 版本中,suspendCoroutineOrReturn 不再是一个固有函数,其实现由两个新的固有函数组成,但是不影响使用。(拆东墙补西墙)

PPPS:本文完。

你学到了什么

  • Kotlin 协程的(不)常用的标准库函数的使用方法
  • 编译器都做了什么事(的一部分)
  • suspendCoroutineOrReturn有什么用

下一篇协程文?不存在的。

课后作业:Codewars上面的一道题,要求实现简单的控制流,很简单。如果你能看懂这篇文章,那么这道题对于你来说应该是十分简单的。(冰封julao仅用了不到十分钟的时间就做出来了)

反编译Tip:Intellij IDEA的Kotlin插件有将Kotlin代码反编译至Java的功能,但是在面对协程相关的代码时大多数情况下都不好用。请不要想太多,老老实实用其他反编译工具,我用的是这个jadx,版本0.6.1。

知识共享许可协议
本作品采用知识共享 署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可,转载请注明出处。

本文链接:https://aisia.moe/2018/02/08/kotlin-coroutine-kepa/