Twitter标准库
Twitter最重要的标准库是Util 和 Finagle。Util 可以理解为Scala和Java的标准库扩展,提供了标准库中没有的功能或已有功能的更合适的实现。Finagle 是我们的RPC系统,核心分布式系统组件。
Futures已经在并发一节中简单讨论过。它是调异步处理的中心机制,渗透在我们代码库中,也是Finagle的核心。Futures允许组合并发事件,简化了高并发操作。也是JVM上异步并发的一种高效的实现。
Twitter的future是异步的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的线程的执行;网络IO和磁盘IO是就是例子——必须由系统处理,它为结果提供future。Finagle为网络IO提供了这样一种系统。
Futures清晰简单:它们持有一个尚未完成运算结果的 promise 。它们是一个简单的容器——一个占位符。一次计算当然可能会失败,这种状况必须被编码:一个Future可以是三种状态之一: pending, failed, completed。
闲话: 组合(composition)
让我们重新审视我们所说的组合:将简单的组件合成一个更复杂的。函数组合的一个权威的例子:给定函数 f 和 g,组合函数 (g∘f)(x) = g(f(x)) ——结果先对 x使用f函数,然后在使用g函数——用Scala来写:
val f = (i: Int) => i.toString
val g = (s: String) => s+s+s
val h = g compose f // : Int => String
scala> h(123)
res0: java.lang.String = 123123123
复合函数h,是个新的函数,由之前定义的f和g函数合成。
Futures是一种集合类型——它是个包含0或1个元素的容器——你可以发现他们有标准的集合方法(eg:map, filter, foreach)。因为Future的值是延迟的,结果应用这些方法中的任何一种必然也延迟;在
val result: Future[Int]
val resultStr: Future[String] = result map { i => i.toString }
函数 { i => i.toString } 不会被调用,直到int值可用;转换集合的resultStr在可用之前也一直是待定状态。
List可以被扁平化(flattened):
val listOfList: List[List[Int]] = ..
val list: List[Int] = listOfList.flatten
这对future也是有意义的:
val futureOfFuture: Future[Future[Int]] = ..
val future: Future[Int] = futureOfFuture.flatten
因为future是延迟的,flatten的实现——立即返回——不得不返回一个等待外部future (Future[Future[Int]]
) 完成的future (Future[Future[Int]]
).如果外部future失败,内部flattened future也将失败。
Future (类似List) 也定义了flatMap;Future[A] 定义方法flatMap的签名
flatMap[B](f: A => Future[B]): Future[B]
如同组合 map 和 flatten,我们可以这样实现:
def flatMap[B](f: A => Future[B]): Future[B] = {
val mapped: Future[Future[B]] = this map f
val flattened: Future[B] = mapped.flatten
flattened
}
这是一种有威力的组合!使用flatMap我们可以定义一个 Future 作为两个Future序列的结果。第二个future 的计算基于第一个的结果。想象我们需要2次RPC调用来验证一个用户身份,我们可以用下面的方式组合操作:
def getUser(id: Int): Future[User]
def authenticate(user: User): Future[Boolean]
def isIdAuthed(id: Int): Future[Boolean] =
getUser(id) flatMap { user => authenticate(user) }
这种组合类型的一个额外的好处是错误处理是内置的:如果getUser(..)或authenticate(..)失败,future 从 isAuthred(..)返回时将会失败。这里我们没有额外的错误处理的代码。
风格
Future回调方法(respond, onSuccess, onFailure, ensure) 返回一个新的Future,并链接到调用者。这个Future被保证只有在它调用者完成后才完成,使用模式如下:
acquireResource()
future onSuccess { value =>
computeSomething(value)
} ensure {
freeResource()
}
freeResource() 被保证只有在 computeSomething之后才执行,这样就模拟了try-finally 模式。
使用 onSuccess替代 foreach —— 它与 onFailure 方法对称,命名的意图更明确,并且也允许 chaining。
永远避免直接创建Promise实例: 几乎每一个任务都可以通过使用预定义的组合子完成。这些组合子确保错误和取消是可传播的, 通常鼓励的数据流风格的编程,不再需要同步和volatility声明。
用尾递归风格编写的代码不再导致堆栈空间泄漏,并使得以数据流风格高效的实现循环成为可能:
case class Node(parent: Option[Node], ...)
def getNode(id: Int): Future[Node] = ...
def getHierarchy(id: Int, nodes: List[Node] = Nil): Future[Node] =
getNode(id) flatMap {
case n@Node(Some(parent), ..) => getHierarchy(parent, n :: nodes)
case n => Future.value((n :: nodes).reverse)
}
Future定义很多有用的方法: 使用 Future.value() 和 Future.exception() 来创建未满意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了组合子将多个future合成一个(例如:scatter-gather操作的gather部分)。
Cancellation
Future实现了一种弱形式的取消。调用Future#cancel 不会直接终止运算,而是发送某个级别的可被任何处理查询的触发信号,最终满足这个future。Cancellation信号流向相反的方向:一个由消费者设置的cancellation信号,会传播到它的生产者。生产者使用 Promise的onCancellation来监听信号并执行相应的动作。
这意味这cancellation语意上依赖生产者,没有默认的实现。cancellation只是一个提示。
Local
Util的Local提供了一个位于特定的future派发树(dispatch tree)的引用单元(cell)。设定一个local的值,使这个值可以用于被同一个线程的Future 延迟的任何计算。有一些类似于thread locals(注:Java中的线程机制),不同的是它们的范围不是一个Java线程,而是一个 future 线程树。在
trait User {
def name: String
def incrCost(points: Int)
}
val user = new Local[User]
...
user() = currentUser
rpc() ensure {
user().incrCost(10)
}
在 ensure块中的 user() 将在回调被添加的时候引用 user local的值。
就thread locals来说,我们的Locals非常的方便,但要尽量避免使用:除非确信通过显式传递数据时问题不能被充分的解决,哪怕解决起来有些繁重。
Locals有效的被核心库使用在非常常见的问题上——线程通过RPC跟踪,传播监视器,为future的回调创建stack traces——任何其他解决方法都使得用户负担过度。Locals在几乎任何其他情况下都不适合。
Offer/Broker
并发系统由于需要协调访问数据和资源而变得复杂。Actor提出一种简化的策略:每一个actor是一个顺序的进程(process),保持自己的状态和资源,数据通过消息的方式与其它actor共享。 共享数据需要actor之间通信。
Offer/Broker 建立于Actor之上,以这三种重要的方式表现:1,通信通道(Brokers)是first class——即发送消息需要通过Brokers,而非直接到actor。2, Offer/Broker 是一种同步机制:通信会话是同步的。 这意味我们可以用 Broker做为协调机制:当进程a发送一条信息给进程b;a和b都要对系统状态达成一致。3, 最后,通信可以选择性地执行:一个进程可以提出几个不同的通信,其中的一个将被获取。
为了以一种通用的方式支持选择性通信(以及其他组合),我们需要将通信的描述和执行解耦。这正是Offer做的——它是一个持久数据用于描述一次通信;为了执行这个通信(offer执行),我们通过它的sync()方法同步
trait Offer[T] {
def sync(): Future[T]
}
返回 Future[T] 当通信被获取的时候生成交换值。
Broker通过offer协调值的交换——它是通信的通道:
trait Broker[T] {
def send(msg: T): Offer[Unit]
val recv: Offer[T]
}
所以,当创建两个offer
val b: Broker[Int]
val sendOf = b.send(1)
val recvOf = b.recv
sendOf和recvOf都同步
// In process 1:
sendOf.sync()
// In process 2:
recvOf.sync()
两个offer都获取并且值1被交换。
通过将多个offer和Offer.choose绑定来执行可选择通信。
def choose[T](ofs: Offer[T]*): Offer[T]
上面的代码生成一个新的offer,当同步时获取一个特定的ofs——第一个可用的。当多个都立即可用时,随机获取一个。
Offer对象有些一次性的Offers用于与来自Broker的Offer构建。
Offer.timeout(duration): Offer[Unit]
offer在给定时间后激活。Offer.never将用于不会有效,Offer.const(value)在给定值后立即有效。这些操作由选择性通信来组合是非常有用的。例如,在一个send操作中使用超时:
Offer.choose(
Offer.timeout(10.seconds),
broker.send("my value")
).sync()
人们可能会比较 Offer/Broker 与SynchronousQueue,他们有细微但非常重要的区别。Offer可以被组合,而queue不能。例如,考虑一组queues,描述为 Brokers:
val q0 = new Broker[Int]
val q1 = new Broker[Int]
val q2 = new Broker[Int]
现在让我们为读取创建一个合并的queue
val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)
anyq是一个将从第一个可用的queue中读取的offer。注意 anyq 仍是同步的——我们仍然拥有底层队列的语义。这类组合是不可能用queue实现的。
例子:一个简单的连接池
连接池在网络应用中很常见,并且它们的实现常常需要技巧——例如,在从池中获取一个连接的时候,通常需要超时机制,因为不同的客户端有不同的延迟需求。池的简单原则:维护一个连接队列,满足那些进入的等待者。使用传统的同步原语,这通常需要两个队列(queues):一个用于等待者(当没有连接可用时),一个用于连接(当没有等待者时)。
使用 Offer/Brokers ,可以表达得非常自然:
class Pool(conns: Seq[Conn]) {
private[this] val waiters = new Broker[Conn]
private[this] val returnConn = new Broker[Conn]
val get: Offer[Conn] = waiters.recv
def put(c: Conn) { returnConn ! c }
private[this] def loop(connq: Queue[Conn]) {
Offer.choose(
if (connq.isEmpty) Offer.never else {
val (head, rest) = connq.dequeue
waiters.send(head) { _ => loop(rest) }
},
returnConn.recv { c => loop(connq enqueue c) }
).sync()
}
loop(Queue.empty ++ conns)
}
loop总是提供一个归还的连接,但只有queue非空的时候才会send。 使用持久化队列(persistent queue)更进一步简化逻辑。与连接池的接口也是通过Offer实现,所以调用者如果愿意设置timeout,他们可以通过利用组合子(combinators)来做:
val conn: Future[Option[Conn]] = Offer.choose(
pool.get { conn => Some(conn) },
Offer.timeout(1.second) { _ => None }
).sync()
实现timeout不需要额外的记账(bookkeeping);这是因为Offer的语义:如果Offer.timeout被选择,不会再有offer从池中获得——连接池和它的调用者在各自waiter的broker上不必同时同意接受和发送。
埃拉托色尼筛子(Sieve of Eratosthenes 译注:一种用于筛选素数的算法)
把并发程序构造为一组顺序的同步通信进程,通常很有用——有时程序被大大地简化了。Offer和Broker提供了一组工具来让它简单并一致。确实,它们的应用超越了我们可能认为是经典并发性问题——并发编程(有Offer/Broker的辅助)是一种有用的构建工具,正如子例程(subroutines),类,和模块都是——来自CSP(译注:Communicating sequential processes的缩写,即通信顺序进程)的重要思想。
这里有一个埃拉托色尼筛子可以构造为一个针对一个整数流(stream of integers)的连续的应用过滤器 。首先,我们需要一个整数的源(source of integers):
def integers(from: Int): Offer[Int] = {
val b = new Broker[Int]
def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1)
gen(from)
b.recv
}
integers(n) 方法简单地提供了从n开始的所有连续的整数。然后我们需要一个过滤器:
def filter(in: Offer[Int], prime: Int): Offer[Int] = {
val b = new Broker[Int]
def loop() {
in.sync() onSuccess { i =>
if (i % prime != 0)
b.send(i).sync() ensure loop()
else
loop()
}
}
loop()
b.recv
}
filter(in, p) 方法返回的offer删除了in中的所有质数(prime)的倍数。最终我们定义了我们的筛子(sieve):
def sieve = {
val b = new Broker[Int]
def loop(of: Offer[Int]) {
for (prime <- of.sync(); _ <- b.send(prime).sync())
loop(filter(of, prime))
}
loop(integers(2))
b.recv
}
loop() 工作很简单:从of中读取下一个质数,然后对of应用过滤器排除这个质数。loop不断的递归,持续的质数被过滤,于是我们得到了筛选结果。我们现在打印前10000个质数:
val primes = sieve
0 until 10000 foreach { _ =>
println(primes.sync()())
}
除了构造简单,组件正交,这种做法也给你一种流式筛子(streaming sieve):你不需要事先计算出你感兴趣的质数集合,从而进一步提高了模块化。