9.10 通道
延迟对象提供了一种在协程之间传输单个值的方法。而通道(Channel)提供了一种传输数据流的方法。通道是使用 SendChannel 和使用 ReceiveChannel 之间的非阻塞通信。
9.10.1 通道 vs 阻塞队列
通道的概念类似于 阻塞队列(BlockingQueue)。在Java的Concurrent包中,BlockingQueue很好的解决了多线程中如何高效安全“传输”数据的问题。它有两个常用的方法如下:
E take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空, 阻塞进入等待状态直到BlockingQueue有新的数据被加入;
put(E e): 把对象 e 加到BlockingQueue里, 如果BlockQueue没有空间,则调用此方法的线程被阻塞,直到BlockingQueue里面有空间再继续。
通道跟阻塞队列一个关键的区别是:通道有挂起的操作, 而不是阻塞的, 同时它可以关闭。
代码示例:
package com.easy.kotlin
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
class ChannelsDemo {
fun testChannel() = runBlocking<Unit> {
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..10) channel.send(x * x)
}
println("channel = ${channel}")
// here we print five received integers:
repeat(10) { println(channel.receive()) }
println("Done!")
}
}
fun main(args: Array<String>) {
val cd = ChannelsDemo()
cd.testChannel()
}
运行输出:
channel = kotlinx.coroutines.experimental.channels.RendezvousChannel@2e817b38
1
4
9
16
25
36
49
64
81
100
Done!
我们可以看出使用Channel<Int>()
背后调用的是会合通道RendezvousChannel()
,会合通道中没有任何缓冲区。send函数被挂起直到另外一个协程调用receive函数, 然后receive函数挂起直到另外一个协程调用send函数。它是一个完全无锁的实现。
9.10.2 关闭通道和迭代遍历元素
与队列不同, 通道可以关闭, 以指示没有更多的元素。在接收端, 可以使用 for 循环从通道接收元素。代码示例:
fun testClosingAndIterationChannels() = runBlocking {
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..5) channel.send(x * x)
channel.close() // 我们结束 sending
}
// 打印通道中的值,直到通道关闭
for (x in channel) println(x)
println("Done!")
}
其中, close函数在这个通道上发送一个特殊的 “关闭令牌”。这是一个幂等运算:对此函数的重复调用不起作用, 并返回 “false”。此函数执行后,isClosedForSend
返回 “true”。但是, ReceiveChannel
的isClosedForReceive
在所有之前发送的元素收到之后才返回 “true”。
我们把上面的代码加入打印日志:
fun testClosingAndIterationChannels() = runBlocking {
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..5) {
channel.send(x * x)
}
println("Before Close => isClosedForSend = ${channel.isClosedForSend}")
channel.close() // 我们结束 sending
println("After Close => isClosedForSend = ${channel.isClosedForSend}")
}
// 打印通道中的值,直到通道关闭
for (x in channel) {
println("${x} => isClosedForReceive = ${channel.isClosedForReceive}")
}
println("Done! => isClosedForReceive = ${channel.isClosedForReceive}")
}
运行输出:
1 => isClosedForReceive = false
4 => isClosedForReceive = false
9 => isClosedForReceive = false
16 => isClosedForReceive = false
25 => isClosedForReceive = false
Before Close => isClosedForSend = false
After Close => isClosedForSend = true
Done! => isClosedForReceive = true
9.10.3 生产者-消费者模式
使用协程生成元素序列的模式非常常见。这是在并发代码中经常有的生产者-消费者模式。代码示例:
fun produceSquares() = produce<Int>(CommonPool) {
for (x in 1..7) send(x * x)
}
fun consumeSquares() = runBlocking{
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
这里的produce函数定义如下:
public fun <E> produce(
context: CoroutineContext,
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit
): ProducerJob<E> {
val channel = Channel<E>(capacity)
return ProducerCoroutine(newCoroutineContext(context), channel).apply {
initParentJob(context[Job])
block.startCoroutine(this, this)
}
}
其中,参数说明如下:
参数名 | 说明 |
---|---|
context | 协程上下文 |
capacity | 通道缓存容量大小 (默认没有缓存) |
block | 协程代码块 |
produce函数会启动一个新的协程, 协程中发送数据到通道来生成数据流,并以 ProducerJob 对象返回对协程的引用。ProducerJob继承了Job, ReceiveChannel类型。