Futures
注:本节未经校验,如有问题欢迎提issue
简介
在 Akka 中,一个Future是用来获取某个并发操作结果的数据结构。这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。
执行上下文
为了运行回调和操作,Futures 需要有一个ExecutionContext
,它与java.util.concurrent.Executor
很相像. 如果你在作用域内有一个ActorSystem
,它会把自己的派发器用作ExecutionContext
,或者你也可以用ExecutionContext
伴生对象提供的工厂方法来将Executors
和ExecutorServices
进行包装,或者甚至创建自己的实例。
import scala.concurrent.{ ExecutionContext, Promise }
implicit val ec = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)
// Do stuff with your brand new shiny ExecutionContext
val f = Promise.successful("foo")
// Then shut your ExecutionContext down at some
// appropriate place in your program/application
ec.shutdown()
在Actor中
每个actor都被配置为在MessageDispatcher
上运行,且该调度器又被用作为ExecutionContext
。如果被actor调用的Future的性质匹配或兼容与那个actor的活动(例如,全CPU绑定,也没有延迟要求),那么它可能是最容易重用派发器,只需要通过导入context.dispatcher
来运行Futures。
class A extends Actor {
import context.dispatcher
val f = Future("hello")
def receive = {
// receive omitted ...
}
}
用于 Actor
通常有两种方法来从一个Actor
获取回应:第一种是发送一个消息(actor ! msg
,这种方法只在发送者是一个Actor
时有效),第二种是通过一个Future
。
使用Actor
的?
方法来发送消息会返回一个Future
. 要等待并获取结果的最简单方法是:
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]
这会导致当前线程阻塞,并等待Actor
通过它的应答来 ‘完成’Future
。但是阻塞会导致性能问题,所以是不推荐的. 导致阻塞的操作位于Await.result
和Await.ready
中,这样就方便定位阻塞的位置. 对阻塞方式的替代方法会在本文档中进一步讨论。还要注意Actor
返回的Future
的类型 是Future[Any]
,这是因为Actor
是动态的. 这也是为什么上例中使用了asInstanceOf
。在使用非阻塞方式时,最好使用mapTo
方法来将Future
转换到期望的类型:
import scala.concurrent.Future
import akka.pattern.ask
val future: Future[String] = ask(actor, msg).mapTo[String]
如果转换成功,mapTo
方法会返回一个包含结果的新的Future
,如果不成功,则返回ClassCastException
. 对Exception
的处理将在本文档进一步讨论。
要把Future
的结果发送给一个Actor
,你可以使用pipe
构建:
import akka.pattern.pipe
future pipeTo actor
直接使用
Akka中的一个常见用例是在不需要额外使用Actor
工具的情况下并发地执行计算. 如果你发现你只是为了并行地执行一个计算而创建了一堆Actor
,下面是一种更好(也更快)的方法:
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
val future = Future {
"Hello" + "World"
}
future foreach println
在上面的代码中,被传递给Future
的代码块会被缺省的Dispatcher
执行,代码块的返回结果会被用来完成Future
(在这个例子中,结果是一个字符串:“HelloWorld”). 与从Actor
返回的Future
不同,这个Future
拥有合适的类型,我们还避免了管理Actor
的开销。
你还可以用Future
伴生对象创建一个已经完成的Future
,它可以是成功的:
val future = Future.successful("Yay!")
或是失败的:
val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!"))
也可以创建一个空的Promise
,以后填充它,并包含一个相应的Future
:
val promise = Promise[String]()
val theFuture = promise.future
promise.success("hello")
函数式 Future
Scala 的Future
有一些 monadic 方法,与Scala集合所使用的方法非常相似. 这使你可以构造出可以传递结果的 ‘管道’ 或 ‘数据流’ 。
Future 是 Monad
让Future
以函数式风格工作的第一个方法是map
. 它需要一个Function
来对Future
的结果进行处理,返回一个新的结果。map
方法的返回值是包含新结果的另一个Future
:
val f1 = Future {
"Hello" + "World"
}
val f2 = f1 map { x =>
x.length
}
f2 foreach println
这个例子中我们在Future
内部连接两个字符串。我们没有等待这个Future
结束,而是使用map
方法来将计算字符串长度的函数应用于它. 现在我们有了第二个Future
,它的最终结果是一个Int
. 当先前的Future
完成时,它会应用我们的函数并用其结果来完成第二个Future
。最终我们得到的结果是 10. 先前的Future
仍然持有字符串“HelloWorld”,而不受map
的影响。
如果我们只是修改一个Future
,map
方法就够用了。但如果有2个以上Future
时,map
无法将他们组合到一起:
val f1 = Future {
"Hello" + "World"
}
val f2 = Future.successful(3)
val f3 = f1 map { x =>
f2 map { y =>
x.length * y
}
}
f3 foreach println
f3
的类型是Future[Future[Int]]
而不是我们所期望的Future[Int]
. 这时我们需要使用flatMap
方法:
val f1 = Future {
"Hello" + "World"
}
val f2 = Future.successful(3)
val f3 = f1 flatMap { x =>
f2 map { y =>
x.length * y
}
}
f3 foreach println
使用嵌套的map
或flatmap
组合子来组合Future
,有时会变得非常复杂和难以阅读,这时使用 Scala 的 ‘for comprehensions’ 一般会生成可读性更好的代码。见下一部分的示例。
如果你需要进行条件筛选外延,可以使用filter
:
val future1 = Future.successful(4)
val future2 = future1.filter(_ % 2 == 0)
future2 foreach println
val failedFilter = future1.filter(_ % 2 == 1).recover {
// When filter fails, it will have a java.util.NoSuchElementException
case m: NoSuchElementException => 0
}
failedFilter foreach println
For Comprehensions
由于Future
拥有map
,filter
和flatMap
方法,它可以方便地用于 ‘for comprehension’:
val f = for {
a <- Future(10 / 2) // 10 / 2 = 5
b <- Future(a + 1) // 5 + 1 = 6
c <- Future(a - 1) // 5 - 1 = 4
if c > 3 // Future.filter
} yield b * c // 6 * 4 = 24
// Note that the execution of futures a, b, and c
// are not done in parallel.
f foreach println
这样写代码的时候需要记住的是:虽然看上去上例的部分代码可以并发地运行,for comprehension的每一步实际是顺序执行的。每一步是在单独的线程中运行的,但是相较于将所有的计算在一个单独的Future
中运行并没有太大好处。只有先创建Future
,然后对其进行组合的情况下才能真正得到好处。
组合 Futures
上例中的for comprehension 是对Future
进行组合的例子. 这种方法的常见用例是将多个Actor
的回应组合成一个单独的计算而不用调用Await.result
或Await.ready
来阻塞地获得每一个结果. 先看看使用Await.result
的例子:
val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)
val a = Await.result(f1, 3 seconds).asInstanceOf[Int]
val b = Await.result(f2, 3 seconds).asInstanceOf[Int]
val f3 = ask(actor3, (a + b))
val result = Await.result(f3, 3 seconds).asInstanceOf[Int]
警告
Await.result
和Await.ready
为必须阻塞的特殊情况提供的,一个很好的经验法则是仅在你知道为什么你必须阻塞的情况下使用它们。对于所有其他情况,使用如下所述的异步组合。
这里我们等待前2个Actor
的结果然后将其发送给第三个Actor
. 我们调用了3次Await.result
,导致我们的程序在获得最终结果前阻塞了3次。现在跟下例比较:
val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)
val f3 = for {
a <- f1.mapTo[Int]
b <- f2.mapTo[Int]
c <- ask(actor3, (a + b)).mapTo[Int]
} yield c
f3 foreach println
这里我们有两个actor各自处理自己的一条消息。一旦这2个结果可用了(注意我们并没有阻塞地等待这些结果!),它们会被加起来发送给第三个Actor
,这第三个actor回应一个字符串,我们把它赋值给 ‘result’。
上面的方法对已知给定Actor数量的时候就足够了,但是当Actor数量较大时就显得比较笨重。sequence
和traverse
两个辅助方法可以帮助处理更复杂的情况。这两个方法都是用来将T[Future[A]]
转换为Future[T[A]]
(其中T
是Traversable
子类). 例如:
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int])
// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)
// Find the sum of the odd numbers
val oddSum = futureList.map(_.sum)
oddSum foreach println
现在来解释一下,Future.sequence
将输入的List[Future[Int]]
转换为Future[List[Int]]
. 这样我们就可以将map
直接作用于List[Int]
,从而得到List
的总和。
traverse
方法与sequence
类似,但它以T[A]
和A => Future[B]
函数为参数返回一个Future[T[B]]
,这里的T
同样也是Traversable
的子类. 例如,用traverse
来计算前100个奇数的和:
val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
val oddSum = futureList.map(_.sum)
oddSum foreach println
其结果与这个例子是一样的:
val futureList = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1)))
val oddSum = futureList.map(_.sum)
oddSum foreach println
但是用traverse
也许会快一些,因为它不用创建一个List[Future[Int]]
的临时变量。
然后我们有一个方法fold
,它的参数包括一个初始值 ,一个Future
序列和一个作用于初始值和Future
类型返回与初始值相同类型的函数,它将这个函数异步地应用于future序列的所有元素,它的执行将在最后一个Future完成之后开始。
// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.fold(futures)(0)(_ + _)
futureSum foreach println
就是这么简单!
如果传给fold
的序列是空的,它将返回初始值,在上例中,这个值是0. 有时你没有一个初始值,而使用序列中第一个已完成的Future
的值作为初始值,你可以使用reduce
,它的用法是这样的:
// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.reduce(futures)(_ + _)
futureSum foreach println
与fold
一样,它是在最后一个Future
完成后异步执行的,你也可以对这个过程进行并行化:将future分成子序列分别进行reduce,然后对reduce的结果再次reduce。
回调
有时你只想要监听Future
的完成事件,对其进行响应,不是创建新的Future
,而仅仅是产生副作用. Scala为这种情况准备了onComplete
,onSuccess
和onFailure
,其中后两者是第一项的特例。
future onSuccess {
case "bar" => println("Got my bar alright!")
case x: String => println("Got some random string: " + x)
}
future onFailure {
case ise: IllegalStateException if ise.getMessage == "OHNOES" =>
//OHNOES! We are in deep trouble, do something!
case e: Exception =>
//Do something else
}
future onComplete {
case Success(result) => doSomethingOnSuccess(result)
case Failure(failure) => doSomethingOnFailure(failure)
}
定义次序
由于回调的执行是无序的,而且可能是并发执行的,当你需要操作有序的时候代码行为往往很怪异。但有一个解决办法是使用andThen
. 它会为指定的回调创建一个新的Future
,这个Future
与原先的Future
拥有相同的结果,这样就可以像下例一样定义次序:
val result = Future { loadPage(url) } andThen {
case Failure(exception) => log(exception)
} andThen {
case _ => watchSomeTV()
}
result foreach println
辅助方法
Future````fallbackTo
将两个Futures
合并成一个新的Future
,如果第一个Future
失败了,它将持有第二个Future
的成功值。
val future4 = future1 fallbackTo future2 fallbackTo future3
future4 foreach println
你也可以使用zip
操作将两个Futures
组合成一个新的持有二者成功结果的tuple元组的Future
val future3 = future1 zip future2 map { case (a, b) => a + " " + b }
future3 foreach println
异常
由于Future
的结果是与程序的其它部分并发生成的,因此异常需要作特殊的处理。不管是Actor
或是派发器正在完成此Future
,如果抛出了Exception
,Future
将持有这个异常而不是一个有效的值. 如果Future
持有Exception
,调用Await.result
将导致此异常被再次抛出从而得到正确的处理。
通过返回一个不同的结果来处理Exception
也是可能的. 这是使用recover
方法实现的. 例如:
val future = akka.pattern.ask(actor, msg1) recover {
case e: ArithmeticException => 0
}
future foreach println
在这个例子中,如果actor回应了包含ArithmeticException
的akka.actor.Status.Failure
,我们的Future
将持有 0 作为结果.recover
方法与标准的 try/catch 块非常相似,可以用这种方式处理多种Exception
, 如果其中有没有提到的Exception
,这种异常将以“好像没有定义recover
一样”的的方式来处理。
你也可以使用recoverWith
方法,它和recover
的关系就象flatMap
与map
的关系,用法如下:
val future = akka.pattern.ask(actor, msg1) recoverWith {
case e: ArithmeticException => Future.successful(0)
case foo: IllegalArgumentException =>
Future.failed[Int](new IllegalStateException("All br0ken!"))
}
future foreach println
After
akka.pattern.after
使得在给定超时后完成一个Future
,获取其值或异常比昂的很容易。
// TODO after is unfortunately shadowed by ScalaTest, fix as part of #3759
// import akka.pattern.after
val delayed = akka.pattern.after(200 millis, using = system.scheduler)(Future.failed(
new IllegalStateException("OHNOES")))
val future = Future { Thread.sleep(1000); "foo" }
val result = Future firstCompletedOf Seq(future, delayed)