类型 Future

作为一个对 Scala 充满热情的开发者,你应该已经听说过 Scala 处理并发的能力,或许你就是被这个吸引来的。
相较于大多数编程语言低级的并发 API,Scala 提供的方法可以让人们更好的理解并发以及编写良构的并发程序。

本章的主题- Future 就是这种方法的两大基石之一。(另一个是 actor)
我会解释 Future 的优点,以及它的函数式特征。

如果你想动手试试接下来的例子,请确保 Scala 版本不低于 2.9.3,
Future 在 2.10.0 版本中引入,并向后兼容到 2.9.3,最初,它是 Akka 库的一部分(API略有不同)。

顺序代码为什么会变坏

假设你想准备一杯卡布奇诺,你可以一个接一个的执行以下步骤:

  1. 研磨所需的咖啡豆
  2. 加热一些水
  3. 用研磨好的咖啡豆和热水制做一杯咖啡
  4. 打奶泡
  5. 结合咖啡和奶泡做成卡布奇诺

转换成 Scala 代码,可能会是这样:

  1. import scala.util.Try
  2. // Some type aliases, just for getting more meaningful method signatures:
  3. type CoffeeBeans = String
  4. type GroundCoffee = String
  5. case class Water(temperature: Int)
  6. type Milk = String
  7. type FrothedMilk = String
  8. type Espresso = String
  9. type Cappuccino = String
  10. // dummy implementations of the individual steps:
  11. def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
  12. def heatWater(water: Water): Water ` water.copy(temperature ` 85)
  13. def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
  14. def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
  15. def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"
  16. // some exceptions for things that might go wrong in the individual steps
  17. // (we'll need some of them later, use the others when experimenting with the code):
  18. case class GrindingException(msg: String) extends Exception(msg)
  19. case class FrothingException(msg: String) extends Exception(msg)
  20. case class WaterBoilingException(msg: String) extends Exception(msg)
  21. case class BrewingException(msg: String) extends Exception(msg)
  22. // going through these steps sequentially:
  23. def prepareCappuccino(): Try[Cappuccino] = for {
  24. ground <- Try(grind("arabica beans"))
  25. water <- Try(heatWater(Water(25)))
  26. espresso <- Try(brew(ground, water))
  27. foam <- Try(frothMilk("milk"))
  28. } yield combine(espresso, foam)

这样做有几个优点:
可以很轻易的弄清楚事情的步骤,一目了然,而且不会混淆。(毕竟没有上下文切换)
不好的一面是,大部分时间,你的大脑和身体都处于等待的状态:
在等待研磨咖啡豆时,你完全不能做任何事情,只有当这一步完成后,你才能开始烧水。
这显然是在浪费时间,所以你可能想一次开始多个步骤,让它们同时执行,
一旦水烧开,咖啡豆也磨好了,你可以制做咖啡了,这期间,打奶泡也可以开始了。

这和编写软件没什么不同。
一个 Web 服务器可以用来处理和响应请求的线程只有那么多,
不能因为要等待数据库查询或其他 HTTP 服务调用的结果而阻塞了这些可贵的线程。
相反,一个异步编程模型和非阻塞 IO 会更合适,
这样的话,当一个请求处理在等待数据库查询结果时,处理这个请求的线程也能够为其他请求服务。

“I heard you like callbacks, so I put a callback in your callback!”

在并发家族里,你应该已经知道 nodejs 这个很酷的家伙,nodejs 完全通过回调来通信,
不幸的是,这很容易导致回调中包含回调的回调,这简直是一团糟,代码难以阅读和调试。

Scala 的 Future 也允许回调,但它提供了更好的选择,所以你不怎么需要它。

“I know Futures, and they are completely useless!”

也许你知道些其他的 Future 实现,最引人注目的是 Java 提供的那个。
但是对于 Java 的 Future,你只能去查看它是否已经完成,或者阻塞线程直到其结束。
简而言之,Java 的 Future 几乎没有用,而且用起来绝对不会让人开心。

如果你认为 Scala 的 Future 也是这样,那大错特错了!

Future 语义

scala.concurrent 包里的 Future[T] 是一个容器类型,代表一种返回值类型为 T 的计算。
计算可能会出错,也可能会超时;从而,当一个 future 完成时,它可能会包含异常,而不是你期望的那个值。

