5.2 方法
5.2.1 转换操作
聚合操作:
.agg(*exprs)
:在整个DataFrame
开展聚合操作(是df.groupBy.agg()
的快捷方式)示例:
df.agg({"age": "max"}).collect() #在 agg 列上聚合
# 结果为:[Row(max(age)=5)]
# 另一种方式:
from pyspark.sql import functions as F
df.agg(F.max(df.age)).collect()
.filter(condition)
:对行进行过滤。它是
where()
的别名参数:
condition
:一个types.BooleanType
的Column
,或者一个字符串形式的SQL
的表达式
示例:
df.filter(df.age > 3).collect()
df.filter("age > 3").collect()
df.where("age = 2").collect()
分组:
.cube(*cols)
:根据当前DataFrame
的指定列,创建一个多维的cube
,从而方便我们之后的聚合过程。参数:
cols
:指定的列名或者Column
的列表
- 返回值:一个
GroupedData
对象
.groupBy(*cols)
:通过指定的列来将DataFrame
分组,从而方便我们之后的聚合过程。参数:
cols
:指定的列名或者Column
的列表
返回值:一个
GroupedData
对象它是
groupby
的别名
.rollup(*cols)
:创建一个多维的rollup
,从而方便我们之后的聚合过程。参数:
cols
:指定的列名或者Column
的列表
- 返回值:一个
GroupedData
对象
排序:
.orderBy(*cols, **kwargs)
:返回一个新的DataFrame
,它根据旧的DataFrame
指定列排序参数:
cols
:一个列名或者Column
的列表,指定了排序列ascending
:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序- 如果是列表,则必须和
cols
长度相同
- 如果是列表,则必须和
.sort(*cols, **kwargs)
:返回一个新的DataFrame
,它根据旧的DataFrame
指定列排序参数:
cols
:一个列名或者Column
的列表,指定了排序列ascending
:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序- 如果是列表,则必须和
cols
长度相同
- 如果是列表,则必须和
示例:
x
from pyspark.sql.functions import *
df.sort(df.age.desc())
df.sort("age", ascending=False)
df.sort(asc("age"))
df.orderBy(df.age.desc())
df.orderBy("age", ascending=False)
df.orderBy(asc("age"))
.sortWithinPartitions(*cols, **kwargs)
:返回一个新的DataFrame
,它根据旧的DataFrame
指定列在每个分区进行排序参数:
cols
:一个列名或者Column
的列表,指定了排序列ascending
:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序- 如果是列表,则必须和
cols
长度相同
- 如果是列表,则必须和
调整分区:
.coalesce(numPartitions)
:返回一个新的DataFrame
,拥有指定的numPartitions
分区。只能缩小分区数量,而无法扩张分区数量。如果
numPartitions
比当前的分区数量大,则新的DataFrame
的分区数与旧DataFrame
相同它的效果是:不会混洗数据
参数:
numPartitions
:目标分区数量
.repartition(numPartitions, *cols)
:返回一个新的DataFrame
,拥有指定的numPartitions
分区。- 结果
DataFrame
是通过hash
来分区 - 它可以增加分区数量,也可以缩小分区数量
- 结果
集合操作:
.crossJoin(other)
:返回一个新的DataFrame
,它是输入的两个DataFrame
的笛卡儿积可以理解为
[row1,row2]
,其中row1
来自于第一个DataFrame
,row2
来自于第二个DataFrame
参数:
other
:另一个DataFrame
对象
.intersect(other)
:返回两个DataFrame
的行的交集参数:
other
:另一个DataFrame
对象
.join(other,on=None,how=None)
:返回两个DataFrame
的join
参数:
other
:另一个DataFrame
对象on
:指定了在哪些列上执行对齐。可以为字符串或者Column
(指定单个列)、也可以为字符串列表或者Column
列表(指定多个列)注意:要求两个
DataFrame
都存在这些列how
:指定join
的方式,默认为'inner'
。可以为:inner
、cross
、outer
、full
、full_outer
、left
、left_outer
、right
、right_outer
、left_semi
、left_anti
.subtract(other)
:返回一个新的DataFrame
,它的行由位于self
中、但是不在other
中的Row
组成。参数:
other
:另一个DataFrame
对象
.union(other)
: 返回两个DataFrame
的行的并集(它并不会去重)它是
unionAll
的别名参数:
other
:另一个DataFrame
对象
统计:
.crosstab(col1, col2)
:统计两列的成对频率。要求每一列的distinct
值数量少于 个。最多返回 对频率。它是
DataFrameStatFunctions.crosstab()
的别名结果的第一列的列名为,
col1_col2
,值就是第一列的元素值。后面的列的列名就是第二列元素值,值就是对应的频率。参数:
col1,col2
:列名字符串(或者Column
)
示例:
df =pd.DataFrame({'a':[1,3,5],'b':[2,4,6]})
s_df = spark_session.createDataFrame(df)
s_df.crosstab('a','b').collect()
#结果: [Row(a_b='5', 2=0, 4=0, 6=1), Row(a_b='1', 2=1, 4=0, 6=0), Row(a_b='3', 2=0, 4=1, 6=0)]
.describe(*cols)
:计算指定的数值列、字符串列的统计值。统计结果包括:
count、mean、stddev、min、max
该函数仅仅用于探索数据规律
参数:
cols
:列名或者多个列名字符串(或者Column
)。如果未传入任何列名,则计算所有的数值列、字符串列
.freqItems(cols,support=None)
:寻找指定列中频繁出现的值(可能有误报)它是
DataFrameStatFunctions.freqItems()
的别名参数:
cols
:字符串的列表或者元组,指定了待考察的列support
:指定所谓的频繁的标准(默认是 1%)。该数值必须大于
移除数据:
.distinct()
:返回一个新的DataFrame
,它保留了旧DataFrame
中的distinct
行。即:根据行来去重
.drop(*cols)
:返回一个新的DataFrame
,它剔除了旧DataFrame
中的指定列。参数:
cols
:列名字符串(或者Column
)。如果它在旧DataFrame
中不存在,也不做任何操作(也不报错)
.dropDuplicates(subset=None)
:返回一个新的DataFrame
,它剔除了旧DataFrame
中的重复行。它与
.distinct()
区别在于:它仅仅考虑指定的列来判断是否重复行。参数:
subset
:列名集合(或者Column
的集合)。如果为None
,则考虑所有的列。
.drop_duplicates
是.dropDuplicates
的别名
.dropna(how='any', thresh=None, subset=None)
:返回一个新的DataFrame
,它剔除了旧DataFrame
中的null
行。它是
DataFrameNaFunctions.drop()
的别名参数:
how
:指定如何判断null
行的标准。'all'
:所有字段都是na
,则是空行;'any'
:任何字段存在na
,则是空行。thresh
:一个整数。当一行中,非null
的字段数量小于thresh
时,认为是空行。如果该参数设置,则不考虑how
subset
:列名集合,给出了要考察的列。如果为None
,则考察所有列。
.limit(num)
:返回一个新的DataFrame
,它只有旧DataFrame
中的num
行。
采样、拆分:
.randomSplit(weights, seed=None)
:返回一组新的DataFrame
,它是旧DataFrame
的随机拆分参数:
weights
:一个double
的列表。它给出了每个结果DataFrame
的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0seed
:随机数种子
示例:
splits = df.randomSplit([1.0, 2.0], 24)
splits[0].count()
.sample(withReplacement, fraction, seed=None)
:返回一个新的DataFrame
,它是旧DataFrame
的采样参数:
withReplacement
:如果为True
,则可以重复采样;否则是无放回采样fractions
:新的DataFrame
的期望大小(占旧DataFrame
的比例)。spark
并不保证结果刚好满足这个比例(只是一个期望值)- 如果
withReplacement=True
:则表示每个元素期望被选择的次数 - 如果
withReplacement=False
:则表示每个元素期望被选择的概率
- 如果
seed
:随机数生成器的种子
.sampleBy(col, fractions, seed=None)
:返回一个新的DataFrame
,它是旧DataFrame
的采样它执行的是无放回的分层采样。分层由
col
列指定。参数:
col
:列名或者Column
,它给出了分层的依据fractions
:一个字典,给出了每个分层抽样的比例。如果某层未指定,则其比例视作 0
示例:
sampled = df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
# df['key'] 这一列作为分层依据,0 抽取 10%, 1 抽取 20%
替换:
.replace(to_replace, value=None, subset=None)
:返回一组新的DataFrame
,它是旧DataFrame
的数值替代结果它是
DataFrameNaFunctions.replace()
的别名当替换时,
value
将被类型转换到目标列参数:
to_replace
:可以为布尔、整数、浮点数、字符串、列表、字典,给出了被替代的值。- 如果是字典,则给出了每一列要被替代的值
value
:一个整数、浮点数、字符串、列表。给出了替代值。subset
:列名的列表。指定要执行替代的列。
.fillna(value, subset=None)
:返回一个新的DataFrame
,它替换了旧DataFrame
中的null
值。它是
DataFrameNaFunctions.fill()
的别名参数:
value
:一个整数、浮点数、字符串、或者字典,用于替换null
值。如果是个字典,则忽略subset
,字典的键就是列名,指定了该列的null
值被替换的值。subset
:列名集合,给出了要被替换的列
选取数据:
.select(*cols)
:执行一个表达式,将其结果返回为一个DataFrame
参数:
cols
:一个列名的列表,或者Column
表达式。如果列名为*
,则扩张到所有的列名
示例:
df.select('*')
df.select('name', 'age')
df.select(df.name, (df.age + 10).alias('age'))
.selectExpr(*expr)
:执行一个SQL
表达式,将其结果返回为一个DataFrame
参数:
expr
:一组SQL
的字符串描述
示例:
df.selectExpr("age * 2", "abs(age)")
.toDF(*cols)
:选取指定的列组成一个新的DataFrame
参数:
cols
:列名字符串的列表
.toJSON(use_unicode=True)
:返回一个新的DataFrame
,它将旧的DataFrame
转换为RDD
(元素为字符串),其中每一行转换为json
字符串。
列操作:
.withColumn(colName, col)
:返回一个新的DataFrame
,它将旧的DataFrame
增加一列(或者替换现有的列)参数:
colName
:一个列名,表示新增的列(如果是已有的列名,则是替换的列)col
:一个Column
表达式,表示新的列
示例:
df.withColumn('age2', df.age + 2)
.withColumnRenamed(existing, new)
:返回一个新的DataFrame
,它将旧的DataFrame
的列重命名参数:
existing
:一个字符串,表示现有的列的列名col
:一个字符串,表示新的列名
5.2.2 行动操作
查看数据:
.collect()
:以Row
的列表的形式返回所有的数据.first()
:返回第一行(一个Row
对象).head(n=None)
:返回前面的n
行参数:
n
:返回行的数量。默认为1
返回值:
- 如果返回1行,则是一个
Row
对象 - 如果返回多行,则是一个
Row
的列表
- 如果返回1行,则是一个
.show(n=20, truncate=True)
:在终端中打印前n
行。它并不返回结果,而是
print
结果参数:
n
:打印的行数truncate
:如果为True
,则超过20个字符的字符串被截断。如果为一个数字,则长度超过它的字符串将被截断。
.take(num)
:以Row
的列表的形式返回开始的num
行数据。参数:
num
:返回行的数量
.toLocalIterator()
:返回一个迭代器,对它迭代的结果就是DataFrame
的每一行数据(Row
对象)
统计:
.corr(col1, col2, method=None)
:计算两列的相关系数,返回一个浮点数。当前仅支持皮尔逊相关系数DataFrame.corr()
是DataFrameStatFunctions.corr()
的别名参数:
col,col2
:为列的名字字符串(或者Column
)。method
:当前只支持'pearson'
.cov(col1,col2)
:计算两列的协方差。DataFrame.cov()
是DataFrameStatFunctions.cov()
的别名参数:
col,col2
:为列的名字字符串(或者Column
)
.count()
:返回当前DataFrame
有多少行
遍历:
.foreach(f)
:对DataFrame
中的每一行应用f
- 它是
df.rdd.foreach()
的快捷方式
- 它是
.foreachPartition(f)
:对DataFrame
的每个分区应用f
它是
df.rdd.foreachPartition()
的快捷方式示例:
def f(person):
print(person.name)
df.foreach(f)
def f(people):
for person in people:
print(person.name)
df.foreachPartition(f)
.toPandas()
:将DataFrame
作为pandas.DataFrame
返回- 只有当数据较小,可以在驱动器程序中放得下时,才可以用该方法