3.4 从数据源创建
从数据源创建的接口是
DataFrameReader
:reader = spark_session.read
另外,也可以不使用
API
,直接将文件加载到DataFrame
并进行查询:df = spark_session.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
3.4.1 通用加载
设置数据格式:
.format(source)
。- 返回
self
df = spark_session.read.format('json').load('python/test_support/sql/people.json')
- 返回
设置数据
schema
:.schema(schema)
。- 返回
self
- 某些数据源可以从输入数据中推断
schema
。一旦手动指定了schema
,则不再需要推断。
- 返回
加载:
.load(path=None, format=None, schema=None, **options)
参数:
path
:一个字符串,或者字符串的列表。指出了文件的路径format
:指出了文件类型。默认为parquet
(除非另有配置spark.sql.sources.default
)schema
:输入数据的schema
,一个StructType
类型实例。options
:其他的参数
返回值:一个
DataFrame
实例示例:
spark_session.read.format('json').load(['python/test_support/sql/people.json',
'python/test_support/sql/people1.json'])
3.4.2 专用加载
.csv()
:加载csv
文件,返回一个DataFrame
实例.csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
.jdbc()
:加载数据库中的表.jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
参数:
url
:一个JDBC URL
,格式为:jdbc:subprotocol:subname
table
:表名column
:列名。该列为整数列,用于分区。如果该参数被设置,那么numPartitions、lowerBound、upperBound
将用于分区从而生成where
表达式来拆分该列。lowerBound
:column
的最小值,用于决定分区的步长upperBound
:column
的最大值(不包含),用于决定分区的步长numPartitions
:分区的数量predicates
:一系列的表达式,用于where
中。每一个表达式定义了DataFrame
的一个分区properties
:一个字典,用于定义JDBC
连接参数。通常至少为:{ 'user' : 'SYSTEM', 'password' : 'mypassword'}
- 返回:一个
DataFrame
实例
.json()
:加载json
文件,返回一个DataFrame
实例.json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
示例:
spark_session.read.json('python/test_support/sql/people.json')
# 或者
rdd = sc.textFile('python/test_support/sql/people.json')
spark_session.read.json(rdd)
.orc()
:加载ORC
文件,返回一个DataFrame
实例.orc(path)
示例:
spark_session.read.orc('python/test_support/sql/orc_partitioned')
.parquet()
:加载Parquet
文件,返回一个DataFrame
实例.parquet(*paths)
示例:
spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
.table()
: 从table
中创建一个DataFrame
.table(tableName)
示例:
df = spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
df.createOrReplaceTempView('tmpTable')
spark_session.read.table('tmpTable')
.text()
:从文本中创建一个DataFrame
.text(paths)
它不同于
.csv()
,这里的DataFrame
只有一列,每行文本都是作为一个字符串。示例:
spark_session.read.text('python/test_support/sql/text-test.txt').collect()
#结果为:[Row(value=u'hello'), Row(value=u'this')]