influxdb-client-kotlin

Kotlin - 图1

KDoc

The reference Kotlin client that allows query and write for the InfluxDB 2.x by Kotlin Channel coroutines.

Documentation

Kotlin - 图3

This section contains links to the client library documentation.

Features

Kotlin - 图4

Queries

Kotlin - 图5

The QueryKotlinApi supports asynchronous queries by Kotlin Channel coroutines.

The following example demonstrates querying using the Flux language:

  1. package example
  2. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  3. import kotlinx.coroutines.channels.consumeEach
  4. import kotlinx.coroutines.channels.filter
  5. import kotlinx.coroutines.channels.take
  6. import kotlinx.coroutines.runBlocking
  7. fun main(args: Array<String>) = runBlocking {
  8. val influxDBClient = InfluxDBClientKotlinFactory
  9. .create("http://localhost:8086", "my-token".toCharArray(), "my-org")
  10. val fluxQuery = ("from(bucket: \"my-bucket\")\n"
  11. + " |> range(start: -1d)"
  12. + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")
  13. //Result is returned as a stream
  14. val results = influxDBClient.getQueryKotlinApi().query(fluxQuery)
  15. //Example of additional result stream processing on client side
  16. results
  17. //filter on client side using `filter` built-in operator
  18. .filter { "cpu0" == it.getValueByKey("cpu") }
  19. //take first 20 records
  20. .take(20)
  21. //print results
  22. .consumeEach { println("Measurement: ${it.measurement}, value: ${it.value}") }
  23. influxDBClient.close()
  24. }

It is possible to parse a result line-by-line using the queryRaw method:

  1. package example
  2. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  3. import kotlinx.coroutines.channels.consumeEach
  4. import kotlinx.coroutines.runBlocking
  5. fun main(args: Array<String>) = runBlocking {
  6. val influxDBClient = InfluxDBClientKotlinFactory
  7. .create("http://localhost:8086", "my-token".toCharArray(), "my-org")
  8. val fluxQuery = ("from(bucket: \"my-bucket\")\n"
  9. + " |> range(start: -5m)"
  10. + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
  11. + " |> sample(n: 5, pos: 1)")
  12. //Result is returned as a stream
  13. val results = influxDBClient.getQueryKotlinApi().queryRaw(fluxQuery)
  14. //print results
  15. results.consumeEach { println("Line: $it") }
  16. influxDBClient.close()
  17. }

Writes

Kotlin - 图6

The WriteKotlinApi supports ingest data by:

  • DataPoint
  • LineProtocol
  • Data class
  • List of above items

The following example shows how to use various type of data:

  1. package example
  2. import com.influxdb.annotations.Column
  3. import com.influxdb.annotations.Measurement
  4. import com.influxdb.client.domain.WritePrecision
  5. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  6. import com.influxdb.client.write.Point
  7. import kotlinx.coroutines.flow.collect
  8. import kotlinx.coroutines.flow.consumeAsFlow
  9. import kotlinx.coroutines.runBlocking
  10. import java.time.Instant
  11. fun main() = runBlocking {
  12. val org = "my-org"
  13. val bucket = "my-bucket"
  14. //
  15. // Initialize client
  16. //
  17. val client = InfluxDBClientKotlinFactory
  18. .create("http://localhost:8086", "my-token".toCharArray(), org, bucket)
  19. val writeApi = client.getWriteKotlinApi()
  20. //
  21. // Write by Data Point
  22. //
  23. val point = Point.measurement("temperature")
  24. .addTag("location", "west")
  25. .addField("value", 55.0)
  26. .time(Instant.now().toEpochMilli(), WritePrecision.MS)
  27. writeApi.writePoint(point)
  28. //
  29. // Write by LineProtocol
  30. //
  31. writeApi.writeRecord("temperature,location=north value=60.0", WritePrecision.NS)
  32. //
  33. // Write by DataClass
  34. //
  35. val temperature = Temperature("south", 62.0, Instant.now())
  36. writeApi.writeMeasurement(temperature, WritePrecision.NS)
  37. //
  38. // Query results
  39. //
  40. val fluxQuery =
  41. """from(bucket: "$bucket") |> range(start: 0) |> filter(fn: (r) => (r["_measurement"] == "temperature"))"""
  42. client
  43. .getQueryKotlinApi()
  44. .query(fluxQuery)
  45. .consumeAsFlow()
  46. .collect { println("Measurement: ${it.measurement}, value: ${it.value}") }
  47. client.close()
  48. }
  49. @Measurement(name = "temperature")
  50. data class Temperature(
  51. @Column(tag = true) val location: String,
  52. @Column val value: Double,
  53. @Column(timestamp = true) val time: Instant
  54. )

Advanced Usage

Kotlin - 图7

Client configuration file

Kotlin - 图8

A client can be configured via configuration file. The configuration file has to be named as influx2.properties and has to be in root of classpath.

The following options are supported:

Property namedefaultdescription
influx2.url-the url to connect to InfluxDB
influx2.org-default destination organization for writes and queries
influx2.bucket-default destination bucket for writes
influx2.token-the token to use for the authorization
influx2.logLevelNONErest client verbosity level
influx2.readTimeout10000 msread timeout
influx2.writeTimeout10000 mswrite timeout
influx2.connectTimeout10000 mssocket timeout
influx2.precisionNSdefault precision for unix timestamps in the line protocol
influx2.clientType-to customize the User-Agent HTTP header

The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout supports ms, s and m as unit. Default is milliseconds.

Configuration example

Kotlin - 图9

  1. influx2.url=http://localhost:8086
  2. influx2.org=my-org
  3. influx2.bucket=my-bucket
  4. influx2.token=my-token
  5. influx2.logLevel=BODY
  6. influx2.readTimeout=5s
  7. influx2.writeTimeout=10s
  8. influx2.connectTimeout=5s

and then:

  1. val influxDBClient = InfluxDBClientKotlinFactory.create();

Client connection string

Kotlin - 图10

A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

  1. val influxDBClient = InfluxDBClientKotlinFactory
  2. .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token)

