四、行动操作
行动操作(
action
) 会对RDD
计算出一个结果,并将结果返回到driver
程序中(或者把结果存储到外部存储系统,如HDFS
中)- 行动操作会强制执行依赖的中间
RDD
的求值
- 行动操作会强制执行依赖的中间
每当调用一个新的行动操作时,整个
RDD
都会从头开始计算- 要避免这种低效的行为,用户可以将中间
RDD
持久化
- 要避免这种低效的行为,用户可以将中间
在调用
sc.textFile()
时,数据并没有读取进来,而是在必要的时候读取。- 如果未对读取的结果
RDD
缓存,则该读取操作可能被多次执行
- 如果未对读取的结果
spark
采取惰性求值的原因:通过惰性求值,可以把一些操作合并起来从而简化计算过程。
4.1 通用行动操作
.reduce(f)
:通过f
来聚合当前RDD
。f
操作两个相同元素类型的RDD
数据,并且返回一个同样类型的新元素。- 该行动操作的结果得到一个值(类型与
RDD
中的元素类型相同)
.fold(zeroValue,op)
:通过op
聚合当前RDD
该操作首先对每个分区中的元素进行聚合(聚合的第一个数由
zeroValue
提供)。然后将分区聚合结果与zeroValue
再进行聚合。f
操作两个相同元素类型的RDD
数据,并且返回一个同样类型的新元素。- 该行动操作的结果得到一个值(类型与
RDD
中的元素类型相同)
zeroValue
参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。.aggregate(zeroValue,seqOp,combOp)
:该操作也是聚合当前RDD
。聚合的步骤为:首先以分区为单位,对当前
RDD
执行seqOp
来进行聚合。聚合的结果不一定与当前TDD
元素相同类型。然后以
zeroValue
为初始值,将分区聚合结果按照combOp
来聚合(聚合的第一个数由zeroValue
提供),得到最终的聚合结果。zeroValue
:combOp
聚合函数的初始值。类型与最终结果类型相同seqOp
:分区内的聚合函数,返回类型与zeroValue
相同combOp
:分区之间的聚合函数。
zeroValue
参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。示例:取均值:
sum_count = nums.aggregate((0,0),
(lambda acc,value:(acc[0]+value,acc[1]+1),
(lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
)
return sum_count[0]/float(sum_count[1])
获取
RDD
中的全部或者部分元素:.collect()
:它将整个RDD
的内容以列表的形式返回到driver
程序中。- 通常在测试中使用,且当
RDD
内容不大时使用,要求所有结果都能存入单台机器的内存中 - 它返回元素的顺序可能与你预期的不一致
- 通常在测试中使用,且当
.take(n)
:以列表的形式返回RDD
中的n
个元素到driver
程序中。- 它会尽可能的使用尽量少的分区
- 它返回元素的顺序可能与你预期的不一致
.takeOrderd(n,key=None)
:以列表的形式按照指定的顺序返回RDD
中的n
个元素到driver
程序中。- 默认情况下,使用数据的降序。你也可以提供
key
参数来指定比较函数
- 默认情况下,使用数据的降序。你也可以提供
.takeSample(withReplacement,num,seed=None)
:以列表的形式返回对RDD
随机采样的结果。withReplacement
:如果为True
,则可以重复采样;否则是无放回采样num
:预期采样结果的数量。如果是重复采样,则最终采样结果就是num
。如果是无放回采样,则最终采样结果不能超过RDD
的大小。seed
:随机数生成器的种子
top(n,key=None)
:获取RDD
的前n
个元素。- 默认情况下,它使用数据降序的
top n
。你也可以提供key
参数来指定比较函数。
- 默认情况下,它使用数据降序的
.first()
:获取RDD
中的第一个元素。
计数:
.count()
:返回RDD
的元素总数量(不考虑去重).countByValue()
:以字典的形式返回RDD
中,各元素出现的次数。.histogram(buckets)
:计算分桶参数:
buckets
:指定如何分桶。- 如果是一个序列,则它指定了桶的区间。如
[1,10,20,50]
代表了区间[1,10) [10,20) [20,50]
(最后一个桶是闭区间)。该序列必须是排序好的,且不能包含重复数字,且至少包含两个数字。 - 如果是一个数字,则指定了桶的数量。它会自动将数据划分到
min~max
之间的、均匀分布的桶中。它必须大于等于1.
- 如果是一个序列,则它指定了桶的区间。如
返回值:一个元组
(桶区间序列,桶内元素个数序列)
示例:
rdd = sc.parallelize(range(51))
rdd.histogram(2)
# 结果为 ([0, 25, 50], [25, 26])
rdd.histogram([0, 5, 25, 50])
#结果为 ([0, 5, 25, 50], [5, 20, 26])
.foreach(f)
:对当前RDD
的每个元素执行函数f
。- 它与
.map(f)
不同。.map
是转换操作,而.foreach
是行动操作。 - 通常
.foreach
用于将RDD
的数据以json
格式发送到网络服务器上,或者写入到数据库中。
- 它与
.foreachPartition(f)
:对当前RDD
的每个分区应用f
示例:
def f1(x):
print(x)
def f2(iterator):
for x in iterator:
print(x)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(f1)
rdd.foreachPartition(f2)
统计方法:
.max(key=None)
:返回RDD
的最大值。参数:
key
:对RDD
中的值进行映射,比较的是key(x)
之后的结果(但是返回的是x
而不是映射之后的结果)
.mean()
:返回RDD
的均值.min(key=None)
:返回RDD
的最小值。参数:
key
:对RDD
中的值进行映射,比较的是key(x)
之后的结果(但是返回的是x
而不是映射之后的结果)
.sampleStdev()
:计算样本标准差.sampleVariance()
:计算样本方差.stdev()
:计算标准差。它与样本标准差的区别在于:分母不同.variance()
:计算方差。它与样本方差的区别在于:分母不同.sum()
:计算总和
4.2 Pair RDD 行动操作
Pair RDD
可以使用所有标准RDD
上的可用的行动操作- 由于
Pair RDD
的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
- 由于
.countByKey()
:以字典的形式返回每个键的元素数量。.collectAsMap()
:以字典的形式返回所有的键值对。.lookup(key)
:以列表的形式返回指定键的所有的值。