3.4 从数据源创建

  1. 从数据源创建的接口是DataFrameReader

    1. reader = spark_session.read
  2. 另外,也可以不使用API ,直接将文件加载到DataFrame 并进行查询:

    1. df = spark_session.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

3.4.1 通用加载

  1. 设置数据格式:.format(source)

    • 返回self
    1. df = spark_session.read.format('json').load('python/test_support/sql/people.json')
  2. 设置数据schema.schema(schema)

    • 返回self
    • 某些数据源可以从输入数据中推断schema。一旦手动指定了schema,则不再需要推断。
  3. 加载:.load(path=None, format=None, schema=None, **options)

    • 参数:

      • path:一个字符串,或者字符串的列表。指出了文件的路径
      • format:指出了文件类型。默认为parquet(除非另有配置spark.sql.sources.default
      • schema:输入数据的schema,一个StructType 类型实例。
      • options:其他的参数
    • 返回值:一个DataFrame 实例

    • 示例:

      1. spark_session.read.format('json').load(['python/test_support/sql/people.json',
      2. 'python/test_support/sql/people1.json'])

3.4.2 专用加载

  1. .csv():加载csv 文件,返回一个DataFrame 实例

    1. .csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
  2. .jdbc():加载数据库中的表

    1. .jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
    • 参数:

      • url:一个JDBC URL,格式为:jdbc:subprotocol:subname
      • table:表名
      • column:列名。该列为整数列,用于分区。如果该参数被设置,那么numPartitions、lowerBound、upperBound 将用于分区从而生成where 表达式来拆分该列。
      • lowerBoundcolumn的最小值,用于决定分区的步长
      • upperBoundcolumn的最大值(不包含),用于决定分区的步长
      • numPartitions:分区的数量
      • predicates:一系列的表达式,用于where中。每一个表达式定义了DataFrame 的一个分区
      • properties:一个字典,用于定义JDBC 连接参数。通常至少为:{ 'user' : 'SYSTEM', 'password' : 'mypassword'}
    • 返回:一个DataFrame 实例
  3. .json():加载json 文件,返回一个DataFrame 实例

    1. .json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)

    示例:

    1. spark_session.read.json('python/test_support/sql/people.json')
    2. # 或者
    3. rdd = sc.textFile('python/test_support/sql/people.json')
    4. spark_session.read.json(rdd)
  4. .orc():加载ORC文件,返回一个DataFrame 实例

    1. .orc(path)

    示例:

    1. spark_session.read.orc('python/test_support/sql/orc_partitioned')
  5. .parquet():加载Parquet文件,返回一个DataFrame 实例

    .parquet(*paths)

    示例:

    1. spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
  6. .table(): 从table 中创建一个DataFrame

    1. .table(tableName)

    示例:

    1. df = spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
    2. df.createOrReplaceTempView('tmpTable')
    3. spark_session.read.table('tmpTable')
  7. .text():从文本中创建一个DataFrame

    1. .text(paths)

    它不同于.csv(),这里的DataFrame 只有一列,每行文本都是作为一个字符串。

    示例:

    1. spark_session.read.text('python/test_support/sql/text-test.txt').collect()
    2. #结果为:[Row(value=u'hello'), Row(value=u'this')]