Future 只能写一次: 当一个 future 完成后,它就不能再被改变了。
同时,Future 只提供了读取计算值的接口,写入计算值的任务交给了 Promise,这样,API 层面上会有一个清晰的界限。
这篇文章里,我们主要关注前者,下一章会介绍 Promise 的使用。

使用 Future

Future 有多种使用方式,我将通过重写 “卡布奇诺” 这个例子来说明。

首先,所有可以并行执行的函数,应该返回一个 Future:

  1. import scala.concurrent.future
  2. import scala.concurrent.Future
  3. import scala.concurrent.ExecutionContext.Implicits.global
  4. import scala.concurrent.duration._
  5. import scala.util.Random
  6. def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
  7. println("start grinding...")
  8. Thread.sleep(Random.nextInt(2000))
  9. if (beans == "baked beans") throw GrindingException("are you joking?")
  10. println("finished grinding...")
  11. s"ground coffee of $beans"
  12. }
  13. def heatWater(water: Water): Future[Water] = Future {
  14. println("heating the water now")
  15. Thread.sleep(Random.nextInt(2000))
  16. println("hot, it's hot!")
  17. water.copy(temperature = 85)
  18. }
  19. def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
  20. println("milk frothing system engaged!")
  21. Thread.sleep(Random.nextInt(2000))
  22. println("shutting down milk frothing system")
  23. s"frothed $milk"
  24. }
  25. def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
  26. println("happy brewing :)")
  27. Thread.sleep(Random.nextInt(2000))
  28. println("it's brewed!")
  29. "espresso"
  30. }

上面的代码有几处需要解释。

首先是 Future 伴生对象里的 apply 方法需要两个参数:

  1. object Future {
  2. def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
  3. }

要异步执行的计算通过传名参数 body 传入。
第二个参数是一个隐式参数,隐式参数是说,函数调用时,如果作用域中存在一个匹配的隐式值,就无需显示指定这个参数。
ExecutionContext 可以执行一个 Future,可以把它看作是一个线程池,是绝大部分 Future API 的隐式参数。

import scala.concurrent.ExecutionContext.Implicits.global 语句引入了一个全局的执行上下文,确保了隐式值的存在。
这时候,只需要一个单元素列表,可以用大括号来代替小括号。
调用 future 方法时,经常使用这种形式,使得它看起来像是一种语言特性,而不是一个普通方法的调用。

这个例子没有大量计算,所以用随机休眠来模拟以说明问题,
而且,为了更清晰的说明并发代码的执行顺序,还在“计算”之前和之后打印了些东西。

计算会在 Future 创建后的某个不确定时间点上由 ExecutionContext 给其分配的某个线程中执行。

回调

对于一些简单的问题,使用回调就能很好解决。
Future 的回调是偏函数,你可以把回调传递给 Future 的 onSuccess 方法,
如果这个 Future 成功完成,这个回调就会执行,并把 Future 的返回值作为参数输入:

  1. grind("arabica beans").onSuccess { case ground =>
  2. println("okay, got my ground coffee")
  3. }

类似的,也可以在 onFailure 上注册回调,只不过它是在 Future 失败时调用,其输入是一个 Throwable

通常的做法是将两个回调结合在一起以更好的处理 Future:在 onComplete 方法上注册回调,回调的输入是一个 Try。

  1. import scala.util.{Success, Failure}
  2. grind("baked beans").onComplete {
  3. case Success(ground) => println(s"got my $ground")
  4. case Failure(ex) => println("This grinder needs a replacement, seriously!")
  5. }

传递给 grind 的是 “baked beans”,因此 grind 方法会产生异常,进而导致 Future 中的计算失败。

Future 组合

当嵌套使用 Future 时,回调就变得比较烦人。
不过,你也没必要这么做,因为 Future 是可组合的,这是它真正发挥威力的时候!

你一定已经注意到,之前讨论过的所有容器类型都可以进行 mapflatMap 操作,也可以用在 for 语句中。
作为一种容器类型,Future 支持这些操作也不足为奇!

真正的问题是,在还没有完成的计算上执行这些操作意味这什么,如何去理解它们?

Map 操作

