MapReduce和分布式文件系统
MapReduce有点像Unix工具,但分布在数千台机器上。像Unix工具一样,它相当简单粗暴,但令人惊异地管用。一个MapReduce作业可以和一个Unix进程相类比:它接受一个或多个输入,并产生一个或多个输出。
和大多数Unix工具一样,运行MapReduce作业通常不会修改输入,除了生成输出外没有任何副作用。输出文件以连续的方式一次性写入(一旦写入文件,不会修改任何现有的文件部分)。
虽然Unix工具使用stdin
和stdout
作为输入和输出,但MapReduce作业在分布式文件系统上读写文件。在Hadoop的Map-Reduce实现中,该文件系统被称为HDFS(Hadoop分布式文件系统),一个Google文件系统(GFS)的开源实现【19】。
除HDFS外,还有各种其他分布式文件系统,如GlusterFS和Quantcast File System(QFS)【20】。诸如Amazon S3,Azure Blob存储和OpenStack Swift 【21】等对象存储服务在很多方面都是相似的^iv。在本章中,我们将主要使用HDFS作为示例,但是这些原则适用于任何分布式文件系统。
与网络连接存储(NAS)和存储区域网络(SAN)架构的共享磁盘方法相比,HDFS基于无共享原则(参见第二部分前言)。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(如光纤通道)。而另一方面,无共享方法不需要特殊的硬件,只需要通过传统数据中心网络连接的计算机。
HDFS包含在每台机器上运行的守护进程,对外暴露网络服务,允许其他节点访问存储在该机器上的文件(假设数据中心中的每台通用计算机都挂载着一些磁盘)。名为NameNode的中央服务器会跟踪哪个文件块存储在哪台机器上。因此,HDFS在概念上创建了一个大型文件系统,可以使用所有运行有守护进程的机器的磁盘。
为了容忍机器和磁盘故障,文件块被复制到多台机器上。复制可能意味着多个机器上的相同数据的多个副本,如第5章中所述,或者诸如Reed-Solomon码这样的纠删码方案,它允许以比完全复制更低的存储开销以恢复丢失的数据【20,22】。这些技术与RAID相似,可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。
HDFS已经扩展的很不错了:在撰写本书时,最大的HDFS部署运行在上万台机器上,总存储容量达数百PB【23】。如此大的规模已经变得可行,因为使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于专用存储设备上的同等容量【24】。
MapReduce作业执行
MapReduce是一个编程框架,你可以使用它编写代码来处理HDFS等分布式文件系统中的大型数据集。理解它的最简单方法是参考“简单日志分析”中的Web服务器日志分析示例。MapReduce中的数据处理模式与此示例非常相似:
- 读取一组输入文件,并将其分解成记录(records)。在Web服务器日志示例中,每条记录都是日志中的一行(即
\n
是记录分隔符)。 - 调用Mapper函数,从每条输入记录中提取一对键值。在前面的例子中,Mapper函数是
awk '{print $7}'
:它提取URL($7
)作为关键字,并将值留空。 - 按键排序所有的键值对。在日志的例子中,这由第一个
sort
命令完成。 - 调用Reducer函数遍历排序后的键值对。如果同一个键出现多次,排序使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中,Reducer是由
uniq -c
命令实现的,该命令使用相同的键来统计相邻记录的数量。
这四个步骤可以作为一个MapReduce作业执行。步骤2(Map)和4(Reduce)是你编写自定义数据处理代码的地方。步骤1(将文件分解成记录)由输入格式解析器处理。步骤3中的排序步骤隐含在MapReduce中 —— 你不必编写它,因为Mapper的输出始终在送往Reducer之前进行排序。
要创建MapReduce作业,你需要实现两个回调函数,Mapper和Reducer,其行为如下(参阅“MapReduce查询”):
Mapper
Mapper会在每条输入记录上调用一次,其工作是从输入记录中提取键值。对于每个输入,它可以生成任意数量的键值对(包括None)。它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的。
Reducer MapReduce框架拉取由Mapper生成的键值对,收集属于同一个键的所有值,并使用在这组值列表上迭代调用Reducer。 Reducer可以产生输出记录(例如相同URL的出现次数)。
在Web服务器日志的例子中,我们在第5步中有第二个sort
命令,它按请求数对URL进行排序。在MapReduce中,如果你需要第二个排序阶段,则可以通过编写第二个MapReduce作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来,Mapper的作用是将数据放入一个适合排序的表单中,并且Reducer的作用是处理已排序的数据。
分布式执行MapReduce
MapReduce与Unix命令管道的主要区别在于,MapReduce可以在多台机器上并行执行计算,而无需编写代码来显式处理并行问题。Mapper和Reducer一次只能处理一条记录;它们不需要知道它们的输入来自哪里,或者输出去往什么地方,所以框架可以处理在机器之间移动数据的复杂性。
在分布式计算中可以使用标准的Unix工具作为Mapper和Reducer【25】,但更常见的是,它们被实现为传统编程语言的函数。在Hadoop MapReduce中,Mapper和Reducer都是实现特定接口的Java类。在MongoDB和CouchDB中,Mapper和Reducer都是JavaScript函数(参阅“MapReduce查询”)。
图10-1显示了Hadoop MapReduce作业中的数据流。其并行化基于分区(参见第6章):作业的输入通常是HDFS中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独处理map任务(图10-1中的m1,m2和m3标记)。
每个输入文件的大小通常是数百兆字节。 MapReduce调度器(图中未显示)试图在其中一台存储输入文件副本的机器上运行每个Mapper,只要该机器有足够的备用RAM和CPU资源来运行Mapper任务【26】。这个原则被称为将计算放在数据附近【27】:它节省了通过网络复制输入文件的开销,减少网络负载并增加局部性。
图10-1 具有三个Mapper和三个Reducer的MapReduce任务
在大多数情况下,应该在Mapper任务中运行的应用代码在将要运行它的机器上还不存在,所以MapReduce框架首先将代码(例如Java程序中的JAR文件)复制到适当的机器。然后启动Map任务并开始读取输入文件,一次将一条记录传入Mapper回调函数。Mapper的输出由键值对组成。
计算的Reduce端也被分区。虽然Map任务的数量由输入文件块的数量决定,但Reducer的任务的数量是由作业作者配置的(它可以不同于Map任务的数量)。为了确保具有相同键的所有键值对最终落在相同的Reducer处,框架使用键的散列值来确定哪个Reduce任务应该接收到特定的键值对(参见“按键散列分区”))。
键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。相反,分类是分阶段进行的。首先每个Map任务都按照Reducer对输出进行分区。每个分区都被写入Mapper程序的本地磁盘,使用的技术与我们在“SSTables与LSM树”中讨论的类似。
只要当Mapper读取完输入文件,并写完排序后的输出文件,MapReduce调度器就会通知Reducer可以从该Mapper开始获取输出文件。Reducer连接到每个Mapper,并下载自己相应分区的有序键值对文件。按Reducer分区,排序,从Mapper向Reducer复制分区数据,这一整个过程被称为混洗(shuffle)【26】(一个容易混淆的术语 —— 不像洗牌,在MapReduce中的混洗没有随机性)。
Reduce任务从Mapper获取文件,并将它们合并在一起,并保留有序特性。因此,如果不同的Mapper生成了键相同的记录,则在Reducer的输入中,这些记录将会相邻。
Reducer调用时会收到一个键,和一个迭代器作为参数,迭代器会顺序地扫过所有具有该键的记录(因为在某些情况可能无法完全放入内存中)。Reducer可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录会写入分布式文件系统上的文件中(通常是在跑Reducer的机器本地磁盘上留一份,并在其他机器上留几份副本)。
MapReduce工作流
单个MapReduce作业可以解决的问题范围很有限。以日志分析为例,单个MapReduce作业可以确定每个URL的页面浏览次数,但无法确定最常见的URL,因为这需要第二轮排序。
因此将MapReduce作业链接成为工作流(workflow)中是极为常见的,例如,一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce框架对工作流没有特殊支持,所以这个链是通过目录名隐式实现的:第一个作业必须将其输出配置为HDFS中的指定目录,第二个作业必须将其输入配置为从同一个目录。从MapReduce框架的角度来看,这是是两个独立的作业。
因此,被链接的MapReduce作业并没有那么像Unix命令管道(它直接将一个进程的输出作为另一个进程的输入,仅用一个很小的内存缓冲区)。它更像是一系列命令,其中每个命令的输出写入临时文件,下一个命令从临时文件中读取。这种设计有利也有弊,我们将在“物化中间状态”中讨论。
只有当作业成功完成后,批处理作业的输出才会被视为有效的(MapReduce会丢弃失败作业的部分输出)。因此,工作流中的一项作业只有在先前的作业 —— 即生产其输入的作业 —— 成功完成后才能开始。为了处理这些作业之间的依赖,有很多针对Hadoop的工作流调度器被开发出来,包括Oozie,Azkaban,Luigi,Airflow和Pinball 【28】。
这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统时,由50到100个MapReduce作业组成的工作流是常见的【29】。而在大型组织中,许多不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流而言非常重要。
Hadoop的各种高级工具(如Pig 【30】,Hive 【31】,Cascading 【32】,Crunch 【33】和FlumeJava 【34】)也能自动布线组装多个MapReduce阶段,生成合适的工作流。
Reduce端连接与分组
我们在第2章中讨论了数据模型和查询语言的联接,但是我们还没有深入探讨连接是如何实现的。现在是我们再次捡起这条线索的时候了。
在许多数据集中,一条记录与另一条记录存在关联是很常见的:关系模型中的外键,文档模型中的文档引用或图模型中的边。当你需要同时访问这一关联的两侧(持有引用的记录与被引用的记录)时,连接就是必须的。(包含引用的记录和被引用的记录),连接就是必需的。正如第2章所讨论的,非规范化可以减少对连接的需求,但通常无法将其完全移除^v。
在数据库中,如果执行只涉及少量记录的查询,数据库通常会使用索引来快速定位感兴趣的记录(参阅第3章)。如果查询涉及到连接,则可能涉及到查找多个索引。然而MapReduce没有索引的概念 —— 至少在通常意义上没有。
当MapReduce作业被赋予一组文件作为输入时,它读取所有这些文件的全部内容;数据库会将这种操作称为全表扫描。如果你只想读取少量的记录,则全表扫描与索引查询相比,代价非常高昂。但是在分析查询中(参阅“事务处理或分析?”),通常需要计算大量记录的聚合。在这种情况下,特别是如果能在多台机器上并行处理时,扫描整个输入可能是相当合理的事情。
当我们在批处理的语境中讨论连接时,我们指的是在数据集中解析某种关联的全量存在。 例如我们假设一个作业是同时处理所有用户的数据,而非仅仅是为某个特定用户查找数据(而这能通过索引更高效地完成)。
示例:分析用户活动事件
图10-2给出了一个批处理作业中连接的典型例子。左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件(activity events)或点击流数据(clickstream data)),右侧是用户数据库。 你可以将此示例看作是星型模式的一部分(参阅“星型和雪花型:分析的模式”):事件日志是事实表,用户数据库是其中的一个维度。
图10-2 用户行为日志与用户档案的连接
分析任务可能需要将用户活动与用户简档相关联:例如,如果档案包含用户的年龄或出生日期,系统就可以确定哪些页面更受哪些年龄段的用户欢迎。然而活动事件仅包含用户ID,而没有包含完整的用户档案信息。在每个活动事件中嵌入这些档案信息很可能会非常浪费。因此,活动事件需要与用户档案数据库相连接。
实现这一连接的最简单方法是,逐个遍历活动事件,并为每个遇到的用户ID查询用户数据库(在远程服务器上)。这是可能的,但是它的性能可能会非常差:处理吞吐量将受限于受数据库服务器的往返时间,本地缓存的有效性很大程度上取决于数据的分布,并行运行大量查询可能会轻易压垮数据库【35】。
为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)限于单台机器上进行。为待处理的每条记录发起随机访问的网络请求实在是太慢了。而且,查询远程数据库意味着批处理作业变为非确定的(nondeterministic),因为远程数据库中的数据可能会改变。
因此,更好的方法是获取用户数据库的副本(例如,使用ETL进程从数据库备份中提取数据,参阅“数据仓库”),并将它和用户行为日志放入同一个分布式文件系统中。然后你可以将用户数据库存储在HDFS中的一组文件中,而用户活动记录存储在另一组文件中,并能用MapReduce将所有相关记录集中到同一个地方进行高效处理。
排序合并连接
回想一下,Mapper的目的是从每个输入记录中提取一对键值。在图10-2的情况下,这个键就是用户ID:一组Mapper会扫过活动事件(提取用户ID作为键,活动事件作为值),而另一组Mapper将会扫过用户数据库(提取用户ID作为键,用户的出生日期作为值)。这个过程如图10-3所示。
图10-3 在用户ID上进行的Reduce端连接。如果输入数据集分区为多个文件,则每个分区都会被多个Mapper并行处理
当MapReduce框架通过键对Mapper输出进行分区,然后对键值对进行排序时,效果是具有相同ID的所有活动事件和用户记录在Reducer输入中彼此相邻。 Map-Reduce作业甚至可以也让这些记录排序,使Reducer总能先看到来自用户数据库的记录,紧接着是按时间戳顺序排序的活动事件 —— 这种技术被称为二次排序(secondary sort)【26】。
然后Reducer可以容易地执行实际的连接逻辑:每个用户ID都会被调用一次Reducer函数,且因为二次排序,第一个值应该是来自用户数据库的出生日期记录。 Reducer将出生日期存储在局部变量中,然后使用相同的用户ID遍历活动事件,输出已观看网址和观看者年龄的结果对。随后的Map-Reduce作业可以计算每个URL的查看者年龄分布,并按年龄段进行聚集。
由于Reducer一次处理一个特定用户ID的所有记录,因此一次只需要将一条用户记录保存在内存中,而不需要通过网络发出任何请求。这个算法被称为排序合并连接(sort-merge join),因为Mapper的输出是按键排序的,然后Reducer将来自连接两侧的有序记录列表合并在一起。
把相关数据放在一起
在排序合并连接中,Mapper和排序过程确保了所有对特定用户ID执行连接操作的必须数据都被放在同一个地方:单次调用Reducer的地方。预先排好了所有需要的数据,Reducer可以是相当简单的单线程代码,能够以高吞吐量和与低内存开销扫过这些记录。
这种架构可以看做,Mapper将“消息”发送给Reducer。当一个Mapper发出一个键值对时,这个键的作用就像值应该传递到的目标地址。即使键只是一个任意的字符串(不是像IP地址和端口号那样的实际的网络地址),它表现的就像一个地址:所有具有相同键的键值对将被传递到相同的目标(一次Reduce的调用)。
使用MapReduce编程模型,能将计算的物理网络通信层面(从正确的机器获取数据)从应用逻辑中剥离出来(获取数据后执行处理)。这种分离与数据库的典型用法形成了鲜明对比,从数据库中获取数据的请求经常出现在应用代码内部【36】。由于MapReduce能够处理所有的网络通信,因此它也避免了应用代码去担心部分故障,例如另一个节点的崩溃:MapReduce在不影响应用逻辑的情况下能透明地重试失败的任务。
GROUP BY
除了连接之外,“把相关数据放在一起”的另一种常见模式是,按某个键对记录分组(如SQL中的GROUP BY子句)。所有带有相同键的记录构成一个组,而下一步往往是在每个组内进行某种聚合操作,例如:
- 统计每个组中记录的数量(例如在统计PV的例子中,在SQL中表示为
COUNT(*)
聚合) - 对某个特定字段求和(SQL中的
SUM(fieldname)
) - 按某种分级函数取出排名前k条记录。
使用MapReduce实现这种分组操作的最简单方法是设置Mapper,以便它们生成的键值对使用所需的分组键。然后分区和排序过程将所有具有相同分区键的记录导向同一个Reducer。因此在MapReduce之上实现分组和连接看上去非常相似。
分组的另一个常见用途是整理特定用户会话的所有活动事件,以找出用户进行的一系列操作(称为会话化(sessionization)【37】)。例如,可以使用这种分析来确定显示新版网站的用户是否比那些显示旧版本(A/B测试)的用户更有购买欲,或者计算某个营销活动是否值得。
如果你有多个Web服务器处理用户请求,则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话cookie,用户ID或类似的标识符作为分组键,以将特定用户的所有活动事件放在一起来实现会话化,与此同时,不同用户的事件仍然散步在不同的分区中。
处理倾斜
如果存在与单个键关联的大量数据,则“将具有相同键的所有记录放到相同的位置”这种模式就被破坏了。例如在社交网络中,大多数用户可能会与几百人有连接,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为关键对象(linchpin object)【38】或热键(hot key)。
在单个Reducer中收集与某个名流相关的所有活动(例如他们发布内容的回复)可能导致严重的倾斜(也称为热点(hot spot))—— 也就是说,一个Reducer必须比其他Reducer处理更多的记录(参见“负载倾斜与消除热点“)。由于MapReduce作业只有在所有Mapper和Reducer都完成时才完成,所有后续作业必须等待最慢的Reducer才能启动。
如果连接的输入存在热点键,可以使用一些算法进行补偿。例如,Pig中的倾斜连接(skewed join)方法首先运行一个抽样作业来确定哪些键是热键【39】。连接实际执行时,Mapper会将热键的关联记录随机(相对于传统MapReduce基于键散列的确定性方法)发送到几个Reducer之一。对于另外一侧的连接输入,与热键相关的记录需要被复制到所有处理该键的Reducer上【40】。
这种技术将处理热键的工作分散到多个Reducer上,这样可以使其更好地并行化,代价是需要将连接另一侧的输入记录复制到多个Reducer上。 Crunch中的分片连接(sharded join)方法与之类似,但需要显式指定热键而不是使用采样作业。这种技术也非常类似于我们在“负载倾斜与消除热点”中讨论的技术,使用随机化来缓解分区数据库中的热点。
Hive的偏斜连接优化采取了另一种方法。它需要在表格元数据中显式指定热键,并将与这些键相关的记录单独存放,与其它文件分开。当在该表上执行连接时,对于热键,它会使用Map端连接(参阅下一节)。
当按照热键进行分组并聚合时,可以将分组分两个阶段进行。第一个MapReduce阶段将记录发送到随机Reducer,以便每个Reducer只对热键的子集执行分组,为每个键输出一个更紧凑的中间聚合结果。然后第二个MapReduce作业将所有来自第一阶段Reducer的中间聚合结果合并为每个键一个值。
Map端连接
上一节描述的连接算法在Reducer中执行实际的连接逻辑,因此被称为Reduce端连接。Mapper扮演着预处理输入数据的角色:从每个输入记录中提取键值,将键值对分配给Reducer分区,并按键排序。
Reduce端方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper都可以对其预处理以备连接。然而不利的一面是,排序,复制至Reducer,以及合并Reducer输入,所有这些操作可能开销巨大。当数据通过MapReduce 阶段时,数据可能需要落盘好几次,取决于可用的内存缓冲区【37】。
另一方面,如果你能对输入数据作出某些假设,则通过使用所谓的Map端连接来加快连接速度是可行的。这种方法使用了一个阉掉Reduce与排序的MapReduce作业,每个Mapper只是简单地从分布式文件系统中读取一个输入文件块,然后将输出文件写入文件系统,仅此而已。
广播散列连接
适用于执行Map端连接的最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小,以便可以将其全部加载到每个Mapper的内存中。
例如,假设在图10-2的情况下,用户数据库小到足以放进内存中。在这种情况下,当Mapper启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的散列中。完成此操作后,Map程序可以扫描用户活动事件,并简单地在散列表中查找每个事件的用户ID^vi。
参与连接的较大输入的每个文件块各有一个Mapper(在图10-2的例子中活动事件是较大的输入)。每个Mapper都会将较小输入整个加载到内存中。
这种简单有效的算法被称为广播散列连接(broadcast hash join):广播一词反映了这样一个事实,每个连接较大输入端分区的Mapper都会将较小输入端数据集整个读入内存中(所以较小输入实际上“广播”到较大数据的所有分区上),散列一词反映了它使用一个散列表。 Pig(名为“复制链接(replicated join)”),Hive(“MapJoin”),Cascading和Crunch支持这种连接。它也被诸如Impala的数据仓库查询引擎使用【41】。
除了将连接较小输入加载到内存散列表中,另一种方法是将较小输入存储在本地磁盘上的只读索引中【42】。索引中经常使用的部分将保留在操作系统的页面缓存中,因而这种方法可以提供与内存散列表几乎一样快的随机查找性能,但实际上并不需要数据集能放入内存中。
分区散列连接
如果Map端连接的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。在图10-2的情况中,你可以根据用户ID的最后一位十进制数字来对活动事件和用户数据库进行分区(因此连接两侧各有10个分区)。例如,Mapper3首先将所有具有以3结尾的ID的用户加载到散列表中,然后扫描ID为3的每个用户的所有活动事件。
如果分区正确无误,可以确定的是,所有你可能需要连接的记录都落在同一个编号的分区中。因此每个Mapper只需要从输入两端各读取一个分区就足够了。好处是每个Mapper都可以在内存散列表中少放点数据。
这种方法只有当连接两端输入有相同的分区数,且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的MapReduce作业生成的,那么这可能是一个合理的假设。
分区散列连接在Hive中称为Map端桶连接(bucketed map joins)【37】。
Map端合并连接
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,则可适用另一种Map端联接的变体。在这种情况下,输入是否小到能放入内存并不重要,因为这时候Mapper同样可以执行归并操作(通常由Reducer执行)的归并操作:按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。
如果能进行Map端合并连接,这通常意味着前一个MapReduce作业可能一开始就已经把输入数据做了分区并进行了排序。原则上这个连接就可以在前一个作业的Reduce阶段进行。但使用独立的仅Map作业有时也是合适的,例如,分好区且排好序的中间数据集可能还会用于其他目的。
MapReduce工作流与Map端连接
当下游作业使用MapReduce连接的输出时,选择Map端连接或Reduce端连接会影响输出的结构。Reduce端连接的输出是按照连接键进行分区和排序的,而Map端连接的输出则按照与较大输入相同的方式进行分区和排序(因为无论是使用分区连接还是广播连接,连接较大输入端的每个文件块都会启动一个Map任务)。
如前所述,Map端连接也对输入数据集的大小,有序性和分区方式做出了更多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。
在Hadoop生态系统中,这种关于数据集分区的元数据通常在HCatalog和Hive Metastore中维护【37】。
批处理工作流的输出
我们已经说了很多用于实现MapReduce工作流的算法,但却忽略了一个重要的问题:这些处理完成之后的最终结果是什么?我们最开始为什么要跑这些作业?
在数据库查询的场景中,我们将事务处理(OLTP)与分析两种目的区分开来(参阅“事务处理或分析?”)。我们看到,OLTP查询通常根据键查找少量记录,使用索引,并将其呈现给用户(比如在网页上)。另一方面,分析查询通常会扫描大量记录,执行分组与聚合,输出通常有着报告的形式:显示某个指标随时间变化的图表,或按照某种排位取前10项,或一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。
批处理放哪里合适?它不属于事务处理,也不是分析。它和分析比较接近,因为批处理通常会扫过输入数据集的绝大部分。然而MapReduce作业工作流与用于分析目的的SQL查询是不同的(参阅“Hadoop与分布式数据库的对比”)。批处理过程的输出通常不是报表,而是一些其他类型的结构。
建立搜索索引
Google最初使用MapReduce是为其搜索引擎建立索引,用了由5到10个MapReduce作业组成的工作流实现【1】。虽然Google后来也不仅仅是为这个目的而使用MapReduce 【43】,但如果从构建搜索索引的角度来看,更能帮助理解MapReduce。 (直至今日,Hadoop MapReduce仍然是为Lucene/Solr构建索引的好方法【44】)
我们在“全文搜索和模糊索引”中简要地了解了Lucene这样的全文搜索索引是如何工作的:它是一个文件(关键词字典),你可以在其中高效地查找特定关键字,并找到包含该关键字的所有文档ID列表(文章列表)。这是一种非常简化的看法 —— 实际上,搜索索引需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等 —— 但这个原则是成立的。
如果需要对一组固定文档执行全文搜索,则批处理是一种构建索引的高效方法:Mapper根据需要对文档集合进行分区,每个Reducer构建该分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(参阅“分区和二级索引”)并行处理效果拔群。
由于按关键字查询搜索索引是只读操作,因而这些索引文件一旦创建就是不可变的。
如果索引的文档集合发生更改,一种选择是定期重跑整个索引工作流,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法的计算成本可能会很高。但它的优点是索引过程很容易理解:文档进,索引出。
另一个选择是,可以增量建立索引。如第3章中讨论的,如果要在索引中添加,删除或更新文档,Lucene会写新的段文件,并在后台异步合并压缩段文件。我们将在第11章中看到更多这种增量处理。
键值存储作为批处理输出
搜索索引只是批处理工作流可能输出的一个例子。批处理的另一个常见用途是构建机器学习系统,例如分类器(比如垃圾邮件过滤器,异常检测,图像识别)与推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关的搜索【29】)。
这些批处理作业的输出通常是某种数据库:例如,可以通过给定用户ID查询该用户推荐好友的数据库,或者可以通过产品ID查询相关产品的数据库【45】。
这些数据库需要被处理用户请求的Web应用所查询,而它们通常是独立于Hadoop基础设施的。那么批处理过程的输出如何回到Web应用可以查询的数据库中呢?
最直接的选择可能是,直接在Mapper或Reducer中使用你最爱数据库的客户端库,并从批处理作业直接写入数据库服务器,一次写入一条记录。它能工作(假设你的防火墙规则允许从你的Hadoop环境直接访问你的生产数据库),但这并不是一个好主意,出于以下几个原因:
- 正如前面在连接的上下文中讨论的那样,为每条记录发起一个网络请求,要比批处理任务的正常吞吐量慢几个数量级。即使客户端库支持批处理,性能也可能很差。
- MapReduce作业经常并行运行许多任务。如果所有Mapper或Reducer都同时写入相同的输出数据库,并以批处理的预期速率工作,那么该数据库很可能被轻易压垮,其查询性能可能变差。这可能会导致系统其他部分的运行问题【35】。
- 通常情况下,MapReduce为作业输出提供了一个干净利落的“全有或全无”保证:如果作业成功,则结果就是每个任务恰好执行一次所产生的输出,即使某些任务失败且必须一路重试。如果整个作业失败,则不会生成输出。然而从作业内部写入外部系统,会产生外部可见的副作用,这种副作用是不能以这种方式被隐藏的。因此,你不得不去操心部分完成的作业对其他系统可见的结果,并需要理解Hadoop任务尝试与预测执行的复杂性。
更好的解决方案是在批处理作业内创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上节中的搜索索引一样。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。不少键值存储都支持在MapReduce作业中构建数据库文件,包括Voldemort 【46】,Terrapin 【47】,ElephantDB 【48】和HBase批量加载【49】。
构建这些数据库文件是MapReduce的一种很好用法的使用方法:使用Mapper提取出键并按该键排序,现在已经是构建索引所必需的大量工作。由于这些键值存储大多都是只读的(文件只能由批处理作业一次性写入,然后就不可变),所以数据结构非常简单。比如它们就不需要WAL(参阅“使B树可靠”)。
将数据加载到Voldemort时,服务器将继续用旧数据文件服务请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动将查询切换到新文件。如果在这个过程中出现任何问题,它可以轻易回滚至旧文件,因为它们仍然存在而且不可变【46】。
批处理输出的哲学
本章前面讨论过的Unix哲学(“Unix哲学”)鼓励以显式指明数据流的方式进行实验:程序读取输入并写入输出。在这一过程中,输入保持不变,任何先前的输出都被新输出完全替换,且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,略做改动或进行调试,而不会搅乱系统的状态。
MapReduce作业的输出处理遵循同样的原理。通过将输入视为不可变且避免副作用(如写入外部数据库),批处理作业不仅实现了良好的性能,而且更容易维护:
- 如果在代码中引入了一个错误,而输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将重新被纠正。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的概念被称为人类容错(human fault tolerance)【50】)
- 由于回滚很容易,比起在错误意味着不可挽回的伤害的环境,功能开发进展能快很多。这种最小化不可逆性(minimizing irreversibility)的原则有利于敏捷软件开发【51】。
- 如果Map或Reduce任务失败,MapReduce框架将自动重新调度,并在同样的输入上再次运行它。如果失败是由代码中的错误造成的,那么它会不断崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于临时问题导致的,那么故障就会被容忍。因为输入不可变,这种自动重试是安全的,而失败任务的输出会被MapReduce框架丢弃。
- 同一组文件可用作各种不同作业的输入,包括计算指标的监控作业可以评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异) 。
- 与Unix工具类似,MapReduce作业将逻辑与布线(配置输入和输出目录)分离,这使得关注点分离,可以重用代码:一个团队可以实现一个专注做好一件事的作业;而其他团队可以决定何时何地运行这项作业。
在这些领域,在Unix上表现良好的设计原则似乎也适用于Hadoop,但Unix和Hadoop在某些方面也有所不同。例如,因为大多数Unix工具都假设输入输出是无类型文本文件,所以它们必须做大量的输入解析工作(本章开头的日志分析示例使用{print $7}
来提取URL)。在Hadoop上可以通过使用更结构化的文件格式消除一些低价值的语法转换:比如Avro(参阅“Avro”)和Parquet(参阅“列存储”)经常使用,因为它们提供了基于模式的高效编码,并允许模式随时间推移而演进(见第4章)。
Hadoop与分布式数据库的对比
正如我们所看到的,Hadoop有点像Unix的分布式版本,其中HDFS是文件系统,而MapReduce是Unix进程的怪异实现(总是在Map阶段和Reduce阶段运行sort
工具)。我们了解了如何在这些原语的基础上实现各种连接和分组操作。
当MapReduce论文发表时【1】,它从某种意义上来说 —— 并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前所谓的大规模并行处理(MPP, massively parallel processing)数据库中实现了【3,40】。比如Gamma database machine,Teradata和Tandem NonStop SQL就是这方面的先驱【52】。
最大的区别是,MPP数据库专注于在一组机器上并行执行分析SQL查询,而MapReduce和分布式文件系统【19】的组合则更像是一个可以运行任意程序的通用操作系统。
存储多样性
数据库要求你根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。
说白了,Hadoop开放了将数据不加区分地转储到HDFS的可能性,允许后续再研究如何进一步处理【53】。相比之下,在将数据导入数据库专有存储格式之前,MPP数据库通常需要对数据和查询模式进行仔细的前期建模。
在纯粹主义者看来,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更高质量的数据来处理。然而实践经验表明,简单地使数据快速可用 —— 即使它很古怪,难以使用,使用原始格式 —— 也通常要比事先决定理想数据模型要更有价值【54】。
这个想法与数据仓库类似(参阅“数据仓库”):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨越以前相分离的数据集进行连接。 MPP数据库所要求的谨慎模式设计拖慢了集中式数据收集速度;以原始形式收集数据,稍后再操心模式的设计,能使数据收集速度加快(有时被称为“数据湖(data lake)”或“企业数据中心(enterprise data hub)”【55】)。
不加区分的数据转储转移了解释数据的负担:数据集的生产者不再需要强制将其转化为标准格式,数据的解释成为消费者的问题(读时模式方法【56】;参阅“文档模型中的架构灵活性”)。如果生产者和消费者是不同优先级的不同团队,这可能是一种优势。甚至可能不存在一个理想的数据模型,对于不同目的有不同的合适视角。以原始形式简单地转储数据,可以允许多种这样的转换。这种方法被称为寿司原则(sushi principle):“原始数据更好”【57】。
因此,Hadoop经常被用于实现ETL过程(参阅“数据仓库”):事务处理系统中的数据以某种原始形式转储到分布式文件系统中,然后编写MapReduce作业来清理数据,将其转换为关系形式,并将其导入MPP数据仓库以进行分析。数据建模仍然在进行,但它在一个单独的步骤中进行,与数据收集相解耦。这种解耦是可行的,因为分布式文件系统支持以任何格式编码的数据。
处理模型多样性
MPP数据库是单体的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化,因此整个系统可以在其设计针对的查询类型上取得非常好的性能。而且,SQL查询语言允许以优雅的语法表达查询,而无需编写代码,使业务分析师用来做商业分析的可视化工具(例如Tableau)能够访问。
另一方面,并非所有类型的处理都可以合理地表达为SQL查询。例如,如果要构建机器学习和推荐系统,或者使用相关性排名模型的全文搜索索引,或者执行图像分析,则很可能需要更一般的数据处理模型。这些类型的处理通常是特别针对特定应用的(例如机器学习的特征工程,机器翻译的自然语言模型,欺诈预测的风险评估函数),因此它们不可避免地需要编写代码,而不仅仅是查询。
MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce,那么你可以在它之上建立一个SQL查询执行引擎,事实上这正是Hive项目所做的【31】。但是,你也可以编写许多其他形式的批处理,这些批处理不必非要用SQL查询表示。
随后,人们发现MapReduce对于某些类型的处理而言局限性很大,表现很差,因此在Hadoop之上其他各种处理模型也被开发出来(我们将在“MapReduce之后”中看到其中一些)。有两种处理模型,SQL和MapReduce,还不够,需要更多不同的模型!而且由于Hadoop平台的开放性,实施一整套方法是可行的,而这在单体MPP数据库的范畴内是不可能的【58】。
至关重要的是,这些不同的处理模型都可以在共享的单个机器集群上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个群集内不同的工作负载。不需要移动数据,使得从数据中挖掘价值变得容易得多,也使采用新的处理模型容易的多。
Hadoop生态系统包括随机访问的OLTP数据库,如HBase(参阅“SSTables和LSM树”)和MPP风格的分析型数据库,如Impala 【41】。 HBase与Impala都不使用MapReduce,但都使用HDFS进行存储。它们是迥异的数据访问与处理方法,但是它们可以共存,并被集成到同一个系统中。
针对频繁故障设计
当比较MapReduce和MPP数据库时,两种不同的设计思路出现了:处理故障和使用内存与磁盘的方式。与在线系统相比,批处理对故障不太敏感,因为就算失败也不会立即影响到用户,而且它们总是能再次运行。
如果一个节点在执行查询时崩溃,大多数MPP数据库会中止整个查询,并让用户重新提交查询或自动重新运行它【3】。由于查询通常最多运行几秒钟或几分钟,所以这种错误处理的方法是可以接受的,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据(例如,使用散列连接)以避免从磁盘读取的开销。
另一方面,MapReduce可以容忍单个Map或Reduce任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。它也会非常急切地将数据写入磁盘,一方面是为了容错,另一部分是因为假设数据集太大而不能适应内存。
MapReduce方式更适用于较大的作业:要处理如此之多的数据并运行很长时间的作业,以至于在此过程中很可能至少遇到一个任务故障。在这种情况下,由于单个任务失败而重新运行整个作业将是非常浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销,但如果任务失败率足够高,这仍然是一种合理的权衡。
但是这些假设有多么现实呢?在大多数集群中,机器故障确实会发生,但是它们不是很频繁 —— 可能少到绝大多数作业都不会经历机器故障。为了容错,真的值得带来这么大的额外开销吗?
要了解MapReduce节约使用内存和在任务的层次进行恢复的原因,了解最初设计MapReduce的环境是很有帮助的。 Google有着混用的数据中心,在线生产服务和离线批处理作业在同样机器上运行。每个任务都有一个通过容器强制执行的资源配给(CPU核心,RAM,磁盘空间等)。每个任务也具有优先级,如果优先级较高的任务需要更多的资源,则可以终止(抢占)同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的定价:团队必须为他们使用的资源付费,而优先级更高的进程花费更多【59】。
这种架构允许非生产(低优先级)计算资源被过量使用(overcommitted),因为系统知道必要时它可以回收资源。与分离生产和非生产任务的系统相比,过量使用资源可以更好地利用机器并提高效率。但由于MapReduce作业以低优先级运行,它们随时都有被抢占的风险,因为优先级较高的进程可能需要其资源。在高优先级进程拿走所需资源后,批量作业能有效地“捡面包屑”,利用剩下的任何计算资源。
在谷歌,运行一个小时的MapReduce任务有大约有5%的风险被终止,为了给更高优先级的进程挪地方。这一概率比硬件问题,机器重启或其他原因的概率高了一个数量级【59】。按照这种抢占率,如果一个作业有100个任务,每个任务运行10分钟,那么至少有一个任务在完成之前被终止的风险大于50%。
这就是MapReduce被设计为容忍频繁意外任务终止的原因:不是因为硬件很不可靠,而是因为任意终止进程的自由有利于提高计算集群中的资源利用率。
在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占,以平衡不同队列的资源分配【58】,但在编写本文时,YARN,Mesos或Kubernetes不支持通用优先级抢占【60】。在任务不经常被终止的环境中,MapReduce的这一设计决策就没有多少意义了。在下一节中,我们将研究一些与MapReduce设计决策相异的替代方案。