Spark-IoTDB User Guide

Supported Versions

Supported versions of Spark and Scala are as follows:

| Spark Version  | Scala Version |
| -------------- | ------------- |
| 2.4.0 - latest | 2.11, 2.12    |

Precautions

  1. The current version of spark-iotdb-connector supports Scala 2.11 and 2.12, but not 2.13.
  2. spark-iotdb-connector can be used from Java, Scala, and PySpark.

Deployment

spark-iotdb-connector has two use cases: IDE development and spark-shell debugging.

IDE Development

For IDE development, simply add the following dependency to the pom.xml file:

  <dependency>
    <groupId>org.apache.iotdb</groupId>
    <!-- use spark-iotdb-connector_2.11 instead when building against Scala 2.11 -->
    <artifactId>spark-iotdb-connector_2.12.10</artifactId>
    <version>${iotdb.version}</version>
  </dependency>

spark-shell Debugging

To use spark-iotdb-connector in spark-shell, download the with-dependencies version of the jar package
from the official website, then copy it into the ${SPARK_HOME}/jars directory:

  cp spark-iotdb-connector_2.12.10-${iotdb.version}.jar $SPARK_HOME/jars/

Usage

Parameters

| Parameter    | Description                                                                              | Default Value | Scope       | Can be Empty |
| ------------ | ---------------------------------------------------------------------------------------- | ------------- | ----------- | ------------ |
| url          | The JDBC URL of IoTDB                                                                    | null          | read, write | false        |
| user         | The username of IoTDB                                                                    | root          | read, write | true         |
| password     | The password of IoTDB                                                                    | root          | read, write | true         |
| sql          | The SQL statement used for querying                                                      | null          | read        | true         |
| numPartition | The number of DataFrame partitions when reading, and the write concurrency when writing  | 1             | read, write | true         |
| lowerBound   | The start timestamp of the query (inclusive)                                             | 0             | read        | true         |
| upperBound   | The end timestamp of the query (inclusive)                                               | 0             | read        | true         |

Reading Data from IoTDB

Here is an example that demonstrates how to read data from IoTDB into a DataFrame:

  import org.apache.iotdb.spark.db._

  val df = spark.read.format("org.apache.iotdb.spark.db")
    .option("user", "root")
    .option("password", "root")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .option("sql", "select ** from root")  // query SQL
    .option("lowerBound", "0")             // lower timestamp bound
    .option("upperBound", "100000000")     // upper timestamp bound
    .option("numPartition", "5")           // number of partitions
    .load

  df.printSchema()
  df.show()
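
The result is an ordinary Spark DataFrame, so the usual DataFrame and Spark SQL operations apply to it. As a minimal sketch (the view name iotdb_table and the column path below are illustrative assumptions, not names fixed by the connector):

  // Register the query result as a temporary view and filter it with Spark SQL.
  // Column names that contain dots must be quoted with backticks.
  df.createOrReplaceTempView("iotdb_table")  // hypothetical view name
  val filtered = spark.sql(
    "SELECT Time, `root.ln.wf01.wt01.temperature` FROM iotdb_table " +
      "WHERE `root.ln.wf01.wt01.temperature` > 20")
  filtered.show()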

Writing Data to IoTDB

Here is an example that demonstrates how to write data to IoTDB:

  // Construct narrow-table data
  val df = spark.createDataFrame(List(
    (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
    (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

  val dfWithColumn = df.withColumnRenamed("_1", "Time")
    .withColumnRenamed("_2", "Device")
    .withColumnRenamed("_3", "s0")
    .withColumnRenamed("_4", "s1")
    .withColumnRenamed("_5", "s2")
    .withColumnRenamed("_6", "s3")
    .withColumnRenamed("_7", "s4")
    .withColumnRenamed("_8", "s5")

  // Write narrow-table data
  dfWithColumn
    .write
    .format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .save

  // Construct wide-table data (renamed from df/dfWithColumn to avoid redefinition)
  val wideDf = spark.createDataFrame(List(
    (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
    (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

  val wideDfWithColumn = wideDf.withColumnRenamed("_1", "Time")
    .withColumnRenamed("_2", "root.test.d0.s0")
    .withColumnRenamed("_3", "root.test.d0.s1")
    .withColumnRenamed("_4", "root.test.d0.s2")
    .withColumnRenamed("_5", "root.test.d0.s3")
    .withColumnRenamed("_6", "root.test.d0.s4")
    .withColumnRenamed("_7", "root.test.d0.s5")

  // Write wide-table data
  wideDfWithColumn.write.format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .option("numPartition", "10")
    .save
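
As a side note, the withColumnRenamed chains above can be written more compactly with Spark's toDF, which assigns all column names in a single call. A minimal sketch of the narrow-table construction (plain Spark; assumes spark.implicits._ is in scope, and narrowDf is an illustrative name):

  // Equivalent narrow-table construction: toDF names every column at once.
  import spark.implicits._

  val narrowDf = List(
    (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
    (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world"))
    .toDF("Time", "Device", "s0", "s1", "s2", "s3", "s4", "s5")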

Wide and Narrow Table Conversion

Here are examples of how to convert between wide and narrow tables:

  • From wide to narrow

    import org.apache.iotdb.spark.db._

    val wide_df = spark.read.format("org.apache.iotdb.spark.db")
      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
      .option("sql", "select * from root.** where time < 1100 and time > 1000")
      .load
    val narrow_df = Transformer.toNarrowForm(spark, wide_df)

  • From narrow to wide

    import org.apache.iotdb.spark.db._

    val wide_df = Transformer.toWideForm(spark, narrow_df)
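
Putting read, conversion, and write together, a round trip could look like the following sketch (same illustrative URL and query as above; whether a converted frame can be written back unchanged depends on its column types matching what the connector expects):

  import org.apache.iotdb.spark.db._

  // Read in the default wide form, convert to narrow form, and write it back.
  // The narrow layout already carries the Time and Device columns the writer needs.
  val wide = spark.read.format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .option("sql", "select ** from root")
    .load

  val narrow = Transformer.toNarrowForm(spark, wide)

  narrow.write.format("org.apache.iotdb.spark.db")
    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
    .save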

Wide and Narrow Tables

Take the TsFile structure as an example: the TsFile schema contains three measurements,
namely Status, Temperature, and Hardware. The basic information for each of these three measurements is as
follows:

| Name        | Type    | Encoding |
| ----------- | ------- | -------- |
| Status      | Boolean | PLAIN    |
| Temperature | Float   | RLE      |
| Hardware    | Text    | PLAIN    |

The existing data in the TsFile is as follows:

  • d1: root.ln.wf01.wt01
  • d2: root.ln.wf02.wt02

| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
| ---- | --------- | ---- | -------------- | ---- | ----------- | ---- | --------- |
| 1    | True      | 1    | 2.2            | 2    | "aaa"       | 1    | True      |
| 3    | True      | 2    | 2.2            | 4    | "bbb"       | 2    | False     |
| 5    | False     | 3    | 2.1            | 6    | "ccc"       | 4    | True      |

The wide (default) table form is as follows:

| Time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
| ---- | ----------------------------- | ------------------------ | -------------------------- | ----------------------------- | ------------------------ | -------------------------- |
| 1    | null                          | true                     | null                       | 2.2                           | true                     | null                       |
| 2    | null                          | false                    | aaa                        | 2.2                           | null                     | null                       |
| 3    | null                          | null                     | null                       | 2.1                           | true                     | null                       |
| 4    | null                          | true                     | bbb                        | null                          | null                     | null                       |
| 5    | null                          | null                     | null                       | null                          | false                    | null                       |
| 6    | null                          | null                     | ccc                        | null                          | null                     | null                       |

You can also use the narrow table format as shown below:

| Time | Device            | status | hardware | temperature |
| ---- | ----------------- | ------ | -------- | ----------- |
| 1    | root.ln.wf01.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf01.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf01.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf01.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |