三、转换操作
转换操作(
transformation
) 会从一个RDD
生成一个新的RDD
- 在这个过程中并不会求值。求值发生在
action
操作中 - 在这个过程中并不会改变输入的
RDD
(RDD
是不可变的),而是创建并返回一个新的RDD
- 在这个过程中并不会求值。求值发生在
spark
会使用谱系图来记录各个RDD
之间的依赖关系- 在对
RDD
行动操作中,需要这个依赖关系来按需计算每个中间RDD
- 当持久化的
RDD
丢失部分数据时,也需要这个依赖关系来恢复丢失的数据
- 在对
3.1 通用转换操作
基本转换操作:
.map(f, preservesPartitioning=False)
:将函数f
作用于当前RDD
的每个元素,返回值构成新的RDD
。preservesPartitioning
:如果为True
,则新的RDD
保留旧RDD
的分区
.flatMap(f, preservesPartitioning=False)
:将函数f
作用于当前RDD
的每个元素,将返回的迭代器的内容构成了新的RDD
。flatMap
可以视作:将返回的迭代器扁平化
lines = sc.parallelize(['hello world','hi'])
lines.map(lambda line:line.split(" ")) #新的RDD元素为[['hello','world'],['hi',]]
lines.flatMap(lambda line:line.split(" ")) #新的RDD元素为 ['hello','word','hi']
.mapPartitions(f, preservesPartitioning=False)
:将函数f
作用于当前RDD
的每个分区,将返回的迭代器的内容构成了新的RDD
。这里
f
函数的参数是一个集合(表示一个分区的数据).mapPartitionsWithIndex(f, preservesPartitioning=False)
:将函数f
作用于当前RDD
的每个分区以及分区id
,将返回的迭代器的内容构成了新的RDD
。- 这里
f
函数的参数是f分区id
以及一个集合(表示一个分区的数据)
示例:
def f(splitIndex, iterator):
xxx
rdd.mapPartitionsWithIndex(f)
- 这里
.filter(f)
:将函数f
(称作过滤器) 作用于当前RDD
的每个元素,通过f
的那些元素构成新的RDD
.distinct(numPartitions=None)
:返回一个由当前RDD
元素去重之后的结果组成新的RDD
。numPartitions
:指定了新的RDD
的分区数
sample(withReplacement, fraction, seed=None)
:对当前RDD
进行采样,采样结果组成新的RDD
withReplacement
:如果为True
,则可以重复采样;否则是无放回采样fractions
:新的RDD
的期望大小(占旧RDD
的比例)。spark
并不保证结果刚好满足这个比例(只是一个期望值)- 如果
withReplacement=True
:则表示每个元素期望被选择的次数 - 如果
withReplacement=False
:则表示每个元素期望被选择的概率
- 如果
seed
:随机数生成器的种子
.sortBy(keyfunc, ascending=True, numPartitions=None)
:对当前RDD
进行排序,排序结果组成新的RDD
keyfunc
:自定义的比较函数ascending
:如果为True
,则升序排列
.glom()
:返回一个RDD
,它将旧RDD
每个分区的元素聚合成一个列表,作为新RDD
的元素.groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:返回一个分组的RDD
示例:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
#结果为: [(0, [2, 8]), (1, [1, 1, 3, 5])]
针对两个
RDD
的转换操作:尽管
RDD
不是集合,但是它也支持数学上的集合操作。注意:这些操作都要求被操作的RDD
是相同数据类型的。.union(other)
:合并两个RDD
中所有元素,生成一个新的RDD
。other
:另一个RDD
该操作并不会检查两个输入
RDD
的重复元素,只是简单的将二者合并(并不会去重)。.intersection(other)
:取两个RDD
元素的交集,生成一个新的RDD
。该操作会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。
.subtract(other, numPartitions=None)
:存在于第一个RDD
而不存在于第二个RDD
中的所有元素组成的新的RDD
。该操作也会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。
.cartesian(other)
:两个RDD
的笛卡尔积,生成一个新的RDD
。新
RDD
中的元素是元组(a,b)
,其中a
来自于第一个RDD
,b
来自于第二个RDD
- 注意:求大规模的
RDD
的笛卡尔积开销巨大 - 该操作不会保证结果是去重的,它并不需要网络混洗数据。
- 注意:求大规模的
.keyBy(f)
:创建一个RDD
,它的元素是元组(f(x),x)
。示例:
sc.parallelize(range(2,5)).keyBy(lambda x: x*x)
# 结果为:[(4, 2), (9, 3), (16, 4)]
.pipe(command, env=None, checkCode=False)
:返回一个RDD
,它由外部进程的输出结果组成。参数:
command
:外部进程命令env
:环境变量checkCode
:如果为True
,则校验进程的返回值
.randomSplit(weights, seed=None)
:返回一组新的RDD
,它是旧RDD
的随机拆分参数:
weights
:一个double
的列表。它给出了每个结果DataFrame
的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0seed
:随机数种子
.zip(other)
:返回一个Pair RDD
,其中键来自于self
,值来自于other
- 它假设两个
RDD
拥有同样数量的分区,且每个分区拥有同样数量的元素
- 它假设两个
.zipWithIndex()
:返回一个Pair RDD
,其中键来自于self
,值就是键的索引。.zipWithUniqueId()
:返回一个Pair RDD
,其中键来自于self
,值是一个独一无二的id
。它不会触发一个
spark job
,这是它与zipWithIndex
的重要区别。
3.2 Pair RDD转换操作
Pair RDD
可以使用所有标准RDD
上的可用的转换操作- 由于
Pair RDD
的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
- 由于
基本转换操作:
.keys()
:返回一个新的RDD
,包含了旧RDD
每个元素的键.values()
:返回一个新的RDD
,包含了旧RDD
每个元素的值.mapValues(f)
:返回一个新的RDD
,元素为[K,f(V)]
(保留原来的键不变,通过f
改变值)。.flatMapValues(f)
:返回一个新的RDD
,元素为[K,f(V)]
(保留原来的键不变,通过f
改变值)。它与.mapValues(f)
区别见下面的示例:x=sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
x1=x.flatMapValues(lambda t:t).collect()
# x1: [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
x2=x.mapValues(lambda t:t).collect()
# x2: [("a", ["x", "y", "z"]), ("b", ["p", "r"])]
.sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7f51f1ab5050>)
:对当前Pair RDD
进行排序,排序结果组成新的RDD
keyfunc
:自定义的比较函数ascending
:如果为True
,则升序排列
.sampleByKey(withReplacement, fractions, seed=None)
:基于键的采样(即:分层采样)参数:
withReplacement
:如果为True
,则是有放回的采样;否则是无放回的采样fractions
:一个字典,指定了键上的每个取值的采样比例(不同取值之间的采样比例无关,不需要加起来为1)seed
:随机数种子
.subtractByKey(other, numPartitions=None)
:基于键的差集。返回一个新的RDD
,其中每个(key,value)
都位于self
中,且不在other
中
基于键的聚合操作:
在常规
RDD
上,fold()、aggregate()、reduce()
等都是行动操作。在Pair RDD
上,有类似的一组操作,用于针对相同键的元素进行聚合。这些操作返回RDD
,因此是转化操作而不是行动操作。返回的新
RDD
的键为原来的键,值为针对键的元素聚合的结果。.reduceByKey(f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:合并具有相同键的元素。f
作用于同一个键的那些元素的值。- 它为每个键进行并行的规约操作,每个规约操作将键相同的值合并起来
- 因为数据集中可能有大量的键,因此该操作返回的是一个新的
RDD
:由键,以及对应的规约结果组成
.foldByKey(zeroValue,f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:通过f
聚合具有相同键的元素。其中zeroValue
为零值。参见.fold()
.aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:通过f
聚合具有相同键的元素。其中zeroValue
为零值。参见.aggregate()
.combineByKey(createCombiner,mergeValue,mergeCombiners, numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:它是最为常用的基于键的聚合函数,大多数基于键的聚合函数都是用它实现的。和
aggregate()
一样,combineByKey()
可以让用户返回与输入数据类型不同的返回值。你需要提供三个函数:
createCombiner(v)
:v
表示键对应的值。返回一个C
类型的值(表示累加器)mergeValue(c,v)
:c
表示当前累加器,v
表示键对应的值。返回一个C
类型的值(表示更新后的累加器)mergeCombiners(c1,c2)
:c1
表示某个分区某个键的累加器,c2
表示同一个键另一个分区的累加器。返回一个C
类型的值(表示合并后的累加器)
其工作流程是:遍历分区中的所有元素。考察该元素的键:
如果键从未在该分区中出现过,表明这是分区中的一个新的键。则使用
createCombiner()
函数来创建该键对应的累加器的初始值。注意:这一过程发生在每个分区中,第一次出现各个键的时候发生。而不仅仅是整个
RDD
中第一次出现一个键时发生。如果键已经在该分区中出现过,则使用
mergeValue()
函数将该键的累加器对应的当前值与这个新的值合并由于每个分区是独立处理的,因此同一个键可以有多个累加器。如果有两个或者更多的分区都有同一个键的累加器,则使用
mergeCombiners()
函数将各个分区的结果合并。
数据分组:
.groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:根据键来进行分组。- 返回一个新的
RDD
,类型为[K,Iterable[V]]
,其中K
为原来RDD
的键的类型,V
为原来RDD
的值的类型。 - 如果你分组的目的是为了聚合,那么直接使用
reduceByKey、aggregateByKey
性能更好。
- 返回一个新的
.cogroup(other,numPartitions=None)
:它基于self
和other
两个TDD
中的所有的键来进行分组,它提供了为多个RDD
进行数据分组的方法。- 返回一个新的
RDD
,类型为[K,(Iterable[V],Iterable[W])]
。其中K
为两个输入RDD
的键的类型,V
为原来self
的值的类型,W
为other
的值的类型。 - 如果某个键只存在于一个输入
RDD
中,另一个输入RDD
中不存在,则对应的迭代器为空。 - 它是
groupWith
的别名,但是groupWith
支持更多的TDD
来分组。
- 返回一个新的
数据连接:
数据连接操作的输出
RDD
会包含来自两个输入RDD
的每一组相对应的记录。输出RDD
的类型为[K,(V,W)]
,其中K
为两个输入RDD
的键的类型,V
为原来self
的值的类型,W
为other
的值的类型。.join(other,numPartitions=None)
:返回一个新的RDD
,它是两个输入RDD
的内连接。.leftOuterJoin(other,numPartitions=None)
:返回一个新的RDD
,它是两个输入RDD
的左外连接。.rightOuterJoin(other,numPartitions=None)
:返回一个新的RDD
,它是两个输入RDD
的右外连接。.fullOuterJoin(other, numPartitions=None)
:执行right outer join