5.2 方法

5.2.1 转换操作

  1. 聚合操作:

    • .agg(*exprs):在整个DataFrame 开展聚合操作(是df.groupBy.agg() 的快捷方式)

      示例:

      1. df.agg({"age": "max"}).collect() #在 agg 列上聚合
      2. # 结果为:[Row(max(age)=5)]
      3. # 另一种方式:
      4. from pyspark.sql import functions as F
      5. df.agg(F.max(df.age)).collect()
    • .filter(condition):对行进行过滤。

      • 它是where() 的别名

      • 参数:

        • condition:一个types.BooleanTypeColumn,或者一个字符串形式的SQL 的表达式
      • 示例:

        1. df.filter(df.age > 3).collect()
        2. df.filter("age > 3").collect()
        3. df.where("age = 2").collect()
  2. 分组:

    • .cube(*cols):根据当前DataFrame 的指定列,创建一个多维的cube,从而方便我们之后的聚合过程。

      • 参数:

        • cols:指定的列名或者Column的列表
      • 返回值:一个GroupedData 对象
  • .groupBy(*cols):通过指定的列来将DataFrame 分组,从而方便我们之后的聚合过程。

    • 参数:

      • cols:指定的列名或者Column的列表
    • 返回值:一个GroupedData 对象

    • 它是groupby的别名

  • .rollup(*cols):创建一个多维的rollup,从而方便我们之后的聚合过程。

    • 参数:

      • cols:指定的列名或者Column的列表
    • 返回值:一个GroupedData 对象
  1. 排序:

    • .orderBy(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列排序

      • 参数:

        • cols:一个列名或者Column 的列表,指定了排序列

        • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

          • 如果是列表,则必须和cols 长度相同
    • .sort(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列排序

      • 参数:

        • cols:一个列名或者Column 的列表,指定了排序列

        • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

          • 如果是列表,则必须和cols 长度相同
      • 示例:

        ​x

        1. from pyspark.sql.functions import *
        2. df.sort(df.age.desc())
        3. df.sort("age", ascending=False)
        4. df.sort(asc("age"))
        5. df.orderBy(df.age.desc())
        6. df.orderBy("age", ascending=False)
        7. df.orderBy(asc("age"))
    • .sortWithinPartitions(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列在每个分区进行排序

      • 参数:

        • cols:一个列名或者Column 的列表,指定了排序列

        • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

          • 如果是列表,则必须和cols 长度相同
  2. 调整分区:

    • .coalesce(numPartitions):返回一个新的DataFrame,拥有指定的numPartitions 分区。

      • 只能缩小分区数量,而无法扩张分区数量。如果numPartitions 比当前的分区数量大,则新的DataFrame 的分区数与旧DataFrame 相同

      • 它的效果是:不会混洗数据

      • 参数:

        • numPartitions:目标分区数量
    • .repartition(numPartitions, *cols):返回一个新的DataFrame,拥有指定的numPartitions 分区。

      • 结果DataFrame 是通过hash 来分区
      • 它可以增加分区数量,也可以缩小分区数量
  3. 集合操作:

    • .crossJoin(other):返回一个新的DataFrame,它是输入的两个DataFrame 的笛卡儿积

      可以理解为 [row1,row2],其中 row1 来自于第一个DataFramerow2 来自于第二个DataFrame

      • 参数:

        • other:另一个DataFrame 对象
    • .intersect(other):返回两个DataFrame 的行的交集

      • 参数:

        • other:另一个DataFrame 对象
    • .join(other,on=None,how=None):返回两个DataFramejoin

      • 参数:

        • other:另一个DataFrame 对象

        • on:指定了在哪些列上执行对齐。可以为字符串或者Column(指定单个列)、也可以为字符串列表或者Column 列表(指定多个列)

          注意:要求两个DataFrame 都存在这些列

        • how:指定join 的方式,默认为'inner'。可以为: innercrossouterfullfull_outerleftleft_outerrightright_outerleft_semileft_anti

    • .subtract(other):返回一个新的DataFrame,它的行由位于self 中、但是不在other 中的Row 组成。

      • 参数:

        • other:另一个DataFrame 对象
    • .union(other): 返回两个DataFrame的行的并集(它并不会去重)

      • 它是unionAll 的别名

      • 参数:

        • other:另一个DataFrame 对象
  4. 统计:

    • .crosstab(col1, col2):统计两列的成对频率。要求每一列的distinct 值数量少于 5.2 方法 - 图1 个。最多返回 5.2 方法 - 图2 对频率。

      • 它是DataFrameStatFunctions.crosstab() 的别名

      • 结果的第一列的列名为,col1_col2,值就是第一列的元素值。后面的列的列名就是第二列元素值,值就是对应的频率。

      • 参数:

        • col1,col2:列名字符串(或者Column
      • 示例:

        1. df =pd.DataFrame({'a':[1,3,5],'b':[2,4,6]})
        2. s_df = spark_session.createDataFrame(df)
        3. s_df.crosstab('a','b').collect()
        4. #结果: [Row(a_b='5', 2=0, 4=0, 6=1), Row(a_b='1', 2=1, 4=0, 6=0), Row(a_b='3', 2=0, 4=1, 6=0)]
    • .describe(*cols):计算指定的数值列、字符串列的统计值。

      • 统计结果包括:count、mean、stddev、min、max

      • 该函数仅仅用于探索数据规律

      • 参数:

        • cols:列名或者多个列名字符串(或者Column)。如果未传入任何列名,则计算所有的数值列、字符串列
    • .freqItems(cols,support=None):寻找指定列中频繁出现的值(可能有误报)

      • 它是DataFrameStatFunctions.freqItems() 的别名

      • 参数:

        • cols:字符串的列表或者元组,指定了待考察的列
        • support:指定所谓的频繁的标准(默认是 1%)。该数值必须大于 5.2 方法 - 图3
  5. 移除数据:

    • .distinct():返回一个新的DataFrame,它保留了旧DataFrame 中的distinct 行。

      即:根据行来去重

    • .drop(*cols):返回一个新的DataFrame,它剔除了旧DataFrame 中的指定列。

      • 参数:

        • cols:列名字符串(或者Column)。如果它在旧DataFrame 中不存在,也不做任何操作(也不报错)
    • .dropDuplicates(subset=None):返回一个新的DataFrame,它剔除了旧DataFrame 中的重复行。

      它与.distinct() 区别在于:它仅仅考虑指定的列来判断是否重复行。

      • 参数:

        • subset:列名集合(或者Column的集合)。如果为None,则考虑所有的列。
      • .drop_duplicates.dropDuplicates 的别名
    • .dropna(how='any', thresh=None, subset=None):返回一个新的DataFrame,它剔除了旧DataFrame 中的null行。

      • 它是DataFrameNaFunctions.drop() 的别名

      • 参数:

        • how:指定如何判断null 行的标准。'all':所有字段都是na,则是空行;'any':任何字段存在na,则是空行。
        • thresh:一个整数。当一行中,非null 的字段数量小于thresh 时,认为是空行。如果该参数设置,则不考虑how
        • subset:列名集合,给出了要考察的列。如果为None,则考察所有列。
    • .limit(num):返回一个新的DataFrame,它只有旧DataFrame 中的num行。
  6. 采样、拆分:

    • .randomSplit(weights, seed=None):返回一组新的DataFrame,它是旧DataFrame 的随机拆分

      • 参数:

        • weights:一个double的列表。它给出了每个结果DataFrame 的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0
        • seed:随机数种子
      • 示例:

        1. splits = df.randomSplit([1.0, 2.0], 24)
        2. splits[0].count()
    • .sample(withReplacement, fraction, seed=None):返回一个新的DataFrame,它是旧DataFrame 的采样

      • 参数:

        • withReplacement:如果为True,则可以重复采样;否则是无放回采样

        • fractions:新的DataFrame 的期望大小(占旧DataFrame的比例)。spark 并不保证结果刚好满足这个比例(只是一个期望值)

          • 如果withReplacement=True:则表示每个元素期望被选择的次数
          • 如果withReplacement=False:则表示每个元素期望被选择的概率
        • seed:随机数生成器的种子
    • .sampleBy(col, fractions, seed=None):返回一个新的DataFrame,它是旧DataFrame 的采样

      它执行的是无放回的分层采样。分层由col 列指定。

      • 参数:

        • col:列名或者Column,它给出了分层的依据
        • fractions:一个字典,给出了每个分层抽样的比例。如果某层未指定,则其比例视作 0
      • 示例:

        1. sampled = df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
        2. # df['key'] 这一列作为分层依据,0 抽取 10%, 1 抽取 20%
  7. 替换:

    • .replace(to_replace, value=None, subset=None):返回一组新的DataFrame,它是旧DataFrame 的数值替代结果

      • 它是DataFrameNaFunctions.replace() 的别名

      • 当替换时,value 将被类型转换到目标列

      • 参数:

        • to_replace:可以为布尔、整数、浮点数、字符串、列表、字典,给出了被替代的值。

          • 如果是字典,则给出了每一列要被替代的值
        • value:一个整数、浮点数、字符串、列表。给出了替代值。

        • subset:列名的列表。指定要执行替代的列。

    • .fillna(value, subset=None):返回一个新的DataFrame,它替换了旧DataFrame 中的null值。

      • 它是DataFrameNaFunctions.fill()的别名

      • 参数:

        • value:一个整数、浮点数、字符串、或者字典,用于替换null 值。如果是个字典,则忽略subset,字典的键就是列名,指定了该列的null值被替换的值。
        • subset:列名集合,给出了要被替换的列
  8. 选取数据:

    • .select(*cols):执行一个表达式,将其结果返回为一个DataFrame

      • 参数:

        • cols:一个列名的列表,或者Column 表达式。如果列名为*,则扩张到所有的列名
      • 示例:

        1. df.select('*')
        2. df.select('name', 'age')
        3. df.select(df.name, (df.age + 10).alias('age'))
    • .selectExpr(*expr):执行一个SQL 表达式,将其结果返回为一个DataFrame

      • 参数:

        • expr:一组SQL 的字符串描述
      • 示例:

        1. df.selectExpr("age * 2", "abs(age)")
    • .toDF(*cols):选取指定的列组成一个新的DataFrame

      • 参数:

        • cols:列名字符串的列表
    • .toJSON(use_unicode=True):返回一个新的DataFrame,它将旧的DataFrame 转换为RDD(元素为字符串),其中每一行转换为json 字符串。
  9. 列操作:

    • .withColumn(colName, col):返回一个新的DataFrame,它将旧的DataFrame 增加一列(或者替换现有的列)

      • 参数:

        • colName:一个列名,表示新增的列(如果是已有的列名,则是替换的列)
        • col:一个Column 表达式,表示新的列
      • 示例:

        1. df.withColumn('age2', df.age + 2)
    • .withColumnRenamed(existing, new):返回一个新的DataFrame,它将旧的DataFrame 的列重命名

      • 参数:

        • existing:一个字符串,表示现有的列的列名
        • col:一个字符串,表示新的列名

5.2.2 行动操作

  1. 查看数据:

    • .collect():以Row 的列表的形式返回所有的数据

    • .first():返回第一行(一个Row对象)

    • .head(n=None):返回前面的n

      • 参数:

        • n:返回行的数量。默认为1
      • 返回值:

        • 如果返回1行,则是一个Row 对象
        • 如果返回多行,则是一个Row 的列表
    • .show(n=20, truncate=True):在终端中打印前 n 行。

      • 它并不返回结果,而是print 结果

      • 参数:

        • n:打印的行数
        • truncate:如果为True,则超过20个字符的字符串被截断。如果为一个数字,则长度超过它的字符串将被截断。
    • .take(num):以Row 的列表的形式返回开始的num 行数据。

      • 参数:

        • num:返回行的数量
    • .toLocalIterator():返回一个迭代器,对它迭代的结果就是DataFrame的每一行数据(Row 对象)
  2. 统计:

    • .corr(col1, col2, method=None):计算两列的相关系数,返回一个浮点数。当前仅支持皮尔逊相关系数

      • DataFrame.corr()DataFrameStatFunctions.corr()的别名

      • 参数:

        • col,col2:为列的名字字符串(或者Column)。
        • method:当前只支持'pearson'
    • .cov(col1,col2):计算两列的协方差。

      • DataFrame.cov()DataFrameStatFunctions.cov()的别名

      • 参数:

        • col,col2:为列的名字字符串(或者Column
    • .count():返回当前DataFrame 有多少行
  3. 遍历:

    • .foreach(f):对DataFrame 中的每一行应用f

      • 它是df.rdd.foreach() 的快捷方式
    • .foreachPartition(f):对DataFrame 的每个分区应用f

      • 它是df.rdd.foreachPartition() 的快捷方式

      • 示例:

        1. def f(person):
        2. print(person.name)
        3. df.foreach(f)
        4. def f(people):
        5. for person in people:
        6. print(person.name)
        7. df.foreachPartition(f)
    • .toPandas():将DataFrame 作为pandas.DataFrame 返回

      • 只有当数据较小,可以在驱动器程序中放得下时,才可以用该方法