influxdb-client-r

This repository contains the R client package for InfluxDB 2.0.

Features

The InfluxDB 2.0 client supports:

  • Querying data
  • Writing data
  • Getting status

Documentation

This section contains links to the client library documentation.

Installing

The package requires R >= 3.4.

Installing dependencies

    install.packages(c("httr", "bit64", "nanotime", "plyr"))

Installing influxdbclient package

The package is published on CRAN and can be installed with:

    install.packages("influxdbclient")

The latest development version can be installed with:

    # install.packages("remotes")
    remotes::install_github("influxdata/influxdb-client-r")

Usage

Client instantiation

    library(influxdbclient)
    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| url | InfluxDB instance URL | character | none |
| token | authentication token | character | none |
| org | organization name | character | none |

Hint: to avoid SSL certificate validation errors when accessing an InfluxDB instance over HTTPS, such as "SSL certificate problem: unable to get local issuer certificate", you can disable the validation with the following call before using any InfluxDBClient method. Warning: this disables peer certificate validation for the whole current R session.

    library(httr)
    httr::set_config(config(ssl_verifypeer = FALSE))

Querying data

Use the query method.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    data <- client$query('from(bucket: "my-bucket") |> range(start: -1h) |> drop(columns: ["_start", "_stop"])')
    data

A Flux query can yield multiple results in one response, and each result may contain multiple tables.
The return value is therefore a named list in which each element is a list of data frames representing one result. Each data frame represents a Flux table. You can list the results using the names function.

Quite often, though, there is just a single result, so by default query flattens the return value to a simple unnamed list of data frames. This behaviour is controlled by the flatSingleResult parameter. With flatSingleResult = FALSE, you can check that the return value contains one element named "_result" (the default result name when there is no explicit yield in the query) and use that name to retrieve it:

    > names(data)
    [1] "_result"
    > data[["_result"]]
    [[1]]
                           time       name region sensor_id altitude grounded temperature
    1 2021-06-09T09:52:41+00:00 airSensors  south   TLM0101      549    FALSE  71.7844100
    2 2021-06-09T09:52:51+00:00 airSensors  south   TLM0101      547    FALSE  71.7684399
    3 2021-06-09T09:53:01+00:00 airSensors  south   TLM0101      563     TRUE  71.7819928
    4 2021-06-09T09:53:11+00:00 airSensors  south   TLM0101      560     TRUE  71.7487767
    5 2021-06-09T09:53:21+00:00 airSensors  south   TLM0101      544    FALSE  71.7335579
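The nested shape of a non-flattened return value can be explored without a server. The following is a minimal sketch that builds a mock value by hand (the rows and the second sensor are made up for illustration) and navigates it with base R only; it does not call the client.

```r
# Mock of a non-flattened query() return value: a named list in which each
# element (one per Flux result) is a list of data frames (one per Flux table).
tables <- list(
  data.frame(time = as.POSIXct("2021-06-09 09:52:41", tz = "UTC"),
             sensor_id = "TLM0101", temperature = 71.78441,
             stringsAsFactors = FALSE),
  data.frame(time = as.POSIXct("2021-06-09 09:52:41", tz = "UTC"),
             sensor_id = "TLM0102", temperature = 72.01,
             stringsAsFactors = FALSE)
)
data <- list("_result" = tables)

result_names <- names(data)         # names of the results in the response
first_result <- data[["_result"]]   # list of data frames for that result
n_tables <- length(first_result)    # number of Flux tables in the result

# Combine all tables of one result into a single data frame. This works here
# because the mock tables share the same columns.
combined <- do.call(rbind, first_result)
```

Binding the tables together is a convenience for inspection only; tables with different column sets would need to be handled one by one.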

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| text | Flux query | character | none |
| POSIXctCol | Flux time to POSIXct column mapping | named list | c("_time"="time") |
| flatSingleResult | Whether to return simple list when response contains only one result | logical | TRUE |

Incoming type mapping

| Flux type | R type |
|---|---|
| string | character |
| int | integer64 |
| float | numeric |
| bool | logical |
| time | nanotime |

Using retrieved data as time series

Flux timestamps are parsed into the nanotime type (integer64 underneath), because R datetime types do not support nanosecond precision. However, nanotime is not a time-based object suitable for creating a time series. By default, query therefore coerces the _time column to a time column of POSIXct type (see the POSIXctCol parameter), with a possible loss of precision (which is unimportant in the context of R time series).
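The precision trade-off can be illustrated in base R. POSIXct stores time as a double, and a double cannot distinguish neighbouring nanosecond values at current epoch magnitudes. The sketch below uses a timestamp taken from the sample data; the variable names are illustrative only.

```r
# A nanosecond epoch timestamp (2021-06-09 09:52:41 UTC), held as a double.
ns <- 1623232361000000000

# Doubles have a 53-bit significand, so not every integer above 2^53
# (about 9e15) is representable: adding one nanosecond changes nothing.
lost <- (ns + 1 == ns)

# At second resolution the value fits comfortably in a POSIXct.
ts_posix <- as.POSIXct(ns / 1e9, origin = "1970-01-01", tz = "UTC")
formatted <- format(ts_posix, "%Y-%m-%d %H:%M:%S")

print(lost)       # TRUE
print(formatted)  # "2021-06-09 09:52:41"
```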

Select the data of interest from the result, for example:

    # from the first data frame, pick subset containing `time` and `_value` columns only
    df1 <- data[[1]][c("time", "_value")]

Then, a time series object can be created from the data frame, e.g. using the tsbox package:

    library(tsbox)
    ts1 <- ts_ts(ts_df(df1))

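A data frame, or a time series object created from it, can be used for decomposition, anomaly detection etc., for example:

    # assumes magrittr (for %>%) and a package providing autoplot() for stl objects (e.g. forecast) are attached
    df1$`_value` %>% ts(frequency = 168) %>% stl(s.window = 13) %>% autoplot()

or

    ts1 %>% ts(frequency = 168) %>% stl(s.window = 13) %>% autoplot()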

Querying metadata

For queries returning records without time info (listing buckets, tag values etc.), set POSIXctCol to NULL.

    buckets <- client$query('buckets()', POSIXctCol = NULL)

Writing data

Use the write method.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    data <- ...
    response <- client$write(data, bucket = "my-bucket", precision = "us",
                             measurementCol = "name",
                             tagCols = c("region", "sensor_id"),
                             fieldCols = c("altitude", "temperature"),
                             timeCol = "time")

The example is valid for a data.frame like the following:

    > print(data)
                           time       name region sensor_id altitude grounded temperature
    1 2021-06-09T09:52:41+00:00 airSensors  south   TLM0101      549    FALSE  71.7844100
    2 2021-06-09T09:52:51+00:00 airSensors  south   TLM0101      547    FALSE  71.7684399
    3 2021-06-09T09:53:01+00:00 airSensors  south   TLM0101      563     TRUE  71.7819928
    4 2021-06-09T09:53:11+00:00 airSensors  south   TLM0101      560     TRUE  71.7487767
    5 2021-06-09T09:53:21+00:00 airSensors  south   TLM0101      544    FALSE  71.7335579
    > str(data)
    'data.frame': 5 obs. of 7 variables:
     $ time       :integer64 1623232361000000000 1623232371000000000 1623232381000000000 1623232391000000000 1623232401000000000
     $ name       : chr "airSensors" "airSensors" "airSensors" "airSensors" ...
     $ region     : chr "south" "south" "south" "south" ...
     $ sensor_id  : chr "TLM0101" "TLM0101" "TLM0101" "TLM0101" ...
     $ altitude   :integer64 549 547 563 560 544
     $ grounded   : logi FALSE FALSE TRUE TRUE FALSE
     $ temperature: num 71.8 71.8 71.8 71.7 71.7
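A data frame of a similar shape can be put together in base R. The sketch below is an assumption-laden variant of the sample: it uses POSIXct for the time column and plain R integers for altitude instead of the 64-bit types shown above, and write_air_sensors is a hypothetical wrapper, not part of the package. The wrapper only forwards the arguments from the write example.

```r
# Two rows modelled on the sample data; POSIXct and integer are used here
# instead of nanotime and integer64 so that no extra packages are needed.
data <- data.frame(
  time = as.POSIXct(c("2021-06-09 09:52:41", "2021-06-09 09:52:51"), tz = "UTC"),
  name = "airSensors",
  region = "south",
  sensor_id = "TLM0101",
  altitude = c(549L, 547L),
  grounded = c(FALSE, FALSE),
  temperature = c(71.78441, 71.7684399),
  stringsAsFactors = FALSE
)

# Hypothetical helper that keeps the column mapping in one place.
# `client` is expected to be an InfluxDBClient instance.
write_air_sensors <- function(client, data, bucket = "my-bucket") {
  client$write(data, bucket = bucket, precision = "us",
               measurementCol = "name",
               tagCols = c("region", "sensor_id"),
               fieldCols = c("altitude", "temperature"),
               timeCol = "time")
}

str(data)
```

Calling write_air_sensors(client, data) would then perform the same write as the example, provided a client has been created as shown earlier.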

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| x | data | data.frame (or list of) | none |
| bucket | target bucket name | character | none |
| batchSize | batch size | numeric | 5000 |
| precision | timestamp precision | character (one of s, ms, us, ns) | "ns" |
| measurementCol | measurement column name | character | "_measurement" |
| tagCols | tags column names | character | NULL |
| fieldCols | fields column names | character | c("_field"="_value") |
| timeCol | time column name | character | "_time" |
| object | output object | character | NULL |

Supported time column value types: nanotime, POSIXct. To write data points without a timestamp, set timeCol to NULL. See Timestamp precision for details.

The response is either NULL on success, or an error otherwise.

Note: the default fieldCols value is suitable for writing back unpivoted data previously retrieved from InfluxDB. For usual tables ("pivoted" in the Flux world), fieldCols should be an unnamed character vector, e.g. c("humidity", "temperature", ...).

Outgoing type mapping

| R type | InfluxDB type |
|---|---|
| character | string |
| integer, integer64 | int |
| numeric | float |
| logical | bool |
| nanotime, POSIXct | time |
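To check in advance how the columns of a data frame would map, the table can be expressed as a small lookup. This is an illustrative sketch only: influx_type is a hypothetical helper, not a function of the package, and it simply mirrors the table by testing classes from the most specific to the most general.

```r
# Hypothetical lookup mirroring the mapping table. Specific classes
# (nanotime, POSIXct, integer64) are tested before generic numeric types.
influx_type <- function(x) {
  if (inherits(x, "nanotime") || inherits(x, "POSIXct")) return("time")
  if (inherits(x, "integer64") || is.integer(x)) return("int")
  if (is.character(x)) return("string")
  if (is.logical(x)) return("bool")
  if (is.numeric(x)) return("float")
  NA_character_  # anything else has no entry in the table
}

df <- data.frame(
  time = as.POSIXct("2021-06-09 09:52:41", tz = "UTC"),
  region = "south",
  altitude = 549L,
  grounded = FALSE,
  temperature = 71.78441,
  stringsAsFactors = FALSE
)

# One InfluxDB type name per column.
column_types <- vapply(df, influx_type, character(1))
print(column_types)
```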

Output preview

To preview how input data are serialized to InfluxDB line protocol, pass the name of the object that should receive the output as the object parameter value.
This turns write into a dry-run operation (nothing is sent to the database). The object is assigned in the calling environment.
This option is intended for debugging purposes.

    data <- ...
    response <- client$write(data, bucket = "my-bucket", precision = "us",
                             measurementCol = "name",
                             tagCols = c("region", "sensor_id"),
                             fieldCols = c("altitude", "temperature"),
                             timeCol = "time",
                             object = "lp")
    lp

Sample output:

    > print(lp)
    [[1]]
    [1] "airSensors,region=south,sensor_id=TLM0101 altitude=549i,temperature=71.7844100 1623232361000000"
    [2] "airSensors,region=south,sensor_id=TLM0101 altitude=547i,temperature=71.7684399 1623232371000000"
    [3] "airSensors,region=south,sensor_id=TLM0101 altitude=563i,temperature=71.7819928 1623232381000000"
    [4] "airSensors,region=south,sensor_id=TLM0101 altitude=560i,temperature=71.7487767 1623232391000000"
    [5] "airSensors,region=south,sensor_id=TLM0101 altitude=544i,temperature=71.7335579 1623232401000000"
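The structure of each line (measurement, comma-separated tags, fields, timestamp) can be reproduced with a few lines of base R. The sketch below is a simplified illustration of the format, not the package's serializer: to_line_protocol is a hypothetical function, it performs no escaping of spaces, commas or quotes, it handles only integer and double fields, and its number formatting may differ from the sample output above.

```r
# Simplified, vectorized line protocol formatter for microsecond precision.
to_line_protocol <- function(df, measurementCol, tagCols, fieldCols, timeCol) {
  fmt_field <- function(col) {
    v <- df[[col]]
    # Integer fields carry an "i" suffix; doubles are written as-is.
    paste0(col, "=", if (is.integer(v)) paste0(v, "i") else as.character(v))
  }
  tags <- do.call(paste, c(lapply(tagCols, function(col) paste0(col, "=", df[[col]])),
                           sep = ","))
  fields <- do.call(paste, c(lapply(fieldCols, fmt_field), sep = ","))
  # POSIXct holds seconds since the epoch; scale to microseconds.
  ts <- sprintf("%.0f", as.numeric(df[[timeCol]]) * 1e6)
  paste0(df[[measurementCol]], ",", tags, " ", fields, " ", ts)
}

df <- data.frame(
  time = as.POSIXct("2021-06-09 09:52:41", tz = "UTC"),
  name = "airSensors", region = "south", sensor_id = "TLM0101",
  altitude = 549L, temperature = 71.78441,
  stringsAsFactors = FALSE
)

lines <- to_line_protocol(df, "name", c("region", "sensor_id"),
                          c("altitude", "temperature"), "time")
print(lines)
# "airSensors,region=south,sensor_id=TLM0101 altitude=549i,temperature=71.78441 1623232361000000"
```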

Write retrying

By default, the client does not retry failed writes. To instantiate a client with retry support, pass an instance of RetryOptions, e.g.:

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org",
                                 retryOptions = RetryOptions$new(maxAttempts = 3))

For a retry strategy with default options, just pass TRUE as the retryOptions parameter value:

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org",
                                 retryOptions = TRUE)

Retryable InfluxDB write errors are those with status code 429 or 503. The retry strategy implements an exponential backoff algorithm, customizable with RetryOptions.
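For readers unfamiliar with exponential backoff, the general idea can be sketched in base R. This is not the client's implementation: with_retry, max_attempts, base_delay and the status element on the error condition are all hypothetical names chosen for the illustration, and only the retryable status codes come from the text above.

```r
# Retry fn() while it fails with a retryable status, doubling the delay
# after every failed attempt: base_delay, 2 * base_delay, 4 * base_delay, ...
with_retry <- function(fn, max_attempts = 3, base_delay = 1,
                       retryable = c(429L, 503L)) {
  for (attempt in seq_len(max_attempts)) {
    result <- tryCatch(fn(), error = function(e) e)
    if (!inherits(result, "error")) return(result)
    status <- result$status  # assumed: the condition carries a status code
    can_retry <- !is.null(status) && status %in% retryable
    if (!can_retry || attempt == max_attempts) stop(result)
    Sys.sleep(base_delay * 2^(attempt - 1))
  }
}

# Fake operation that fails twice with 503 and then succeeds.
calls <- 0
flaky_write <- function() {
  calls <<- calls + 1
  if (calls < 3) {
    stop(structure(
      list(message = "service unavailable", call = NULL, status = 503L),
      class = c("http_error", "error", "condition")
    ))
  }
  "ok"
}

outcome <- with_retry(flaky_write, max_attempts = 3, base_delay = 0.01)
print(outcome)  # "ok"
```

Errors without a retryable status are re-raised immediately, so a failure such as an invalid request is not repeated.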

Getting status

Health status

Use the health method to get the health status.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    check <- client$health()

The response is a list with health information elements (name, status, version, commit), or an error.

Readiness status

Use the ready method to get the readiness status.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    check <- client$ready()

The response is a list with status elements (status, started, up), or an error.
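A script will usually want to turn these responses into a yes/no answer. The sketch below uses mock lists shaped like the responses described here, with made-up values. The helpers is_healthy and is_ready are hypothetical, and the status strings "pass" and "ready" are the ones reported by the InfluxDB 2.x /health and /ready endpoints; that the client passes them through unchanged is an assumption.

```r
# Hypothetical helpers: anything that is not a list with the expected
# status string (including an error object) counts as "not OK".
is_healthy <- function(check) is.list(check) && identical(check$status, "pass")
is_ready <- function(check) is.list(check) && identical(check$status, "ready")

# Mock responses with made-up values; no server is contacted.
health <- list(name = "influxdb", status = "pass",
               version = "2.0.0", commit = "abcdef")
ready <- list(status = "ready", started = "2021-06-09T09:00:00Z", up = "1h0m0s")

healthy <- is_healthy(health)
ready_ok <- is_ready(ready)
```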

Advanced

The client automatically follows HTTP redirects.

To use the client with a proxy, use set_config to configure the proxy:

    library(httr)
    httr::set_config(
      use_proxy(url = "my-proxy", port = 8080, username = "user", password = "password")
    )

Known Issues

Contributing

Contributions are most welcome. The fastest way to get something fixed is to open a PR.

License

The client is available as open source under the terms of the MIT License.