influxdb-client-scala

The reference Scala client for InfluxDB 2.x. It supports queries and writes through Pekko Streams. The client is cross-built against Scala 2.12 and 2.13.

Documentation

This section contains links to the client library documentation.

Queries

The QueryScalaApi is built on Pekko Streams and returns query results as a stream.

The following example demonstrates querying using the Flux language:

    package example

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.Sink

    import com.influxdb.client.scala.InfluxDBClientScalaFactory
    import com.influxdb.query.FluxRecord

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    object InfluxDB2ScalaExample {

      implicit val system: ActorSystem = ActorSystem("it-tests")

      def main(args: Array[String]): Unit = {

        val influxDBClient = InfluxDBClientScalaFactory
          .create("http://localhost:8086", "my-token".toCharArray, "my-org")

        val fluxQuery = ("from(bucket: \"my-bucket\")\n"
          + " |> range(start: -1d)"
          + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")

        // Result is returned as a stream
        val results = influxDBClient.getQueryScalaApi().query(fluxQuery)

        // Example of additional result stream processing on client side
        val sink = results
          // filter on client side using `filter` built-in operator
          .filter(it => "cpu0" == it.getValueByKey("cpu"))
          // take first 20 records
          .take(20)
          // print results
          .runWith(Sink.foreach[FluxRecord](it => println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

        // wait to finish
        Await.result(sink, Duration.Inf)

        influxDBClient.close()
        system.terminate()
      }
    }
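
The query text in the example above is built by concatenating strings with escaped quotes. As a readability alternative, a triple-quoted string with stripMargin can produce the same kind of query text. The following is a minimal sketch that uses only the Scala standard library; the FluxQueries object and the cpuUsageQuery helper are hypothetical names for illustration and are not part of the client.

```scala
// Hypothetical helper, not part of influxdb-client-scala.
object FluxQueries {

  // Builds the Flux query text. The first `|` on each continuation line marks
  // the left margin and is removed by stripMargin; the Flux pipe `|>` remains.
  // Arguments are inserted verbatim, so they should come from trusted input.
  def cpuUsageQuery(bucket: String, start: String): String =
    s"""from(bucket: "$bucket")
       |  |> range(start: $start)
       |  |> filter(fn: (r) => (r["_measurement"] == "cpu" and r["_field"] == "usage_system"))""".stripMargin
}
```

The resulting string could then be passed to query in place of fluxQuery, for example FluxQueries.cpuUsageQuery("my-bucket", "-1d").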

It is possible to parse a result line-by-line using the queryRaw method:

    package example

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.Sink

    import com.influxdb.client.scala.InfluxDBClientScalaFactory

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    object InfluxDB2ScalaExampleRaw {

      implicit val system: ActorSystem = ActorSystem("it-tests")

      def main(args: Array[String]): Unit = {

        val influxDBClient = InfluxDBClientScalaFactory
          .create("http://localhost:8086", "my-token".toCharArray, "my-org")

        val fluxQuery = ("from(bucket: \"my-bucket\")\n"
          + " |> range(start: -5m)"
          + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
          + " |> sample(n: 5, pos: 1)")

        // Result is returned as a stream
        val sink = influxDBClient.getQueryScalaApi().queryRaw(fluxQuery)
          // print results
          .runWith(Sink.foreach[String](it => println(s"Line: $it")))

        // wait to finish
        Await.result(sink, Duration.Inf)

        influxDBClient.close()
        system.terminate()
      }
    }
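
Each element emitted by queryRaw is one line of the server response. Assuming the response is in the annotated CSV format that InfluxDB 2.x uses for query results, a line is either an annotation row starting with "#" or a comma-separated row. The sketch below shows one possible way to handle a single line with the standard library only; RawLines and its methods are hypothetical names, and the naive split does not handle quoted cells that contain commas.

```scala
// Hypothetical helper, not part of influxdb-client-scala.
object RawLines {

  // True for annotation rows such as "#datatype,string,long,...".
  def isAnnotation(line: String): Boolean = line.startsWith("#")

  // Splits a row into cells; the -1 limit keeps trailing empty cells.
  // Naive: does not handle quoted cells that contain commas.
  def cells(line: String): Vector[String] = line.split(",", -1).toVector
}
```

In a stream, annotation rows could be dropped with the `filter` operator, for example .filter(line => !RawLines.isAnnotation(line)), before the remaining lines are split into cells.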

Advanced Usage

Client configuration file

A client can be configured via a configuration file. The file must be named influx2.properties and must be located in the root of the classpath.

The following options are supported:

| Property name          | Default  | Description                                                |
|------------------------|----------|------------------------------------------------------------|
| influx2.url            | -        | the URL to connect to InfluxDB                             |
| influx2.org            | -        | default destination organization for writes and queries   |
| influx2.bucket         | -        | default destination bucket for writes                      |
| influx2.token          | -        | the token to use for the authorization                     |
| influx2.logLevel       | NONE     | REST client verbosity level                                |
| influx2.readTimeout    | 10000 ms | read timeout                                               |
| influx2.writeTimeout   | 10000 ms | write timeout                                              |
| influx2.connectTimeout | 10000 ms | socket timeout                                             |
| influx2.precision      | NS       | default precision for unix timestamps in the line protocol |
| influx2.clientType     | -        | to customize the User-Agent HTTP header                    |

The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout properties accept ms, s and m as units. A value without a unit is interpreted as milliseconds.
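
To make the unit rule concrete, the following sketch converts a timeout value such as "5s" into milliseconds. It only illustrates the rule described above and is not the client's own parser; TimeoutUnits and toMillis are hypothetical names, and the accepted syntax (digits followed by an optional unit) is an assumption.

```scala
// Hypothetical helper illustrating the unit suffixes; not the client's own parser.
object TimeoutUnits {

  // Up to 12 digits followed by an optional unit: ms, s or m.
  private val TimeoutPattern = """(\d{1,12})\s*(ms|s|m)?""".r

  // Converts "5s", "250ms", "2m" or a bare "10000" to milliseconds.
  // Returns None when the value does not match the expected shape.
  def toMillis(value: String): Option[Long] = value.trim match {
    case TimeoutPattern(amount, unit) =>
      val factor = unit match {
        case "s" => 1000L
        case "m" => 60000L
        case _   => 1L // "ms" or no unit at all: milliseconds
      }
      Some(amount.toLong * factor)
    case _ => None
  }
}
```

Under these assumptions, TimeoutUnits.toMillis("5s") yields Some(5000) and a bare "10000" yields Some(10000).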

Configuration example

    influx2.url=http://localhost:8086
    influx2.org=my-org
    influx2.bucket=my-bucket
    influx2.token=my-token
    influx2.logLevel=BODY
    influx2.readTimeout=5s
    influx2.writeTimeout=10s
    influx2.connectTimeout=5s

and then:

    val influxDBClient = InfluxDBClientScalaFactory.create()

Client connection string

A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

    val influxDBClient = InfluxDBClientScalaFactory
      .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token)

The following options are supported:

| Property name  | Default  | Description                                                |
|----------------|----------|------------------------------------------------------------|
| org            | -        | default destination organization for writes and queries   |
| bucket         | -        | default destination bucket for writes                      |
| token          | -        | the token to use for the authorization                     |
| logLevel       | NONE     | REST client verbosity level                                |
| readTimeout    | 10000 ms | read timeout                                               |
| writeTimeout   | 10000 ms | write timeout                                              |
| connectTimeout | 10000 ms | socket timeout                                             |
| precision      | NS       | default precision for unix timestamps in the line protocol |
| clientType     | -        | to customize the User-Agent HTTP header                    |

The readTimeout, writeTimeout and connectTimeout parameters accept ms, s and m as units. A value without a unit is interpreted as milliseconds.
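
A connection string like the one used in this section can also be assembled from a list of options instead of being written by hand. The sketch below uses only the Java and Scala standard libraries; ConnectionStrings and build are hypothetical names, not part of the client, and the option names are taken from the table above.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of influxdb-client-scala.
object ConnectionStrings {

  // Appends the options as URL-encoded query parameters, in the given order.
  def build(baseUrl: String, options: Seq[(String, String)]): String =
    if (options.isEmpty) baseUrl
    else {
      val query = options
        .map { case (key, value) => s"${encode(key)}=${encode(value)}" }
        .mkString("&")
      s"$baseUrl?$query"
    }

  private def encode(text: String): String =
    URLEncoder.encode(text, StandardCharsets.UTF_8.name())
}
```

For example, ConnectionStrings.build("http://localhost:8086", Seq("readTimeout" -> "5000", "logLevel" -> "BASIC")) produces a string that could be passed as the first argument to InfluxDBClientScalaFactory.create together with the token.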

Gzip support

InfluxDBClientScala does not enable gzip compression for HTTP requests by default. To enable gzip and reduce the size of transferred data, call:

    influxDBClient.enableGzip()

Log HTTP Request and Response

Requests and responses can be logged by changing the LogLevel. The LogLevel values are NONE, BASIC, HEADERS and BODY. Note that the BODY LogLevel disables chunking while streaming and loads the whole response into memory.

    import com.influxdb.LogLevel
    influxDBClient.setLogLevel(LogLevel.HEADERS)

Check the server status

Server availability can be checked with the influxDBClient.ping() method.

Construct queries using the flux-dsl query builder

    package example

    import java.time.temporal.ChronoUnit

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.Sink

    import com.influxdb.client.scala.InfluxDBClientScalaFactory
    import com.influxdb.query.FluxRecord
    import com.influxdb.query.dsl.Flux
    import com.influxdb.query.dsl.functions.restriction.Restrictions

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    object InfluxDB2ScalaExampleDSL {

      implicit val system: ActorSystem = ActorSystem("it-tests")

      def main(args: Array[String]): Unit = {

        val influxDBClient = InfluxDBClientScalaFactory
          .create("http://localhost:8086", "my-token".toCharArray, "my-org")

        // Build the Flux query with the flux-dsl builder instead of a raw string
        val mem = Flux.from("my-bucket")
          .range(-30L, ChronoUnit.MINUTES)
          .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))

        // Result is returned as a stream
        val results = influxDBClient.getQueryScalaApi().query(mem.toString())

        // Example of additional result stream processing on client side
        val sink = results
          // filter on client side using `filter` built-in operator
          .filter(it => it.getValue.asInstanceOf[Double] > 55)
          // take first 20 records
          .take(20)
          // print results
          .runWith(Sink.foreach[FluxRecord](it => println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

        // wait to finish
        Await.result(sink, Duration.Inf)

        influxDBClient.close()
        system.terminate()
      }
    }

Version

Scala 2.12

The latest version as a Maven dependency:

    <dependency>
      <groupId>com.influxdb</groupId>
      <artifactId>influxdb-client-scala_2.12</artifactId>
      <version>7.2.0</version>
    </dependency>

Or, when using Gradle:

    dependencies {
      implementation "com.influxdb:influxdb-client-scala_2.12:7.2.0"
    }

Scala 2.13

The latest version as a Maven dependency:

    <dependency>
      <groupId>com.influxdb</groupId>
      <artifactId>influxdb-client-scala_2.13</artifactId>
      <version>7.2.0</version>
    </dependency>

Or, when using Gradle:

    dependencies {
      implementation "com.influxdb:influxdb-client-scala_2.13:7.2.0"
    }

Snapshot Repository

Snapshots are deployed to the OSS Snapshot repository.

Maven

    <repository>
      <id>ossrh</id>
      <name>OSS Snapshot repository</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>

Gradle

    repositories {
      maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
    }