Spark-IoTDB 用户手册

版本支持

支持的 Spark 与 Scala 版本如下:

Spark 版本Scala 版本
2.4.0-latest2.11, 2.12

注意事项

  1. 当前版本的 spark-iotdb-connector 支持 2.112.12 两个版本的 Scala,暂不支持 2.13 版本。
  2. spark-iotdb-connector 支持在 Java、Scala 版本的 Spark 与 PySpark 中使用。

部署

spark-iotdb-connector 总共有两个使用场景,分别为 IDE 开发与 spark-shell 调试。

IDE 开发

在 IDE 开发时,只需要在 pom.xml 文件中添加以下依赖即可:

  1. <dependency>
  2. <groupId>org.apache.iotdb</groupId>
  3. <!-- spark-iotdb-connector_2.11 or spark-iotdb-connector_2.13 -->
  4. <artifactId>spark-iotdb-connector_2.12.10</artifactId>
  5. <version>${iotdb.version}</version>
  6. </dependency>

spark-shell 调试

如果需要在 spark-shell 中使用 spark-iotdb-connetcor,需要先在官网下载 with-dependencies 版本的 jar 包。然后再将 Jar 包拷贝到 ${SPARK_HOME}/jars 目录中即可。
执行以下命令即可:

  1. cp spark-iotdb-connector_2.12.10-${iotdb.version}.jar $SPARK_HOME/jars/

使用

参数

参数描述默认值使用范围能否为空
url指定 IoTDB 的 JDBC 的 URLnullread、writefalse
userIoTDB 的用户名rootread、writetrue
passwordIoTDB 的密码rootread、writetrue
sql用于指定查询的 SQL 语句nullreadtrue
numPartition在 read 中用于指定 DataFrame 的分区数,在 write 中用于设置写入并发数1read、writetrue
lowerBound查询的起始时间戳(包含)0readtrue
upperBound查询的结束时间戳(包含)0readtrue

从 IoTDB 读取数据

以下是一个示例,演示如何从 IoTDB 中读取数据成为 DataFrame。

  1. import org.apache.iotdb.spark.db._
  2. val df = spark.read.format("org.apache.iotdb.spark.db")
  3. .option("user", "root")
  4. .option("password", "root")
  5. .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  6. .option("sql", "select ** from root") // 查询 SQL
  7. .option("lowerBound", "0") // 时间戳下界
  8. .option("upperBound", "100000000") // 时间戳上界
  9. .option("numPartition", "5") // 分区数
  10. .load
  11. df.printSchema()
  12. df.show()

将数据写入 IoTDB

以下是一个示例,演示如何将数据写入 IoTDB。

  1. // 构造窄表数据
  2. val df = spark.createDataFrame(List(
  3. (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
  4. (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
  5. val dfWithColumn = df.withColumnRenamed("_1", "Time")
  6. .withColumnRenamed("_2", "Device")
  7. .withColumnRenamed("_3", "s0")
  8. .withColumnRenamed("_4", "s1")
  9. .withColumnRenamed("_5", "s2")
  10. .withColumnRenamed("_6", "s3")
  11. .withColumnRenamed("_7", "s4")
  12. .withColumnRenamed("_8", "s5")
  13. // 写入窄表数据
  14. dfWithColumn
  15. .write
  16. .format("org.apache.iotdb.spark.db")
  17. .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  18. .save
  19. // 构造宽表数据
  20. val df = spark.createDataFrame(List(
  21. (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
  22. (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
  23. val dfWithColumn = df.withColumnRenamed("_1", "Time")
  24. .withColumnRenamed("_2", "root.test.d0.s0")
  25. .withColumnRenamed("_3", "root.test.d0.s1")
  26. .withColumnRenamed("_4", "root.test.d0.s2")
  27. .withColumnRenamed("_5", "root.test.d0.s3")
  28. .withColumnRenamed("_6", "root.test.d0.s4")
  29. .withColumnRenamed("_7", "root.test.d0.s5")
  30. // 写入宽表数据
  31. dfWithColumn.write.format("org.apache.iotdb.spark.db")
  32. .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  33. .option("numPartition", "10")
  34. .save

宽表与窄表转换

以下是如何转换宽表与窄表的示例:

  • 从宽到窄
  1. import org.apache.iotdb.spark.db._
  2. val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.** where time < 1100 and time > 1000").load
  3. val narrow_df = Transformer.toNarrowForm(spark, wide_df)
  • 从窄到宽
  1. import org.apache.iotdb.spark.db._
  2. val wide_df = Transformer.toWideForm(spark, narrow_df)

宽表与窄表

以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:

名称类型编码
状态BooleanPLAIN
温度FloatRLE
硬件TextPLAIN

TsFile 中的现有数据如下:

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02
timed1.statustimed1.temperaturetimed2.hardwaretimed2.status
1True12.22“aaa”1True
3True22.24“bbb”2False
5False32.16“ccc”4True

宽(默认)表形式如下:

Timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

你还可以使用窄表形式,如下所示:

TimeDevicestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull