MapReduce之后
虽然MapReduce在二十世纪二十年代后期变得非常流行,并受到大量的炒作,但它只是分布式系统的许多可能的编程模型之一。对于不同的数据量,数据结构和处理类型,其他工具可能更适合表示计算。
不管如何,我们在这一章花了大把时间来讨论MapReduce,因为它是一种有用的学习工具,它是分布式文件系统的一种相当简单明晰的抽象。在这里,简单意味着我们能理解它在做什么,而不是意味着使用它很简单。恰恰相反:使用原始的MapReduce API来实现复杂的处理工作实际上是非常困难和费力的 —— 例如,任意一种连接算法都需要你从头开始实现【37】。
针对直接使用MapReduce的困难,在MapReduce上有很多高级编程模型(Pig,Hive,Cascading,Crunch)被创造出来,作为建立在MapReduce之上的抽象。如果你了解MapReduce的原理,那么它们学起来相当简单。而且它们的高级结构能显著简化许多常见批处理任务的实现。
但是,MapReduce执行模型本身也存在一些问题,这些问题并没有通过增加另一个抽象层次而解决,而对于某些类型的处理,它表现得非常差劲。一方面,MapReduce非常稳健:你可以使用它在任务会频繁终止的多租户系统上处理几乎任意大量级的数据,并且仍然可以完成工作(虽然速度很慢)。另一方面,对于某些类型的处理而言,其他工具有时会快上几个数量级。
在本章的其余部分中,我们将介绍一些批处理方法。在第11章我们将转向流处理,它可以看作是加速批处理的另一种方法。
物化中间状态
如前所述,每个MapReduce作业都独立于其他任何作业。作业与世界其他地方的主要连接点是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为第一个作业输出目录,且外部工作流调度程序必须在第一个作业完成后再启动第二个。
如果第一个作业的输出是要在组织内广泛发布的数据集,则这种配置是合理的。在这种情况下,你需要通过名称引用它,并将其重用为多个不同作业的输入(包括由其他团队开发的作业)。将数据发布到分布式文件系统中众所周知的位置能够带来松耦合,这样作业就不需要知道是谁在提供输入或谁在消费输出(参阅“逻辑与布线相分离”)。
但在很多情况下,你知道一个作业的输出只能用作另一个作业的输入,这些作业由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的中间状态(intermediate state):一种将数据从一个作业传递到下一个作业的方式。在一个用于构建推荐系统的,由50或100个MapReduce作业组成的复杂工作流中,存在着很多这样的中间状态【29】。
将这个中间状态写入文件的过程称为物化(materialization)。 (在“聚合:数据立方体和物化视图”中已经在物化视图的背景中遇到过这个术语。它意味着对某个操作的结果立即求值并写出来,而不是在请求时按需计算)
作为对照,本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输入连接起来。管道并没有完全物化中间状态,而是只使用一个小的内存缓冲区,将输出增量地流(stream)向输入。
与Unix管道相比,MapReduce完全物化中间状态的方法存在不足之处:
- MapReduce作业只有在前驱作业(生成其输入)中的所有任务都完成时才能启动,而由Unix管道连接的进程会同时启动,输出一旦生成就会被消费。不同机器上的数据倾斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成。必须等待至前驱作业的所有任务完成,拖慢了整个工作流程的执行。
- Mapper通常是多余的:它们仅仅是读取刚刚由Reducer写入的同样文件,为下一个阶段的分区和排序做准备。在许多情况下,Mapper代码可能是前驱Reducer的一部分:如果Reducer和Mapper的输出有着相同的分区与排序方式,那么Reducer就可以直接串在一起,而不用与Mapper相互交织。
- 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这些临时数据这么搞就比较过分了。
数据流引擎
了解决MapReduce的这些问题,几种用于分布式批处理的新执行引擎被开发出来,其中最著名的是Spark 【61,62】,Tez 【63,64】和Flink 【65,66】。它们的设计方式有很多区别,但有一个共同点:把整个工作流作为单个作业来处理,而不是把它分解为独立的子作业。
由于它们将工作流显式建模为 数据从几个处理阶段穿过,所以这些系统被称为数据流引擎(dataflow engines)。像MapReduce一样,它们在一条线上通过反复调用用户定义的函数来一次处理一条记录,它们通过输入分区来并行化载荷,它们通过网络将一个函数的输出复制到另一个函数的输入。
与MapReduce不同,这些功能不需要严格扮演交织的Map与Reduce的角色,而是可以以更灵活的方式进行组合。我们称这些函数为算子(operators),数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入:
- 一种选项是对记录按键重新分区并排序,就像在MapReduce的混洗阶段一样(参阅“分布式执行MapReduce”)。这种功能可以用于实现排序合并连接和分组,就像在MapReduce中一样。
- 另一种可能是接受多个输入,并以相同的方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,这省去了分区散列连接的工作,因为构建散列表还是会把顺序随机打乱。
- 对于广播散列连接,可以将一个算子的输出,发送到连接算子的所有分区。
这种类型的处理引擎是基于像Dryad 【67】和Nephele 【68】这样的研究系统,与MapReduce模型相比,它有几个优点:
- 排序等昂贵的工作只需要在实际需要的地方执行,而不是默认地在每个Map和Reduce阶段之间出现。
- 没有不必要的Map任务,因为Mapper所做的工作通常可以合并到前面的Reduce算子中(因为Mapper不会更改数据集的分区)。
- 由于工作流中的所有连接和数据依赖都是显式声明的,因此调度程序能够总览全局,知道哪里需要哪些数据,因而能够利用局部性进行优化。例如,它可以尝试将消费某些数据的任务放在与生成这些数据的任务相同的机器上,从而数据可以通过共享内存缓冲区传输,而不必通过网络复制。
- 通常,算子间的中间状态足以保存在内存中或写入本地磁盘,这比写入HDFS需要更少的I/O(必须将其复制到多台机器,并将每个副本写入磁盘)。 MapReduce已经对Mapper的输出做了这种优化,但数据流引擎将这种思想推广至所有的中间状态。
- 算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。
- 与MapReduce(为每个任务启动一个新的JVM)相比,现有Java虚拟机(JVM)进程可以重用来运行新算子,从而减少启动开销。
你可以使用数据流引擎执行与MapReduce工作流同样的计算,而且由于此处所述的优化,通常执行速度要明显快得多。既然算子是Map和Reduce的泛化,那么相同的处理代码就可以在任一执行引擎上运行:Pig,Hive或Cascading中实现的工作流可以无需修改代码,可以通过修改配置,简单地从MapReduce切换到Tez或Spark【64】。
Tez是一个相当薄的库,它依赖于YARN shuffle服务来实现节点间数据的实际复制【58】,而Spark和Flink则是包含了独立网络通信层,调度器,及用户向API的大型框架。我们将简要讨论这些高级API。
容错
完全物化中间状态至分布式文件系统的一个优点是,它具有持久性,这使得MapReduce中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。
Spark,Flink和Tez避免将中间状态写入HDFS,因此它们采取了不同的方法来容错:如果一台机器发生故障,并且该机器上的中间状态丢失,则它会从其他仍然可用的数据重新计算(在可行的情况下是先前的中间状态,要么就只能是原始输入数据,通常在HDFS上)。
为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用了哪些输入分区?应用了哪些算子? Spark使用弹性分布式数据集(RDD)的抽象来跟踪数据的谱系【61】,而Flink对算子状态存档,允许恢复运行在执行过程中遇到错误的算子【66】。
在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,算子是否始终产生相同的输出?如果一些丢失的数据已经发送给下游算子,这个问题就很重要。如果算子重新启动,重新计算的数据与原有的丢失数据不一致,下游算子很难解决新旧数据之间的矛盾。对于不确定性算子来说,解决方案通常是杀死下游算子,然后再重跑新数据。
为了避免这种级联故障,最好让算子具有确定性。但需要注意的是,非确定性行为很容易悄悄溜进来:例如,许多编程语言在迭代哈希表的元素时不能对顺序作出保证,许多概率和统计算法显式依赖于使用随机数,以及用到系统时钟或外部数据源,这些都是都不确定性的行为。为了能可靠地从故障中恢复,需要消除这种不确定性因素,例如使用固定的种子生成伪随机数。
通过重算数据来从故障中恢复并不总是正确的答案:如果中间状态数据要比源数据小得多,或者如果计算量非常大,那么将中间数据物化为文件可能要比重新计算廉价的多。
关于物化的讨论
回到Unix的类比,我们看到,MapReduce就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是Unix管道。尤其是Flink是基于管道执行的思想而建立的:也就是说,将算子的输出增量地传递给其他算子,不待输入完成便开始处理。
排序算子不可避免地需要消费全部的输入后才能生成任何输出,因为输入中最后一条输入记录可能具有最小的键,因此需要作为第一条记录输出。因此,任何需要排序的算子都需要至少暂时地累积状态。但是工作流的许多其他部分可以以流水线方式执行。
当作业完成时,它的输出需要持续到某个地方,以便用户可以找到并使用它—— 很可能它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS上的物化数据集通常仍是作业的输入和最终输出。和MapReduce一样,输入是不可变的,输出被完全替换。比起MapReduce的改进是,你不用再自己去将中间状态写入文件系统了。
图与迭代处理
在“图数据模型”中,我们讨论了使用图来建模数据,并使用图查询语言来遍历图中的边与点。第2章的讨论集中在OLTP风格的应用场景:快速执行查询来查找少量符合特定条件的顶点。
批处理上下文中的图也很有趣,其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是PageRank 【69】,它试图根据链接到某个网页的其他网页来估计该网页的流行度。它作为配方的一部分,用于确定网络搜索引擎呈现结果的顺序
像Spark,Flink和Tez这样的数据流引擎(参见“中间状态的物化”)通常将算子作为有向无环图(DAG)的一部分安排在作业中。这与图处理不一样:在数据流引擎中,从一个算子到另一个算子的数据流被构造成一个图,而数据本身通常由关系型元组构成。在图处理中,数据本身具有图的形式。又一个不幸的命名混乱!
许多图算法是通过一次遍历一条边来表示的,将一个顶点与近邻的顶点连接起来,以传播一些信息,并不断重复,直到满足一些条件为止 —— 例如,直到没有更多的边要跟进,或直到一些指标收敛。我们在图2-6中看到一个例子,它通过重复跟进标明地点归属关系的边,生成了数据库中北美包含的所有地点列表(这种算法被称为闭包传递(transitive closure))。
可以在分布式文件系统中存储图(包含顶点和边的列表的文件),但是这种“重复至完成”的想法不能用普通的MapReduce来表示,因为它只扫过一趟数据。这种算法因此经常以迭代的风格实现:
- 外部调度程序运行批处理来计算算法的一个步骤。
- 当批处理过程完成时,调度器检查它是否完成(基于完成条件 —— 例如,没有更多的边要跟进,或者与上次迭代相比的变化低于某个阈值)。
- 如果尚未完成,则调度程序返回到步骤1并运行另一轮批处理。
这种方法是有效的,但是用MapReduce实现它往往非常低效,因为MapReduce没有考虑算法的迭代性质:它总是读取整个输入数据集并产生一个全新的输出数据集,即使与上次迭代相比,改变的仅仅是图中的一小部分。
Pregel处理模型
针对图批处理的优化 —— 批量同步并行(BSP)计算模型【70】已经开始流行起来。其中,Apache Giraph 【37】,Spark的GraphX API和Flink的Gelly API 【71】实现了它。它也被称为Pregel模型,因为Google的Pregel论文推广了这种处理图的方法【72】。
回想一下在MapReduce中,Mapper在概念上向Reducer的特定调用“发送消息”,因为框架将所有具有相同键的Mapper输出集中在一起。 Pregel背后有一个类似的想法:一个顶点可以向另一个顶点“发送消息”,通常这些消息是沿着图的边发送的。
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用Reducer一样。与MapReduce的不同之处在于,在Pregel模型中,顶点在一次迭代到下一次迭代的过程中会记住它的状态,所以这个函数只需要处理新的传入消息。如果图的某个部分没有被发送消息,那里就不需要做任何工作。
这与Actor模型有些相似(参阅“分布式的Actor框架”),除了顶点状态和顶点之间的消息具有容错性和耐久性,且通信以固定的方式进行:在每次迭代中,框架递送上次迭代中发送的所有消息。Actor通常没有这样的时间保证。
容错
顶点只能通过消息传递进行通信(而不是直接相互查询)的事实有助于提高Pregel作业的性能,因为消息可以成批处理,且等待通信的次数也减少了。唯一的等待是在迭代之间:由于Pregel模型保证所有在一轮迭代中发送的消息都在下轮迭代中送达,所以在下一轮迭代开始前,先前的迭代必须完全完成,而所有的消息必须在网络上完成复制。
即使底层网络可能丢失,重复或任意延迟消息(参阅“不可靠的网络”),Pregel的实现能保证在后续迭代中消息在其目标顶点恰好处理一次。像MapReduce一样,框架能从故障中透明地恢复,以简化在Pregel上实现算法的编程模型。
这种容错是通过在迭代结束时,定期存档所有顶点的状态来实现的,即将其全部状态写入持久化存储。如果某个节点发生故障并且其内存中的状态丢失,则最简单的解决方法是将整个图计算回滚到上一个存档点,然后重启计算。如果算法是确定性的,且消息记录在日志中,那么也可以选择性地只恢复丢失的分区(就像之前讨论过的数据流引擎)【72】。
并行执行
顶点不需要知道它在哪台物理机器上执行;当它向其他顶点发送消息时,它只是简单地将消息发往某个顶点ID。图的分区取决于框架 —— 即,确定哪个顶点运行在哪台机器上,以及如何通过网络路由消息,以便它们到达正确的地方。
由于编程模型一次仅处理一个顶点(有时称为“像顶点一样思考”),所以框架可以以任意方式对图分区。理想情况下如果顶点需要进行大量的通信,那么它们最好能被分区到同一台机器上。然而找到这样一种优化的分区方法是很困难的 —— 在实践中,图经常按照任意分配的顶点ID分区,而不会尝试将相关的顶点分组在一起。
因此,图算法通常会有很多跨机器通信的额外开销,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销会显著拖慢分布式图算法的速度。
出于这个原因,如果你的图可以放入一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理【73,74】。图比内存大也没关系,只要能放入单台计算机的磁盘,使用GraphChi等框架进行单机处理是就一个可行的选择【75】。如果图太大,不适合单机处理,那么像Pregel这样的分布式方法是不可避免的。高效的并行图算法是一个进行中的研究领域【76】。
高级API和语言
自MapReduce开始流行的这几年以来,分布式批处理的执行引擎已经很成熟了。到目前为止,基础设施已经足够强大,能够存储和处理超过10,000台机器群集上的数PB的数据。由于在这种规模下物理执行批处理的问题已经被认为或多或少解决了,所以关注点已经转向其他领域:改进编程模型,提高处理效率,扩大这些技术可以解决的问题集。
如前所述,Hive,Pig,Cascading和Crunch等高级语言和API变得越来越流行,因为手写MapReduce作业实在是个苦力活。随着Tez的出现,这些高级语言还有一个额外好处,可以迁移到新的数据流执行引擎,而无需重写作业代码。 Spark和Flink也有它们自己的高级数据流API,通常是从FlumeJava中获取的灵感【34】。
这些数据流API通常使用关系型构建块来表达一个计算:按某个字段连接数据集;按键对元组做分组;按某些条件过滤;并通过计数求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。
除了少写代码的明显优势之外,这些高级接口还支持交互式用法,在这种交互式使用中,你可以在Shell中增量式编写分析代码,频繁运行来观察它做了什么。这种开发风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学,我们在“Unix哲学”中讨论过这个问题。
此外,这些高级接口不仅提高了人类的工作效率,也提高了机器层面的作业执行效率。
向声明式查询语言的转变
与硬写执行连接的代码相比,指定连接关系算子的优点是,框架可以分析连接输入的属性,并自动决定哪种上述连接算法最适合当前任务。 Hive,Spark和Flink都有基于代价的查询优化器可以做到这一点,甚至可以改变连接顺序,最小化中间状态的数量【66,77,78,79】。
连接算法的选择可以对批处理作业的性能产生巨大影响,而无需理解和记住本章中讨论的各种连接算法。如果连接是以声明式(declarative)的方式指定的,那这就这是可行的:应用只是简单地说明哪些连接是必需的,查询优化器决定如何最好地执行连接。我们以前在“数据查询语言”中见过这个想法。
但MapReduce及其后继者数据流在其他方面,与SQL的完全声明式查询模型有很大区别。 MapReduce是围绕着回调函数的概念建立的:对于每条记录或者一组记录,调用一个用户定义的函数(Mapper或Reducer),并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以基于大量已有库的生态系统创作:解析,自然语言分析,图像分析以及运行数值算法或统计算法等。
自由运行任意代码,长期以来都是传统MapReduce批处理系统与MPP数据库的区别所在(参见“比较Hadoop和分布式数据库”一节)。虽然数据库具有编写用户定义函数的功能,但是它们通常使用起来很麻烦,而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统兼容不佳(例如Java的Maven, Javascript的npm,以及Ruby的gems)。
然而数据流引擎已经发现,支持除连接之外的更多声明式特性还有其他的优势。例如,如果一个回调函数只包含一个简单的过滤条件,或者只是从一条记录中选择了一些字段,那么在为每条记录调用函数时会有相当大的额外CPU开销。如果以声明方式表示这些简单的过滤和映射操作,那么查询优化器可以利用面向列的存储布局(参阅“列存储”),只从磁盘读取所需的列。 Hive,Spark DataFrames和Impala还使用了向量化执行(参阅“内存带宽和向量处理”):在对CPU缓存友好的内部循环中迭代数据,避免函数调用。Spark生成JVM字节码【79】,Impala使用LLVM为这些内部循环生成本机代码【41】。
通过在高级API中引入声明式的部分,并使查询优化器可以在执行期间利用这些来做优化,批处理框架看起来越来越像MPP数据库了(并且能实现可与之媲美的性能)。同时,通过拥有运行任意代码和以任意格式读取数据的可扩展性,它们保持了灵活性的优势。
专业化的不同领域
尽管能够运行任意代码的可扩展性是很有用的,但是也有很多常见的例子,不断重复着标准的处理模式。因而这些模式值得拥有自己的可重用通用构建模块实现,传统上,MPP数据库满足了商业智能分析和业务报表的需求,但这只是许多使用批处理的领域之一。
另一个越来越重要的领域是统计和数值算法,它们是机器学习应用所需要的(例如分类器和推荐系统)。可重用的实现正在出现:例如,Mahout在MapReduce,Spark和Flink之上实现了用于机器学习的各种算法,而MADlib在关系型MPP数据库(Apache HAWQ)中实现了类似的功能【54】。
空间算法也是有用的,例如最近邻搜索(k-nearest neghbors, kNN)【80】,它在一些多维空间中搜索与给定项最近的项目 —— 这是一种相似性搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串【81】。
批处理引擎正被用于分布式执行日益广泛的各领域算法。随着批处理系统获得各种内置功能以及高级声明式算子,且随着MPP数据库变得更加灵活和易于编程,两者开始看起来相似了:最终,它们都只是存储和处理数据的系统。