The following options are supported:

Property namedefaultdescription
org-default destination organization for writes and queries
bucket-default destination bucket for writes
token-the token to use for the authorization
logLevelNONErest client verbosity level
readTimeout10000 msread timeout
writeTimeout10000 mswrite timeout
connectTimeout10000 mssocket timeout
precisionNSdefault precision for unix timestamps in the line protocol
clientType-to customize the User-Agent HTTP header

The readTimeout, writeTimeout and connectTimeout supports ms, s and m as unit. Default is milliseconds.

Gzip support

Kotlin - 图11

InfluxDBClientKotlin does not enable gzip compress for http requests by default. If you want to enable gzip to reduce transfer data’s size, you can call:

  1. influxDBClient.enableGzip();

Log HTTP Request and Response

Kotlin - 图12

The Requests and Responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADER, BODY. Note that applying the BODY LogLevel will disable chunking while streaming and will load the whole response into memory.

  1. influxDBClient.setLogLevel(LogLevel.HEADERS)

Check the server status

Kotlin - 图13

Server availability can be checked using the influxDBClient.ping() endpoint.

Construct queries using the flux-dsl query builder

Kotlin - 图14

  1. package example
  2. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  3. import com.influxdb.query.dsl.Flux
  4. import com.influxdb.query.dsl.functions.restriction.Restrictions
  5. import kotlinx.coroutines.channels.consumeEach
  6. import kotlinx.coroutines.channels.filter
  7. import kotlinx.coroutines.channels.take
  8. import kotlinx.coroutines.runBlocking
  9. import java.time.temporal.ChronoUnit
  10. fun main(args: Array<String>) = runBlocking {
  11. val influxDBClient = InfluxDBClientKotlinFactory
  12. .create("http://localhost:8086", "my-token".toCharArray(), "my-org")
  13. val mem = Flux.from("my-bucket")
  14. .range(-30L, ChronoUnit.MINUTES)
  15. .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))
  16. //Result is returned as a stream
  17. val results = influxDBClient.getQueryKotlinApi().query(mem.toString())
  18. //Example of additional result stream processing on client side
  19. results
  20. //filter on client side using `filter` built-in operator
  21. .filter { (it.value as Double) > 55 }
  22. // take first 20 records
  23. .take(20)
  24. //print results
  25. .consumeEach { println("Measurement: ${it.measurement}, value: ${it.value}") }
  26. influxDBClient.close()
  27. }

Version

Kotlin - 图15

The latest version for Maven dependency:

  1. <dependency>
  2. <groupId>com.influxdb</groupId>
  3. <artifactId>influxdb-client-kotlin</artifactId>
  4. <version>7.2.0</version>
  5. </dependency>

Or when using with Gradle:

  1. dependencies {
  2. implementation "com.influxdb:influxdb-client-kotlin:7.2.0"
  3. }

Snapshot Repository

Kotlin - 图16

The snapshots are deployed into OSS Snapshot repository.

Maven

Kotlin - 图17

  1. <repository>
  2. <id>ossrh</id>
  3. <name>OSS Snapshot repository</name>
  4. <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
  5. <releases>
  6. <enabled>false</enabled>
  7. </releases>
  8. <snapshots>
  9. <enabled>true</enabled>
  10. </snapshots>
  11. </repository>

Gradle

Kotlin - 图18

  1. repositories {
  2. maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
  3. }