9.8 其它

  1. array(*cols):创新一个新的array 列。

    • 参数:

      • cols:列名字符串列表,或者Column 列表。要求这些列具有同样的数据类型
    • 示例:

      1. df.select(array('age', 'age').alias("arr"))
      2. df.select(array([df.age, df.age]).alias("arr"))
  2. array_contains(col, value):创建一个新列,指示value是否在array 中(由col 给定)

    其中col 必须是array 类型。而value 是一个值,或者一个Column 或者列名。

    • 判断逻辑:

      • 如果arraynull,则返回null
      • 如果value 位于 array 中,则返回True
      • 如果value 不在 array 中,则返回False
    • 示例:

      1. df = spark_session.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
      2. df.select(array_contains(df.data, "a"))
  3. create_map(*cols):创建一个map 列。

    • 参数:

      • cols:列名字符串列表,或者Column 列表。这些列组成了键值对。如(key1,value1,key2,value2,...)
    • 示例:

      1. df.select(create_map('name', 'age').alias("map")).collect()
      2. #[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
  4. broadcast(df):标记df 这个Dataframe 足够小,从而应用于broadcast join

    • 参数:

      • df:一个 Dataframe 对象
  5. coalesce(*cols):返回第一个非null 的列组成的Column。如果都为null,则返回null

    • 参数:

      • cols:列名字符串列表,或者Column 列表。
  6. crc32(col):计算二进制列的CRC32 校验值。要求col 是二进制列。

  7. explode(col):将一个array 或者 map 列拆成多行。要求col 是一个array 或者map 列。

    示例:

    1. eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    2. eDF.select(explode(eDF.intlist).alias("anInt")).collect()
    3. # 结果为:[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
    4. eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
    5. #结果为:
    6. # +---+-----+
    7. # |key|value|
    8. # +---+-----+
    9. # | a| b|
    10. # +---+-----+
  8. posexplode(col): 对指定array 或者map 中的每个元素,依据每个位置返回新的一行。

    要求col 是一个array 或者map 列。

    示例:

    1. eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    2. eDF.select(posexplode(eDF.intlist)).collect()
    3. #结果为:[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
  9. expr(str):计算表达式。

    • 参数:

      • str:一个表达式。如length(name)
  10. from_json(col,schema,options={}):解析一个包含JSON 字符串的列。如果遇到无法解析的字符串,则返回null

    • 参数:

      • col:一个字符串列,字符串是json 格式
      • schema:一个StructType(表示解析一个元素),或者StructTypeArrayType(表示解析一组元素)
      • options:用于控制解析过程。
    • 示例:

      1. from pyspark.sql.types import *
      2. schema = StructType([StructField("a", IntegerType())])
      3. df = spark_session.createDataFrame([(1, '{"a": 1}')], ("key", "value"))
      4. df.select(from_json(df.value, schema).alias("json")).collect()
      5. #结果为:[Row(json=Row(a=1))]
  11. get_json_object(col,path):从json 字符串中提取指定的字段。如果json 字符串无效,则返回null.

    • 参数:

      • col:包含json 格式的字符串的列。
      • pathjson 的字段的路径。
    • 示例:

      1. data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
      2. df = spark_session.createDataFrame(data, ("key", "jstring"))
      3. df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"),
      4. get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
      5. # 结果为:[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
  12. greatest(*cols):返回指定的一堆列中的最大值。要求至少包含2列。

    它会跳过null 值。如果都是null 值,则返回null

  13. least(*cols):返回指定的一堆列中的最小值。要求至少包含2列。

    它会跳过null 值。如果都是null 值,则返回null

  14. json_tuple(col,*fields):从json 列中抽取字段组成新列(抽取n 个字段,则生成n 列)

    • 参数:

      • col:一个json 字符串列
      • fields:一组字符串,给出了json 中待抽取的字段
  15. lit(col):创建一个字面量值的列

  16. monotonically_increasing_id():创建一个单调递增的id 列(64位整数)。

    它可以确保结果是单调递增的,并且是unique的,但是不保证是连续的。

    它隐含两个假设:

    • 假设dataframe 分区数量少于1 billion
    • 假设每个分区的记录数量少于8 billion
  17. nanvl(col1,col2):如果col1 不是NaN,则返回col1;否则返回col2

    要求col1col2 都是浮点列(DoubleType 或者 FloatType

  18. size(col):计算array/map 列的长度(元素个数)。

  19. sort_array(col,asc=True): 对array 列中的array 进行排序(排序的方式是自然的顺序)

    • 参数:

      • col:一个字符串或者Column, 指定一个array
      • asc: 如果为True,则是升序;否则是降序
  20. spark_partition_id():返回一个partition ID

    该方法产生的结果依赖于数据划分和任务调度,因此是未确定结果的。

  21. struct(*cols):创建一个新的struct 列。

    • 参数:

      • cols:一个字符串列表(指定了列名),或者一个Column 列表
    • 示例:

      1. df.select(struct('age', 'name').alias("struct")).collect()
      2. # [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
  22. to_json(col,options={}):将包含 StructType 或者ArrytypeStructType 转换为json 字符串。如果遇到不支持的类型,则抛出异常。

    • 参数:

      • col:一个字符串或者Column,表示待转换的列
      • options:转换选项。它支持和json datasource 同样的选项
  23. udf(f=None,returnType=StringType):根据用户定义函数(UDF) 来创建一列。

    • 参数:

      • f:一个python 函数,它接受一个参数
      • returnType:一个pyspqrk.sql.types.DataType 类型,表示udf 的返回类型
    • 示例:

      1. from pyspark.sql.types import IntegerType
      2. slen = udf(lambda s: len(s), IntegerType())
      3. df.select(slen("name").alias("slen_name"))
  24. when(condition,value): 对一系列条件求值,返回其中匹配的哪个结果。

    如果Column.otherwise() 未被调用,则当未匹配时,返回None;如果Column.otherwise() 被调用,则当未匹配时,返回otherwise() 的结果。

    • 参数:

      • condition:一个布尔列
      • value:一个字面量值,或者一个Column
    • 示例:

      1. df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
      2. # [Row(age=3), Row(age=4)]