Get started with InfluxDB tasks
This page documents an earlier version of InfluxDB. InfluxDB v2.7 is the latest stable version.
An InfluxDB task is a scheduled Flux script that takes a stream of input data, modifies or analyzes it in some way, then writes the modified data back to InfluxDB or performs other actions.
This article walks through writing a basic InfluxDB task that downsamples data and stores it in a new bucket.
Components of a task
Every InfluxDB task needs the following components. Their form and order can vary, but they are all essential parts of a task.
Define task options
Task options define the schedule, name, and other information about the task. The following example shows how to set task options in a Flux script:
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
See Task configuration options for detailed information about each option.
The InfluxDB UI provides a form for defining task options.
Retrieve and filter data
A minimal Flux script uses the following functions to retrieve a specified amount of data from a data source and then filter the data based on time or column values:
- from(): queries data from InfluxDB.
- range(): defines the time range to return data from.
- filter(): filters data based on column values.
The following sample Flux retrieves data from an InfluxDB bucket and then filters by the _measurement and host columns:
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
To retrieve data from other sources, see Flux input functions.
Use task options in your Flux script
InfluxDB stores options in a task option record that you can reference in your Flux script. The following sample Flux uses the time range -task.every:
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
task.every is dot notation that references the every property of the task option record. every is defined as 1h, therefore -task.every equates to -1h.
Using task options to define values in your Flux script can make reusing your task easier.
Process or transform your data
Tasks run scripts automatically at regular intervals. Scripts process or transform data in some way, for example, by downsampling it, detecting anomalies, or sending notifications.
Consider a task that runs hourly and downsamples data by calculating the average of set intervals. It uses aggregateWindow() to group points into 5-minute (5m) windows and calculate the average of each window with mean().
The following sample code shows the Flux script with task options:
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
|> aggregateWindow(every: 5m, fn: mean)
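The windowed mean that aggregateWindow() computes can be sketched in plain Python. The sample points below are hypothetical, standing in for raw "mem" data within one task interval:

```python
from collections import defaultdict

# Hypothetical points: (seconds since the interval start, value).
points = [(0, 10.0), (60, 20.0), (330, 30.0), (360, 50.0), (900, 40.0)]

WINDOW = 300  # 5 minutes, like aggregateWindow(every: 5m, ...)

# Group each point into its 5-minute window, keyed by window start.
windows = defaultdict(list)
for t, v in points:
    windows[t - t % WINDOW].append(v)

# Average each window, like fn: mean.
means = {start: sum(vs) / len(vs) for start, vs in sorted(windows.items())}
print(means)  # {0: 15.0, 300: 40.0, 900: 40.0}
```

Each output key is a window start; each value is the mean of the points that fell into that window, which is the shape of the downsampled series the task writes out.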
Use offset to account for latent data
Use the offset task option to account for potentially latent data (like data from edge devices). A task that runs at one-hour intervals (every: 1h) with an offset of five minutes (offset: 5m) executes 5 minutes after the hour, but queries data from the original one-hour interval.
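The relationship between the schedule, the offset, and the queried interval can be sketched in Python (the interval boundary time is a hypothetical example):

```python
from datetime import datetime, timedelta

every = timedelta(hours=1)     # every: 1h
offset = timedelta(minutes=5)  # offset: 5m

# The interval boundary the run covers (hypothetical example time).
interval_start = datetime(2023, 1, 1, 0, 0)
interval_end = interval_start + every

# The task executes `offset` after the interval boundary...
execution_time = interval_end + offset

# ...but still queries the original one-hour interval.
print("runs at:", execution_time)                      # 01:05
print("queries:", interval_start, "to", interval_end)  # 00:00 to 01:00
```

Delaying execution without shifting the queried range gives latent points an extra five minutes to arrive before the task reads them.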
See Common tasks for examples of tasks commonly used with InfluxDB.
Define a destination
In most cases, you’ll want to send and store data after the task has transformed it. The destination could be a separate InfluxDB measurement or bucket.
The example below uses to() to write the transformed data back to another InfluxDB bucket:
// ...
|> to(bucket: "example-downsampled", org: "my-org")
To write data into InfluxDB, to() requires the following columns:
- _time
- _measurement
- _field
- _value
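To see why these columns are required, note that to() maps them onto InfluxDB line protocol. The sketch below is a simplified illustration (tags omitted, hypothetical row values):

```python
# A hypothetical downsampled row with the columns to() requires.
row = {
    "_time": 1672531500000000000,  # nanosecond epoch timestamp
    "_measurement": "mem",
    "_field": "used_percent",
    "_value": 64.5,
}

# Simplified line protocol shape: <measurement> <field>=<value> <timestamp>
line = f'{row["_measurement"]} {row["_field"]}={row["_value"]} {row["_time"]}'
print(line)  # mem used_percent=64.5 1672531500000000000
```

If any of the four columns is missing from the piped-forward data, to() has no value for the corresponding part of the line and cannot write the point.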
To write data to other destinations, see Flux output functions.
Full example Flux task script
The following sample Flux combines all the components described in this guide:
// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
// Data source
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
// Data processing
|> aggregateWindow(every: 5m, fn: mean)
// Data destination
|> to(bucket: "example-downsampled")