四、行动操作

  1. 行动操作(action) 会对RDD 计算出一个结果,并将结果返回到driver 程序中(或者把结果存储到外部存储系统,如HDFS 中)

    • 行动操作会强制执行依赖的中间RDD 的求值
  2. 每当调用一个新的行动操作时,整个RDD 都会从头开始计算

    • 要避免这种低效的行为,用户可以将中间RDD 持久化
  3. 在调用sc.textFile() 时,数据并没有读取进来,而是在必要的时候读取。

    • 如果未对读取的结果RDD 缓存,则该读取操作可能被多次执行
  4. spark 采取惰性求值的原因:通过惰性求值,可以把一些操作合并起来从而简化计算过程。

4.1 通用行动操作

  1. .reduce(f):通过 f 来聚合当前RDD

    • f 操作两个相同元素类型的RDD 数据,并且返回一个同样类型的新元素。
    • 该行动操作的结果得到一个值(类型与RDD中的元素类型相同)
  2. .fold(zeroValue,op):通过 op 聚合当前RDD

    该操作首先对每个分区中的元素进行聚合(聚合的第一个数由zeroValue 提供)。然后将分区聚合结果与zeroValue 再进行聚合。

    • f 操作两个相同元素类型的RDD 数据,并且返回一个同样类型的新元素。
    • 该行动操作的结果得到一个值(类型与RDD中的元素类型相同)

    zeroValue 参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。

  3. .aggregate(zeroValue,seqOp,combOp):该操作也是聚合当前RDD。聚合的步骤为:

    首先以分区为单位,对当前RDD 执行seqOp 来进行聚合。聚合的结果不一定与当前TDD 元素相同类型。

    然后以zeroValue 为初始值,将分区聚合结果按照combOp 来聚合(聚合的第一个数由zeroValue 提供),得到最终的聚合结果。

    • zeroValuecombOp 聚合函数的初始值。类型与最终结果类型相同
    • seqOp:分区内的聚合函数,返回类型与zeroValue 相同
    • combOp:分区之间的聚合函数。

    zeroValue 参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。

    示例:取均值:

    1. sum_count = nums.aggregate((0,0),
    1. (lambda acc,value:(acc[0]+value,acc[1]+1),
    1. (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
    1. )
    1. return sum_count[0]/float(sum_count[1])
  4. 获取RDD 中的全部或者部分元素:

    • .collect():它将整个RDD的内容以列表的形式返回到driver 程序中。

      • 通常在测试中使用,且当RDD 内容不大时使用,要求所有结果都能存入单台机器的内存中
      • 它返回元素的顺序可能与你预期的不一致
    • .take(n):以列表的形式返回RDD 中的n 个元素到driver 程序中。

      • 它会尽可能的使用尽量少的分区
      • 它返回元素的顺序可能与你预期的不一致
    • .takeOrderd(n,key=None):以列表的形式按照指定的顺序返回RDD 中的n 个元素到driver 程序中。

      • 默认情况下,使用数据的降序。你也可以提供key 参数来指定比较函数
    • .takeSample(withReplacement,num,seed=None):以列表的形式返回对RDD 随机采样的结果。

      • withReplacement:如果为True,则可以重复采样;否则是无放回采样
      • num:预期采样结果的数量。如果是重复采样,则最终采样结果就是num。如果是无放回采样,则最终采样结果不能超过RDD 的大小。
      • seed:随机数生成器的种子
    • top(n,key=None):获取RDD 的前n个元素。

      • 默认情况下,它使用数据降序的 top n。你也可以提供key 参数来指定比较函数。
    • .first():获取RDD 中的第一个元素。
  5. 计数:

    • .count():返回RDD 的元素总数量(不考虑去重)

    • .countByValue():以字典的形式返回RDD 中,各元素出现的次数。

    • .histogram(buckets):计算分桶

      • 参数:

        • buckets:指定如何分桶。

          • 如果是一个序列,则它指定了桶的区间。如[1,10,20,50] 代表了区间[1,10) [10,20) [20,50](最后一个桶是闭区间)。该序列必须是排序好的,且不能包含重复数字,且至少包含两个数字。
          • 如果是一个数字,则指定了桶的数量。它会自动将数据划分到min~max 之间的、均匀分布的桶中。它必须大于等于1.
      • 返回值:一个元组 (桶区间序列,桶内元素个数序列)

      • 示例:

        1. rdd = sc.parallelize(range(51))
        2. rdd.histogram(2)
        3. # 结果为 ([0, 25, 50], [25, 26])
        4. rdd.histogram([0, 5, 25, 50])
        5. #结果为 ([0, 5, 25, 50], [5, 20, 26])
  6. .foreach(f):对当前RDD 的每个元素执行函数 f

    • 它与.map(f)不同。.map 是转换操作,而.foreach 是行动操作。
    • 通常.foreach 用于将RDD的数据以json 格式发送到网络服务器上,或者写入到数据库中。
  7. .foreachPartition(f):对当前RDD 的每个分区应用f

    示例:

    1. def f1(x):
    1. print(x)
    1. def f2(iterator):
    1. for x in iterator:
    1. print(x)
    1. rdd = sc.parallelize([1, 2, 3, 4, 5])
    1. rdd.foreach(f1)
    1. rdd.foreachPartition(f2)
  8. 统计方法:

    • .max(key=None):返回RDD 的最大值。

      • 参数:

        • key:对RDD 中的值进行映射,比较的是key(x) 之后的结果(但是返回的是x 而不是映射之后的结果)
    • .mean():返回RDD 的均值

    • .min(key=None):返回RDD 的最小值。

      • 参数:

        • key:对RDD 中的值进行映射,比较的是key(x) 之后的结果(但是返回的是x 而不是映射之后的结果)
    • .sampleStdev():计算样本标准差

    • .sampleVariance():计算样本方差

    • .stdev():计算标准差。它与样本标准差的区别在于:分母不同

    • .variance():计算方差。它与样本方差的区别在于:分母不同

    • .sum():计算总和

4.2 Pair RDD 行动操作

  1. Pair RDD 可以使用所有标准RDD 上的可用的行动操作

    • 由于Pair RDD 的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
  2. .countByKey():以字典的形式返回每个键的元素数量。

  3. .collectAsMap():以字典的形式返回所有的键值对。

  4. .lookup(key):以列表的形式返回指定键的所有的值。