9.6 协程执行的取消
我们知道,启动函数launch返回一个Job引用当前协程,该Job引用可用于取消正在运行协程:
fun testCancellation() = runBlocking<Unit> {
val job = launch(CommonPool) {
repeat(1000) { i ->
println("I'm sleeping $i ... CurrentThread: ${Thread.currentThread()}")
delay(500L)
}
}
delay(1300L)
println("CurrentThread: ${Thread.currentThread()}")
println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b1 = job.cancel() // cancels the job
println("job cancel: $b1")
delay(1300L)
println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b2 = job.cancel() // cancels the job, job already canceld, return false
println("job cancel: $b2")
println("main: Now I can quit.")
}
运行上面的代码,将会输出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
CurrentThread: Thread[main,5,main]
Job is alive: true Job is completed: false
job cancel: true
Job is alive: false Job is completed: true
job cancel: false
main: Now I can quit.
我们可以看出,当job还在运行时,isAlive是true,isCompleted是false。当调用job.cancel取消该协程任务,cancel函数本身返回true, 此时协程的打印动作就停止了。此时,job的状态是isAlive是false,isCompleted是true。 如果,再次调用job.cancel函数,我们将会看到cancel函数返回的是false。
9.6.1 计算代码的协程取消失效
kotlin 协程的所有suspend 函数都是可以取消的。我们可以通过job的isActive状态来判断协程的状态,或者检查是否有抛出 CancellationException 时取消。
例如,协程正工作在循环计算中,并且不检查协程当前的状态, 那么调用cancel来取消协程将无法停止协程的运行, 如下面的示例所示:
fun testCooperativeCancellation1() = runBlocking<Unit> {
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (i < 20) { // computation loop
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
nextPrintTime = currentTime + 500L
}
}
}
delay(3000L)
println("CurrentThread: ${Thread.currentThread()}")
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b1 = job.cancel() // cancels the job
println("job cancel1: $b1")
println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(30000L)
val b2 = job.cancel() // cancels the job, job already canceld, return false
println("job cancel2: $b2")
println("main: Now I can quit.")
}
运行上面的代码,输出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
...
I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
CurrentThread: Thread[main,5,main]
Before cancel, Job is alive: true Job is completed: false
job cancel1: true
After Cancel, Job is alive: false Job is completed: true
I'm sleeping 7 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
...
I'm sleeping 18 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 19 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
job cancel2: false
main: Now I can quit.
我们可以看出,即使我们调用了cancel函数,当前的job状态isAlive是false了,但是协程的代码依然一直在运行,并没有停止。
9.6.2 计算代码协程的有效取消
有两种方法可以使计算代码取消成功。
方法一: 显式检查取消状态isActive
我们直接给出实现的代码:
fun testCooperativeCancellation2() = runBlocking<Unit> {
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (i < 20) { // computation loop
if (!isActive) {
return@launch
}
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
nextPrintTime = currentTime + 500L
}
}
}
delay(3000L)
println("CurrentThread: ${Thread.currentThread()}")
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b1 = job.cancel() // cancels the job
println("job cancel1: $b1")
println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(3000L)
val b2 = job.cancel() // cancels the job, job already canceld, return false
println("job cancel2: $b2")
println("main: Now I can quit.")
}
运行这段代码,输出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
CurrentThread: Thread[main,5,main]
Before cancel, Job is alive: true Job is completed: false
job cancel1: true
After Cancel, Job is alive: false Job is completed: true
job cancel2: false
main: Now I can quit.
正如您所看到的, 现在这个循环可以被取消了。这里的isActive属性是CoroutineScope中的属性。这个接口的定义是:
public interface CoroutineScope {
public val isActive: Boolean
public val context: CoroutineContext
}
该接口用于通用协程构建器的接收器,以便协程中的代码可以方便的访问其isActive状态值(取消状态),以及其上下文CoroutineContext信息。
方法二: 循环调用一个挂起函数yield()
该方法实质上是通过job的isCompleted状态值来捕获CancellationException完成取消功能。
我们只需要在while循环体中循环调用yield()来检查该job的取消状态,如果已经被取消,那么isCompleted值将会是true,yield函数就直接抛出CancellationException异常,从而完成取消的功能:
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (i < 20) { // computation loop
yield()
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
nextPrintTime = currentTime + 500L
}
}
}
运行上面的代码,输出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
CurrentThread: Thread[main,5,main]
Before cancel, Job is alive: true Job is completed: false
job cancel1: true
After Cancel, Job is alive: false Job is completed: true
job cancel2: false
main: Now I can quit.
如果我们想看看yield函数抛出的异常,我们可以加上try catch打印出日志:
try {
yield()
} catch (e: Exception) {
println("$i ${e.message}")
}
我们可以看到类似:Job was cancelled 这样的信息。
这个yield函数的实现是:
suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
val context = cont.context
val job = context[Job]
if (job != null && job.isCompleted) throw job.getCompletionException()
if (cont !is DispatchedContinuation<Unit>) return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
cont.dispatchYield(job, Unit)
COROUTINE_SUSPENDED
}
如果调用此挂起函数时,当前协程的Job已经完成 (isActive = false, isCompleted = true),当前协程将以CancellationException取消。
9.6.3 在finally中的协程代码
当我们取消一个协程任务时,如果有try {...} finally {...}
代码块,那么finally {…}中的代码会被正常执行完毕:
fun finallyCancelDemo() = runBlocking {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(2000L)
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
job.cancel()
println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(2000L)
println("main: Now I can quit.")
}
运行这段代码,输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
Before cancel, Job is alive: true Job is completed: false
I'm running finally
After cancel, Job is alive: false Job is completed: true
main: Now I can quit.
我们可以看出,在调用cancel之后,就算当前协程任务Job已经结束了,finally{...}
中的代码依然被正常执行。
但是,如果我们在finally{...}
中放入挂起函数:
fun finallyCancelDemo() = runBlocking {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
delay(1000L)
println("And I've delayed for 1 sec ?")
}
}
delay(2000L)
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
job.cancel()
println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(2000L)
println("main: Now I can quit.")
}
运行上述代码,我们将会发现只输出了一句:I’m running finally。因为主线程在挂起函数delay(1000L)
以及后面的打印逻辑还没执行完,就已经结束退出。
finally {
println("I'm running finally")
delay(1000L)
println("And I've delayed for 1 sec ?")
}
9.6.4 协程执行不可取消的代码块
如果我们想要上面的例子中的finally{...}
完整执行,不被取消函数操作所影响,我们可以使用 run 函数和 NonCancellable 上下文将相应的代码包装在 run (NonCancellable) {…} 中, 如下面的示例所示:
fun testNonCancellable() = runBlocking {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
run(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(2000L)
println("main: I'm tired of waiting!")
job.cancel()
delay(2000L)
println("main: Now I can quit.")
}
运行输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.