Get started processing data


Now that you know the basics of querying data from InfluxDB, let's go beyond a basic query and begin to process the queried data. "Processing" data can mean transforming, aggregating, downsampling, or alerting on data. This tutorial covers the following data processing use cases:

  • Remap or assign values in your data
  • Group data
  • Aggregate or select specific data
  • Pivot data into a relational schema
  • Downsample data
  • Automate processing with InfluxDB tasks

Most data processing operations require manually editing Flux queries. If you’re using the InfluxDB Data Explorer, switch to the Script Editor instead of using the Query Builder.

Remap or assign values in your data

Use the map() function to iterate over each row in your data and update the values in that row. map() is one of the most useful functions in Flux and will help you accomplish many of the data processing operations you need to perform.

Learn more about how map() works

map() takes a single parameter, fn, which accepts an anonymous function that reads each row as a record named r. In the r record, each key-value pair represents a column and its value. For example:

```flux
r = {
    _time: 2020-01-01T00:00:00Z,
    _measurement: "home",
    room: "Kitchen",
    _field: "temp",
    _value: 21.0,
}
```

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2020-01-01T00:00:00Z | home | Kitchen | temp | 21.0 |

The fn function modifies the r record in any way you need and returns a new record for the row. For example, using the record above:

```flux
(r) => ({_time: r._time, _field: "temp_F", _value: (r._value * 1.8) + 32.0})
// Returns: {_time: 2020-01-01T00:00:00Z, _field: "temp_F", _value: 69.8}
```

| _time | _field | _value |
| --- | --- | --- |
| 2020-01-01T00:00:00Z | temp_F | 69.8 |

Notice that some of the columns were dropped from the original row record. This is because the fn function explicitly mapped the _time, _field, and _value columns. To retain existing columns and only update or add specific columns, use the with operator to extend your row record. For example, using the record above:

```flux
(r) => ({r with _value: (r._value * 1.8) + 32.0, degrees: "F"})
// Returns:
// {
//     _time: 2020-01-01T00:00:00Z,
//     _measurement: "home",
//     room: "Kitchen",
//     _field: "temp",
//     _value: 69.8,
//     degrees: "F",
// }
```

| _time | _measurement | room | _field | _value | degrees |
| --- | --- | --- | --- | --- | --- |
| 2020-01-01T00:00:00Z | home | Kitchen | temp | 69.8 | F |
For example, the following query uses map() to convert relative humidity values from percentages to decimals:

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "hum")
    |> map(fn: (r) => ({r with _value: r._value / 100.0}))
```

Map examples

Perform mathematical operations

map() lets you perform mathematical operations on your data. For example, using the data written in "Get started writing to InfluxDB":

  1. Query the temp field to return room temperatures in °C.
  2. Use map() to iterate over each row and convert the °C temperatures in the _value column to °F using the equation: °F = (°C * 1.8) + 32.0.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> map(fn: (r) => ({r with _value: (r._value * 1.8) + 32.0}))
```

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 73.03999999999999 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 72.86 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 72.32 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 72.86 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 73.94 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 73.58000000000001 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 72.86 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 72.14 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 72.14 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 72.32 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 72.68 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 73.03999999999999 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 72.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 71.96000000000001 |

Conditionally assign a state

Within a map() function, you can use conditional expressions (if/then/else) to conditionally assign values. For example, using the data written in “Get started writing to InfluxDB”:

  1. Query the co field to return carbon monoxide parts per million (ppm) readings in each room.

  2. Use map() to iterate over each row, evaluate the value in the _value column, and then conditionally assign a state:

    • If the carbon monoxide is less than 10 ppm, assign the state: ok.
    • Otherwise, assign the state: warning.

    Store the state in a state column.

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co")
    |> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
```
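Conditional expressions can also chain with else if to assign more than two states. As a sketch, the following hypothetical variation adds a critical state for readings of 20 ppm or more (the 20 ppm threshold is an assumption for illustration, not part of this tutorial's data model):

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co")
    |> map(
        fn: (r) =>
            ({r with state:
                    // Assign one of three states based on the CO reading
                    if r._value < 10 then
                        "ok"
                    else if r._value < 20 then
                        "warning"
                    else
                        "critical",
            }),
    )
```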

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | co | 1 |
| 2022-01-01T15:00:00Z | home | Kitchen | co | 3 |
| 2022-01-01T16:00:00Z | home | Kitchen | co | 7 |
| 2022-01-01T17:00:00Z | home | Kitchen | co | 9 |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | co | 1 |
| 2022-01-01T15:00:00Z | home | Living Room | co | 1 |
| 2022-01-01T16:00:00Z | home | Living Room | co | 4 |
| 2022-01-01T17:00:00Z | home | Living Room | co | 5 |
| 2022-01-01T18:00:00Z | home | Living Room | co | 9 |
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value | state |
| --- | --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | co | 1 | ok |
| 2022-01-01T15:00:00Z | home | Kitchen | co | 3 | ok |
| 2022-01-01T16:00:00Z | home | Kitchen | co | 7 | ok |
| 2022-01-01T17:00:00Z | home | Kitchen | co | 9 | ok |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning |

| _time | _measurement | room | _field | _value | state |
| --- | --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | co | 1 | ok |
| 2022-01-01T15:00:00Z | home | Living Room | co | 1 | ok |
| 2022-01-01T16:00:00Z | home | Living Room | co | 4 | ok |
| 2022-01-01T17:00:00Z | home | Living Room | co | 5 | ok |
| 2022-01-01T18:00:00Z | home | Living Room | co | 9 | ok |
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning |

Alert on data

map() lets you execute more complex operations on a per row basis. Using a Flux block ({}) in the fn function, you can create scoped variables and execute other functions within the context of each row. For example, you can send a message to Slack.

For this example to actually send messages to Slack, you need to set up a Slack app that can send and receive messages.

For example, using the data written in “Get started writing to InfluxDB”:

  1. Import the slack package.

  2. Query the co field to return carbon monoxide parts per million (ppm) readings in each room.

  3. Use map() to iterate over each row, evaluate the value in the _value column, and then conditionally assign a state:

    • If the carbon monoxide is less than 10 ppm, assign the state: ok.
    • Otherwise, assign the state: warning.

    Store the state in a state column.

  4. Use filter() to return only rows with warning in the state column.

  5. Use map() to iterate over each row. In your fn function, use a Flux block ({}) to:

    1. Create a responseCode variable that uses slack.message() to send a message to Slack using data from the input row. slack.message() returns the response code of the Slack API request as an integer.
    2. Use a return statement to return a new row record. The new row should extend the input row with a new column, sent, with a boolean value determined by the responseCode variable.

map() sends a message to Slack for each row piped forward into the function.

```flux
import "slack"

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co")
    |> map(fn: (r) => ({r with state: if r._value < 10 then "ok" else "warning"}))
    |> filter(fn: (r) => r.state == "warning")
    |> map(
        fn: (r) => {
            responseCode =
                slack.message(
                    token: "mYSlacK70k3n",
                    color: "#ff0000",
                    channel: "#alerts",
                    text: "Carbon monoxide is at dangerous levels in the ${r.room}: ${r._value} ppm.",
                )

            return {r with sent: responseCode == 200}
        },
    )
```

The following input represents the data filtered by the warning state.

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value | state |
| --- | --- | --- | --- | --- | --- |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning |

| _time | _measurement | room | _field | _value | state |
| --- | --- | --- | --- | --- | --- |
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning |

The output includes a sent column indicating whether the message was sent.

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value | state | sent |
| --- | --- | --- | --- | --- | --- | --- |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 | warning | true |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 | warning | true |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 | warning | true |

| _time | _measurement | room | _field | _value | state | sent |
| --- | --- | --- | --- | --- | --- | --- |
| 2022-01-01T19:00:00Z | home | Living Room | co | 14 | warning | true |
| 2022-01-01T20:00:00Z | home | Living Room | co | 17 | warning | true |

With the results above, you would receive the following messages in Slack:

Carbon monoxide is at dangerous levels in the Kitchen: 18 ppm.
Carbon monoxide is at dangerous levels in the Kitchen: 22 ppm.
Carbon monoxide is at dangerous levels in the Living Room: 14 ppm.
Carbon monoxide is at dangerous levels in the Kitchen: 26 ppm.
Carbon monoxide is at dangerous levels in the Living Room: 17 ppm.

You can also use the InfluxDB checks and notifications system as a user interface for configuring checks and alerting on data.

Group data

Use the group() function to regroup your data by specific column values in preparation for further processing.

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> group(columns: ["room", "_field"])
```

Understanding data grouping and why it matters is important, but may be too much for this “getting started” tutorial. For more information about how data is grouped and why it matters, see the Flux data model documentation.

By default, from() returns data queried from InfluxDB grouped by series (measurement, tags, and field key). Each table in the returned stream of tables represents a group. Each table contains the same values for the columns that data is grouped by. This grouping is important as you aggregate data.

Group examples

Group data by specific columns

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp and hum fields.
  2. Use group() to group by only the _field column.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp" or r._field == "hum")
    |> group(columns: ["_field"])
```

The following data is output from the last filter() and piped forward into group():

_start and _stop columns have been omitted.

Group key instance = [_measurement=home, room=Kitchen, _field=hum]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |

Group key instance = [_measurement=home, room=Living Room, _field=hum]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |

Group key instance = [_measurement=home, room=Kitchen, _field=temp]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |

Group key instance = [_measurement=home, room=Living Room, _field=temp]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |

When grouped by _field, all rows with the temp field are in one table and all rows with the hum field are in another. The _measurement and room columns no longer affect how rows are grouped.

_start and _stop columns have been omitted.

Group key instance = [_field=hum]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |

Group key instance = [_field=temp]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |

Ungroup data

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp and hum fields.
  2. Use group() without any parameters to “ungroup” data or group by no columns. The default value of the columns parameter is an empty array ([]).
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp" or r._field == "hum")
    |> group()
```

The following data is output from the last filter() and piped forward into group():

_start and _stop columns have been omitted.

Group key instance = [_measurement=home, room=Kitchen, _field=hum]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |

Group key instance = [_measurement=home, room=Living Room, _field=hum]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |

Group key instance = [_measurement=home, room=Kitchen, _field=temp]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |

Group key instance = [_measurement=home, room=Living Room, _field=temp]

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |

When ungrouped, all data is returned in a single table.

_start and _stop columns have been omitted.

Group key instance = []

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T08:00:00Z | home | Kitchen | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T10:00:00Z | home | Kitchen | hum | 36.1 |
| 2022-01-01T08:00:00Z | home | Kitchen | temp | 21 |
| 2022-01-01T09:00:00Z | home | Kitchen | temp | 23 |
| 2022-01-01T10:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T08:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T09:00:00Z | home | Living Room | hum | 35.9 |
| 2022-01-01T10:00:00Z | home | Living Room | hum | 36 |
| 2022-01-01T08:00:00Z | home | Living Room | temp | 21.1 |
| 2022-01-01T09:00:00Z | home | Living Room | temp | 21.4 |
| 2022-01-01T10:00:00Z | home | Living Room | temp | 21.8 |

Aggregate or select specific data

Use Flux aggregate or selector functions to return aggregate or selected values from each input table.

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> mean()
```

Aggregate over time

Querying aggregate values over windows of time is a form of downsampling.

Aggregate functions

Aggregate functions drop columns that are not in the group key and return a single row for each input table with the aggregate value of that table.

Aggregate examples

Calculate the average temperature for each room

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp field. By default, from() returns the data grouped by _measurement, room and _field, so each table represents a room.
  2. Use mean() to return the average temperature from each room.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> mean()
```

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _measurement | room | _field | _value |
| --- | --- | --- | --- |
| home | Kitchen | temp | 22.814285714285713 |

| _measurement | room | _field | _value |
| --- | --- | --- | --- |
| home | Living Room | temp | 22.44285714285714 |

Calculate the overall average temperature of all rooms

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp field.
  2. Use group() to ungroup the data into a single table. By default, from() returns the data grouped by_measurement, room and _field. To get the overall average, you need to structure all results as a single table.
  3. Use mean() to return the average temperature.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> group()
    |> mean()
```

The following input data represents the ungrouped data that is piped forward into mean().

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _value |
| --- |
| 22.628571428571426 |

Count the number of points reported per room across all fields

Using the data written in “Get started writing to InfluxDB”:

  1. Query all fields by simply filtering by the home measurement.
  2. The fields in the home measurement are different types. Use toFloat() to cast all field values to floats.
  3. Use group() to group the data by room.
  4. Use count() to return the number of rows in each input table.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> toFloat()
    |> group(columns: ["room"])
    |> count()
```
Output:

_start and _stop columns have been omitted.

| room | _value |
| --- | --- |
| Kitchen | 21 |

| room | _value |
| --- | --- |
| Living Room | 21 |

Assign a new aggregate timestamp

_time is generally not part of the group key and will be dropped when using aggregate functions. To assign a new timestamp to aggregate points, duplicate the _start or _stop column, which represent the query bounds, as the new _time column.

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
```

Selector functions

Selector functions return one or more rows from each input table and retain all columns and their values.

Selector examples

Return the first temperature from each room

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp field.
  2. Use first() to return the first row from each table.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> first()
```

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |

Return the last temperature from each room

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp field.
  2. Use last() to return the last row from each table.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> last()
```

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Return the maximum temperature from each room

Using the data written in “Get started writing to InfluxDB”:

  1. Query the temp field.
  2. Use max() to return the row with the highest value in the _value column from each table.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> max()
```

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |

Pivot data into a relational schema

If you're coming from relational SQL or SQL-like query languages, such as InfluxQL, the data model that Flux uses is different from what you're used to. Flux returns multiple tables, where each table contains a different field. A "relational" schema structures each field as a column in each row.

Use the pivot() function to pivot data into a “relational” schema based on timestamps.

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> filter(fn: (r) => r.room == "Kitchen")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```
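For this specific field-to-column pivot, the influxdata/influxdb/schema package also provides fieldsAsCols() as a shortcut. A minimal sketch of the equivalent query:

```flux
import "influxdata/influxdb/schema"

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r.room == "Kitchen")
    // Equivalent to pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> schema.fieldsAsCols()
```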

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | co | 1 |
| 2022-01-01T15:00:00Z | home | Kitchen | co | 3 |
| 2022-01-01T16:00:00Z | home | Kitchen | co | 7 |
| 2022-01-01T17:00:00Z | home | Kitchen | co | 9 |
| 2022-01-01T18:00:00Z | home | Kitchen | co | 18 |
| 2022-01-01T19:00:00Z | home | Kitchen | co | 22 |
| 2022-01-01T20:00:00Z | home | Kitchen | co | 26 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | hum | 36.3 |
| 2022-01-01T15:00:00Z | home | Kitchen | hum | 36.2 |
| 2022-01-01T16:00:00Z | home | Kitchen | hum | 36 |
| 2022-01-01T17:00:00Z | home | Kitchen | hum | 36 |
| 2022-01-01T18:00:00Z | home | Kitchen | hum | 36.9 |
| 2022-01-01T19:00:00Z | home | Kitchen | hum | 36.6 |
| 2022-01-01T20:00:00Z | home | Kitchen | hum | 36.5 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | co | hum | temp |
| --- | --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | 1 | 36.3 | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | 3 | 36.2 | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | 7 | 36 | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | 9 | 36 | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | 18 | 36.9 | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | 22 | 36.6 | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | 26 | 36.5 | 22.7 |

Downsample data

Downsampling data is a strategy that improves performance at query time and also optimizes long-term data storage. Simply put, downsampling reduces the number of points returned by a query without losing the general trends in the data.

For more information about downsampling data, see Downsample data.

The most common way to downsample data is by time intervals or “windows.” For example, you may want to query the last hour of data and return the average value for every five minute window.

Use aggregateWindow() to downsample data by specified time intervals:

  • Use the every parameter to specify the duration of each window.
  • Use the fn parameter to specify what aggregate or selector function to apply to each window.
  • (Optional) Use the timeSrc parameter to specify which column value to use to create the new aggregate timestamp for each window. The default is _stop.
```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> aggregateWindow(every: 2h, fn: mean)
```
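Because timeSrc defaults to _stop, each aggregate point above is stamped with the end of its window. To stamp aggregates with the start of each window instead, a variation on the query above might set timeSrc explicitly:

```flux
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    // Use the window start time as the new _time value
    |> aggregateWindow(every: 2h, fn: mean, timeSrc: "_start")
```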

Input:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Kitchen | temp | 22.8 |
| 2022-01-01T15:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Kitchen | temp | 22.7 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 23.3 |
| 2022-01-01T19:00:00Z | home | Kitchen | temp | 23.1 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T14:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T15:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.4 |
| 2022-01-01T17:00:00Z | home | Living Room | temp | 22.6 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.8 |
| 2022-01-01T19:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.2 |

Output:

_start and _stop columns have been omitted.

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T16:00:00Z | home | Kitchen | temp | 22.75 |
| 2022-01-01T18:00:00Z | home | Kitchen | temp | 22.549999999999997 |
| 2022-01-01T20:00:00Z | home | Kitchen | temp | 23.200000000000003 |
| 2022-01-01T20:00:01Z | home | Kitchen | temp | 22.7 |

| _time | _measurement | room | _field | _value |
| --- | --- | --- | --- | --- |
| 2022-01-01T16:00:00Z | home | Living Room | temp | 22.3 |
| 2022-01-01T18:00:00Z | home | Living Room | temp | 22.5 |
| 2022-01-01T20:00:00Z | home | Living Room | temp | 22.65 |
| 2022-01-01T20:00:01Z | home | Living Room | temp | 22.2 |

Automate processing with InfluxDB tasks

InfluxDB tasks are scheduled queries that can perform any of the data processing operations described above. Tasks typically use the to() function to write the processed result back to InfluxDB.

For more information about creating and configuring tasks, see Get started with InfluxDB tasks.

Example downsampling task

```flux
option task = {
    name: "Example task",
    every: 1d,
}

from(bucket: "get-started-downsampled")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "home")
    |> aggregateWindow(every: 2h, fn: mean)
```
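A complete downsampling task usually queries the raw bucket and writes the aggregated result back with to(). A minimal sketch, assuming the raw data lives in get-started and a destination bucket named get-started-downsampled exists:

```flux
option task = {
    name: "Example downsampling task",
    every: 1d,
}

// Downsample the last day of raw data into 2-hour averages
from(bucket: "get-started")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "home")
    |> aggregateWindow(every: 2h, fn: mean)
    // Write the downsampled data to the destination bucket
    |> to(bucket: "get-started-downsampled")
```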
