一、概述
RDD
(弹性分布式数据集Resilient Distributed Dataset
):Spark
中数据的核心抽象。RDD
是不可变的分布式对象集合- 每个
RDD
都被分为多个分区,这些分区运行在集群中的不同节点上 RDD
可以包含Python、Java、Scala
中任意类型的对象
在
spark
中,RDD
相关的函数分为三类:- 创建
RDD
- 转换(
transformation
) 已有的RDD
- 执行动作 (
action
) 来对RDD
求值
在这些背后,
spark
自动将RDD
中的数据分发到集群上,并将action
并行化执行。- 创建
RDD
支持两类操作:- 转换操作(
transformation
) : 它会从一个RDD
生成一个新的RDD
- 行动操作(
action
) :它会对RDD
计算出一个结果,并将结果返回到driver
程序中(或者把结果存储到外部存储系统,如HDFS
中)
- 转换操作(
如果你不知道一个函数时转换操作还是行动操作,你可以考察它的返回值:
- 如果返回的是
RDD
,则是转换操作。如果返回的是其它数据类型,则是行动操作
- 如果返回的是
转换操作和行动操作的区别在于:行动操作会触发实际的计算。
你可以在任意时候定义新的
RDD
,但是Spark
只会惰性计算这些RDD
:只有第一次在一个行动操作中用到时,才会真正计算- 所有返回
RDD
的操作都是惰性的(包括读取数据的sc.textFile()
函数)
- 所有返回
在计算
RDD
时,它所有依赖的中间RDD
也会被求值- 通过完整的转换链,
spark
只计算求值过程中需要的那些数据。
- 通过完整的转换链,
默认情况下,
spark
的RDD
会在你每次对它进行行动操作时重新计算。如果希望在多个行动操作中重用同一个
RDD
,则可以使用RDD.persist()
让spark
把这个RDD
缓存起来在第一次对持久化的
RDD
计算之后,Spark
会把RDD
的内容保存到内存中(以分区的方式存储到集群的各个机器上)。然后在此后的行动操作中,可以重用这些数据lines = sc.textFile("xxx.md")
lines.persist()
lines.count() #计算 lines,此时将 lines 缓存
lines.first() # 使用缓存的 lines
之所以默认不缓存
RDD
的计算结果,是因为:spark
可以直接遍历一遍数据然后计算出结果,没必要浪费存储空间。
每个
Spark
程序或者shell
会话都按照如下流程工作:- 从外部数据创建输入的
RDD
- 使用诸如
filter()
这样的转换操作对RDD
进行转换,以定义新的RDD
- 对需要被重用的中间结果
RDD
执行persist()
操作 - 使用行动操作(如
count()
等) 触发一次并行计算,spark
会对计算进行优化之后再执行
- 从外部数据创建输入的
spark
中的大部分转化操作和一部分行动操作需要用户传入一个可调用对象。在python
中,有三种方式:lambda
表达式、全局定义的函数、局部定义的函数注意:
python
可能会把函数所在的对象也序列化之后向外传递。当传递的函数是某个对象的成员,或者包含了某个对象中一个字段的引用(如
self.xxx
时),spark
会把整个对象发送到工作节点上。- 如果
python
不知道如何序列化该对象,则程序运行失败 - 如果该序列化对象太大,则传输的数据较多
class XXX:
def is_match(self,s):
return xxx
def get_xxx(self,rdd):
return rdd.filter(self.is_match) # bad! 传递的函数 self.is_match 是对象的成员
def get_yyy(self,rdd):
return rdd.filter(lambda x:self._x in x) #bad! 传递的函数包含了对象的成员 self._x
- 如果
解决方案是:将你需要的字段从对象中取出,放到一个局部变量中:
class XXX:
def is_match(self,s):
return xxx
def get_xxx(self,rdd):
_is_match = self.is_match
return rdd.filter(_is_match) # OK
def get_yyy(self,rdd):
_x = self._x
return rdd.filter(lambda x:_x in x) #OK
在
python
中,如果操作对应的RDD
数据类型不正确,则导致运行报错。