Scala 让 “时间旅行” 成为可能!
假设想在水加热后就去检查它的温度,
可以通过将 Future[Water] 映射到 Future[Boolean] 来完成这件事情:

  1. val tempreatureOkay: Future[Boolean] = heatWater(Water(25)) map { water =>
  2. println("we're in the future!")
  3. (80 to 85) contains (water.temperature)
  4. }

tempreatureOkay 最终会包含水温的结果。
你可以去改变 heatWater 的实现来让它抛出异常(比如说,加热器爆炸了),
然后等待 “we’re in the future!” 出现在显示屏上,不过你永远等不到。

写传递给 map 的函数时,你就处在未来(或者说可能的未来)。
一旦 Future[Water] 实例成功完成,这个函数就会执行,只不过,该函数所在的时间线可能不是你现在所处的这个。
如果 Future[Water] 失败,传递给 map 的函数中的事情永远不会发生,调用 map 的结果将是一个失败的 Future[Boolean]

FlatMap 操作

如果一个 Future 的计算依赖于另一个 Future 的结果,那需要求救于 flatMap 以避免 Future 的嵌套。

假设,测量水温的线程需要一些时间,那你可能想异步的去检查水温是否 OK。
比如,有一个函数,接受一个 Water ,并返回 Future[Boolean]

  1. def temperatureOkay(water: Water): Future[Boolean] = future {
  2. (80 to 85) contains (water.temperature)

使用 flatMap(而不是 map)得到一个 Future[Boolean],而不是 Future[Future[Boolean]]

  1. val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)) map {
  2. water => temperatureOkay(water)
  3. }
  4. val flatFuture: Future[Boolean] = heatWater(Water(25)) flatMap {
  5. water => temperatureOkay(water)
  6. }

同样,映射只会发生在 Future[Water] 成功完成情况下。

for 语句

除了调用 flatMap ,也可以写成 for 语句。上面的例子可以重写成:

  1. val acceptable: Future[Boolean] = for {
  2. heatedWater <- heatWater(Water(25))
  3. okay <- temperatureOkay(heatedWater)
  4. } yield okay

如果有多个可以并行执行的计算,则需要特别注意,要先在 for 语句外面创建好对应的 Futures。

  1. def prepareCappuccinoSequentially(): Future[Cappuccino] =
  2. for {
  3. ground <- grind("arabica beans")
  4. water <- heatWater(Water(25))
  5. foam <- frothMilk("milk")
  6. espresso <- brew(ground, water)
  7. } yield combine(espresso, foam)

这看起来很漂亮,但要知道,for 语句只不过是 flatMap 嵌套调用的语法糖。
这意味着,只有当 Future[GroundCoffee] 成功完成后, heatWater 才会创建 Future[Water]
你可以查看函数运行时打印出来的东西来验证这个说法。

因此,要确保在 for 语句之前实例化所有相互独立的 Futures:

  1. def prepareCappuccino(): Future[Cappuccino] = {
  2. val groundCoffee = grind("arabica beans")
  3. val heatedWater = heatWater(Water(20))
  4. val frothedMilk = frothMilk("milk")
  5. for {
  6. ground <- groundCoffee
  7. water <- heatedWater
  8. foam <- frothedMilk
  9. espresso <- brew(ground, water)
  10. } yield combine(espresso, foam)
  11. }

在 for 语句之前,三个 Future 在创建之后就开始各自独立的运行,显示屏的输出是不确定的。
唯一能确定的是 “happy brewing” 总是出现在后面,
因为该输出所在的函数 brew 是在其他两个函数执行完毕后才开始执行的。
也因为此,可以在 for 语句里面直接调用它,当然,前提是前面的 Future 都成功完成。

失败偏向的 Future

你可能会发现 Future[T] 是成功偏向的,允许你使用 mapflatMapfilter 等。

但是,有时候可能处理事情出错的情况。
调用 Future[T] 上的 failed 方法,会得到一个失败偏向的 Future,类型是 Future[Throwable]
之后就可以映射这个 Future[Throwable],在失败的情况下执行 mapping 函数。

总结

你已经见过 Future 了,而且它的前途看起来很光明!
因为它是一个可组合、可函数式使用的容器类型,这让我们的工作变得异常舒服。

调用 future 方法可以轻易将阻塞执行的代码变成并发执行,但是,代码最好原本就是非阻塞的。
为了实现它,我们还需要 Promise 来完成 Future,这就是下一章的主题。