Continuous Aggregates

Aggregate queries which touch large swathes of time-series data can take a long time to compute because the system needs to scan large amounts of data on every query execution. To make such queries faster, a continuous aggregate allows materializing the computed aggregates, while also providing means to continuously, and with low overhead, keep them up-to-date as the underlying source data changes.

Continuous aggregates are somewhat similar to PostgreSQL’s materialized views, but, unlike a materialized view, a continuous aggregate can be continuously and incrementally refreshed. The refreshing can be done either manually or via a policy that runs in the background, and can cover the entire continuous aggregate or just a specific time range. In either case, the refresh only recomputes the aggregate buckets that have changed since the last refresh.

An introductory example

As a quick introductory example, let’s create a hypertable, conditions, containing temperature data for devices, and a continuous aggregate that computes the hourly average, minimum, and maximum temperature. Start by creating the hypertable and populating it with some data:

    CREATE TABLE conditions (
        time TIMESTAMPTZ NOT NULL,
        device INTEGER NOT NULL,
        temperature FLOAT NOT NULL,
        PRIMARY KEY(time, device)
    );
    SELECT * FROM create_hypertable('conditions', 'time', 'device', 3);
    INSERT INTO conditions
    SELECT time, (random()*30)::int, random()*80 - 40
    FROM generate_series(TIMESTAMP '2020-01-01 00:00:00',
                         TIMESTAMP '2020-06-01 00:00:00',
                         INTERVAL '10 min') AS time;

You can then create a continuous aggregate view to compute the hourly average, minimum, and maximum temperature:

    CREATE MATERIALIZED VIEW conditions_summary_hourly
    WITH (timescaledb.continuous) AS
    SELECT device,
           time_bucket(INTERVAL '1 hour', time) AS bucket,
           AVG(temperature),
           MAX(temperature),
           MIN(temperature)
    FROM conditions
    GROUP BY device, bucket;

Lastly, you should add a policy to ensure that the continuous aggregate is refreshed on a regular basis.

    SELECT add_continuous_aggregate_policy('conditions_summary_hourly',
        start_offset => INTERVAL '1 month',
        end_offset => INTERVAL '1 h',
        schedule_interval => INTERVAL '5 min');

In this case, the policy runs every five minutes and refreshes the window that starts one month before the current time and ends one hour before it.

You can now run a normal SELECT on the continuous aggregate and it will give you the aggregated data, for example, to select the hourly averages for device 1 during the first three months:

    SELECT bucket, avg
    FROM conditions_summary_hourly
    WHERE device = 1
      AND bucket >= '2020-01-01' AND bucket < '2020-04-01'
    ORDER BY bucket;

A detailed look at continuous aggregates

As shown above, creating a refreshing continuous aggregate is a two-step process. First, one needs to create a continuous aggregate view of the data using CREATE MATERIALIZED VIEW with the timescaledb.continuous option. Second, a continuous aggregate policy needs to be created to keep it refreshed.

You can create several continuous aggregates for the same hypertable. For example, you could create another continuous aggregate view for daily data.

    CREATE MATERIALIZED VIEW conditions_summary_daily
    WITH (timescaledb.continuous) AS
    SELECT device,
           time_bucket(INTERVAL '1 day', time) AS bucket,
           AVG(temperature),
           MAX(temperature),
           MIN(temperature)
    FROM conditions
    GROUP BY device, bucket;
    -- Create the policy
    SELECT add_continuous_aggregate_policy('conditions_summary_daily',
        start_offset => INTERVAL '1 month',
        end_offset => INTERVAL '1 day',
        schedule_interval => INTERVAL '1 hour');

A time_bucket on the time partitioning column of the hypertable is required in all continuous aggregate views. If you do not provide one, you will get an error.

When the view is created, it will (by default) be populated with data so that it contains the aggregates computed across the entire conditions hypertable.

It might, however, not always be desirable to populate the continuous aggregate when created. If the amount of data in conditions is large and new data is continuously being added, it is often more useful to control the order in which the data is refreshed by combining manual refresh with a policy. For example, one could use a policy to refresh only recent (and future) data while historical data is left to manual refreshes. In those cases, the WITH NO DATA option can be used to avoid aggregating all the data during creation.
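As a minimal sketch of this pattern, the statements below create a view without populating it and then backfill one month by hand. The view name conditions_summary_6h and the six-hour bucket are assumptions made for illustration; everything else follows the examples above.

```sql
-- Hypothetical view: same query shape as the earlier examples, six-hour buckets.
CREATE MATERIALIZED VIEW conditions_summary_6h
WITH (timescaledb.continuous) AS
SELECT device,
       time_bucket(INTERVAL '6 hours', time) AS bucket,
       AVG(temperature),
       MAX(temperature),
       MIN(temperature)
FROM conditions
GROUP BY device, bucket
WITH NO DATA;  -- skip the initial materialization

-- Backfill historical data manually, one window at a time.
CALL refresh_continuous_aggregate('conditions_summary_6h', '2020-01-01', '2020-02-01');
```

A policy with a short start_offset can then cover recent data, while older months are backfilled whenever it suits the workload.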

The refresh_continuous_aggregate command is used for manual refreshing. For example, to refresh one month of data you could write:

    CALL refresh_continuous_aggregate('conditions_summary_hourly', '2020-05-01', '2020-06-01');

Unlike a regular materialized view, the refresh command will only recompute the data within the window that has changed in the underlying hypertable since the last refresh. Therefore, if only a few buckets need updating, then the refresh is quick.

Note that the end of the range is exclusive and aligned to the buckets of the continuous aggregate, so this will refresh only the buckets that are fully in the date range ['2020-05-01', '2020-06-01'), that is, up to but not including 2020-06-01. While it is possible to use NULL to indicate an open-ended range, we do not in general recommend using it. Such a refresh might materialize a lot of data, have a negative effect on performance, and affect other policies such as data retention. For more information, see the Advanced Topics section below.

Continuous aggregates are supported for most aggregate functions that can be parallelized by PostgreSQL, which includes the normal aggregates like SUM and AVG. However, aggregates using ORDER BY and DISTINCT cannot be used with continuous aggregates since PostgreSQL cannot parallelize them. In addition, TimescaleDB continuous aggregates do not currently support the FILTER clause (not to be confused with WHERE), even though it can be parallelized; support might be added in a future version.

Automatic refresh with a continuous aggregate policy

Continuous aggregate policies can be configured to support different use cases. For example, you might want to:

  • have the continuous aggregate and the hypertable be in sync, even when data is removed from the hypertable, or
  • keep the aggregate data in the continuous aggregate when removing source data from the hypertable.

These use cases are supported by different configurations of add_continuous_aggregate_policy.

In addition to the name of the continuous aggregate, this function takes three parameters:

  • The parameter start_offset indicates the start of the refresh window relative to the current time when the policy executes.
  • The parameter end_offset indicates the end of the refresh window relative to the current time when the policy executes.
  • The parameter schedule_interval indicates the refresh interval in wall-clock time.

Similar to the refresh_continuous_aggregate function, providing NULL to start_offset or end_offset makes the range open-ended and will extend to the beginning or end of time, respectively. However, it seldom makes sense to use NULL for the end_offset. Instead, it is recommended to set the end_offset so that at least the most recent time bucket is excluded. For time-series data that see mostly in-order writes, the time buckets that still see lots of writes will quickly have out-of-date aggregates. Excluding those time buckets will provide better performance.

For example, to create a policy for conditions_summary_hourly that keeps the continuous aggregate up to date with the underlying hypertable conditions and runs every hour, you would write:

    SELECT add_continuous_aggregate_policy('conditions_summary_hourly',
        start_offset => NULL,
        end_offset => INTERVAL '1 h',
        schedule_interval => INTERVAL '1 h');

This will ensure that all data in the continuous aggregate is up to date with the hypertable except the last hour and also ensure that we do not try to refresh the last bucket of the continuous aggregate. Since we give an open-ended start_offset, any data that is removed from conditions (for example, by using DELETE or drop_chunks) will also be removed from conditions_summary_hourly. In effect, the continuous aggregate will always reflect the data in the underlying hypertable.

If you instead want to keep data in the continuous aggregate even if the source data is removed from the underlying hypertable, you also need to set the start_offset in a way that is compatible with the data retention policy on the source hypertable. For example, if you have a retention policy that removes data older than one month, you need to set start_offset to one month (or less) so that the region of dropped data is never refreshed.

    SELECT add_continuous_aggregate_policy('conditions_summary_hourly',
        start_offset => INTERVAL '1 month',
        end_offset => INTERVAL '1 h',
        schedule_interval => INTERVAL '1 h');

WARNING: It is important to consider data retention policies when setting up continuous aggregate policies. If the continuous aggregate policy window covers data that is removed by the data retention policy, the aggregates for those buckets will be refreshed and consequently removed as well. For example, if you have a data retention policy that removes all data older than two weeks, the continuous aggregate policy above will only have data for the last two weeks. A more reasonable data retention policy for this case would be to remove data that is older than one month.
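As an illustration of a compatible setup, the sketch below pairs the one-month refresh window used above with a longer retention period on the source hypertable. It assumes the add_retention_policy function available in TimescaleDB 2.x; the two-month value is only an example.

```sql
-- Assumption: TimescaleDB 2.x retention API.
-- Raw data older than two months is dropped. The continuous aggregate
-- policy above only refreshes the last month, so the dropped region is
-- never re-aggregated and its materialized buckets are kept.
SELECT add_retention_policy('conditions', INTERVAL '2 months');
```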

You can read more about data retention with continuous aggregates in the Data retention section.

A continuous aggregate may be dropped by using the DROP MATERIALIZED VIEW command. It does not affect the data in the hypertable from which the continuous aggregate is derived (conditions in the example above).

    DROP MATERIALIZED VIEW conditions_summary_hourly;

Querying Continuous Aggregates

To query data from a continuous aggregate, use a SELECT query on the continuous aggregate view. For instance, you can get the average, minimum, and maximum for the first quarter of 2020 for device 5:

    SELECT * FROM conditions_summary_hourly
    WHERE device = 5
      AND bucket >= '2020-01-01' AND bucket < '2020-04-01';

We can also run more complex queries on the aggregates themselves. For instance, to find the 20 largest temperature spreads (maximum minus minimum within a bucket) in that quarter, we could write:

    SELECT *, max - min AS spread
    FROM conditions_summary_hourly
    WHERE bucket >= '2020-01-01' AND bucket < '2020-04-01'
    ORDER BY spread DESC, bucket DESC, device DESC
    LIMIT 20;

Real-Time Aggregation

A query on a continuous aggregate will, by default, use real-time aggregation (first introduced in TimescaleDB 1.7) to combine materialized aggregates with recent data from the source hypertable. By combining raw and materialized data in this way, real-time aggregation produces accurate and up-to-date results while still benefiting from pre-computed aggregates for a large portion of the result.

Real-time aggregation is the default behavior for any new continuous aggregates. To disable real-time aggregation and show only materialized data, add the parameter timescaledb.materialized_only=true when creating the continuous aggregate view or set it on an existing continuous aggregate using ALTER MATERIALIZED VIEW.
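The two ways of setting this option can be sketched as follows. The view name conditions_summary_hourly_mat is hypothetical; the option name is the one given above.

```sql
-- Create a view that returns only materialized data.
-- (conditions_summary_hourly_mat is a hypothetical name.)
CREATE MATERIALIZED VIEW conditions_summary_hourly_mat
WITH (timescaledb.continuous, timescaledb.materialized_only = true) AS
SELECT device,
       time_bucket(INTERVAL '1 hour', time) AS bucket,
       AVG(temperature)
FROM conditions
GROUP BY device, bucket;

-- Switch an existing continuous aggregate to materialized-only ...
ALTER MATERIALIZED VIEW conditions_summary_hourly
  SET (timescaledb.materialized_only = true);

-- ... and back to real-time aggregation.
ALTER MATERIALIZED VIEW conditions_summary_hourly
  SET (timescaledb.materialized_only = false);
```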

TIP: To use real-time aggregation on a continuous aggregate created in a version earlier than TimescaleDB 1.7, alter the view to set timescaledb.materialized_only=false.


Advanced Topics

Refreshing continuous aggregates

When using refresh_continuous_aggregate it is possible to use NULL to indicate an open-ended range either for the start of the window or the end of the window. For example, to refresh the complete range of a continuous aggregate, write:

    CALL refresh_continuous_aggregate('conditions_summary_hourly', NULL, NULL);

However, we do not recommend open-ended refreshes on continuous aggregates when there is a continuous ingest of new data since that would trigger a refresh of time buckets that are not yet completely filled. It might also materialize a lot of data, increase write amplification, and affect other policies such as data retention.

TIP: You should avoid refreshing time intervals that still see a lot of writes, which is usually the last bucket of the continuous aggregate. These intervals are still changing and are unlikely to produce accurate aggregates, while at the same time slowing down the ingest rate of the hypertable due to write amplification. If you want to include the latest bucket in your queries, you should instead rely on real-time aggregation.

The schedule_interval option to add_continuous_aggregate_policy controls how frequently materialization jobs will be run. Setting a shorter interval will refresh more frequently but at the same time consume more background worker resources.
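If the interval needs changing after the policy exists, one option is to adjust the underlying background job. The sketch below assumes the timescaledb_information.jobs view and the alter_job function of TimescaleDB 2.x; the job id 1000 is a placeholder for whatever the first query returns on your system.

```sql
-- List continuous aggregate refresh jobs and their current schedule.
-- Assumption: column and procedure names as in TimescaleDB 2.x.
SELECT job_id, hypertable_name, schedule_interval
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';

-- Run the job with id 1000 (placeholder) every 30 minutes instead.
SELECT alter_job(1000, schedule_interval => INTERVAL '30 minutes');
```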

Using timescaledb_information Views

The options used to create a continuous aggregate view, as well as its definition, can be found in the timescaledb_information.continuous_aggregates view. Information about the state and progress of the materialization background worker jobs can be found in the timescaledb_information.job_stats view. These views can be quite useful for administering continuous aggregates and tuning other options noted below.
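For instance, the two views might be queried as sketched below. The column names are assumptions based on TimescaleDB 2.x and can differ between versions, so SELECT * is a safe starting point if any of them is missing.

```sql
-- Options and definition of each continuous aggregate.
SELECT view_name, materialized_only, view_definition
FROM timescaledb_information.continuous_aggregates;

-- State of background jobs, including continuous aggregate refresh policies.
SELECT job_id, last_run_status, last_successful_finish, total_runs, total_failures
FROM timescaledb_information.job_stats;
```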

Dropping Data with Continuous Aggregates Enabled

Note that if source data within the refresh window (that is, more recent than start_offset) is dropped via a retention policy or a direct drop_chunks call, the continuous aggregate will be updated to reflect the loss of data. For this reason, if you want to retain the continuous aggregate after dropping the underlying data, the start_offset of the aggregate policy must be set to a smaller interval than the drop_after parameter of the hypertable’s retention policy. Similarly, when calling drop_chunks, take extra care to ensure that the dropped chunks are not within the refresh window of a continuous aggregate that still needs the data. More detail and examples can be found in the data retention documentation.

This is also a consideration when manually refreshing a continuous aggregate. Calling refresh_continuous_aggregate on a region containing dropped chunks will recalculate the aggregate without the dropped data. This can lead to undesirable results, such as replacing previous aggregate data with NULL values, given that the raw data has subsequently been dropped.
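One way to reduce that risk is to keep manual refreshes inside the range for which raw data still exists. The sketch below assumes that raw data older than one month has been dropped, so it refreshes only the last two weeks; the window bounds are illustrative.

```sql
-- Assumption: raw data older than one month has been dropped from conditions.
-- Refreshing only a window that still has raw data leaves older buckets untouched.
CALL refresh_continuous_aggregate('conditions_summary_hourly',
                                  now() - INTERVAL '2 weeks',
                                  now() - INTERVAL '1 hour');
```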

Continuous Aggregates using Integer-Based Time

Usually, continuous aggregates are defined on a date/time-type column, but it is also possible to create your own custom scheme for handling aggregation for tables that are using an integer time column. This can be useful if you have tables that use other measures of time that can be represented as integer values, such as nanosecond epochs, minutes since founding date, or whatever is suitable for your application.

As an example, suppose that you have a table with CPU and disk usage for some devices where time is measured in microfortnights (a microfortnight is a little more than a second). Since you are using an integer-valued column as time, you need to provide the chunk time interval when creating the hypertable. In this case, let each chunk consist of a millifortnight (1000 microfortnights, which is about 20 minutes).

    CREATE TABLE devices(
        time BIGINT,        -- Time in microfortnights since epoch
        cpu_usage INTEGER,  -- Total CPU usage
        disk_usage INTEGER, -- Total disk usage
        PRIMARY KEY (time)
    );
    SELECT create_hypertable('devices', 'time',
                             chunk_time_interval => 1000);
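The unit conversions used here can be checked with plain SQL arithmetic. This is only a worked calculation; the column aliases are made up for readability.

```sql
-- A fortnight is 14 days = 14 * 24 * 60 * 60 = 1,209,600 seconds.
SELECT 1209600 / 1000000.0             AS seconds_per_microfortnight,  -- about 1.21
       1000 * 1209600 / 1000000.0 / 60 AS minutes_per_millifortnight;  -- about 20.16
```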

To define a continuous aggregate on a hypertable that is using an integer time dimension, it is necessary to have a function to get the current time in whatever representation that you are using and set it for the hypertable using set_integer_now_func. The function can be defined as a normal PostgreSQL function, but needs to be STABLE, take no arguments, and return an integer value of the same type as the time column in the table. In our case, this should suffice:

    CREATE FUNCTION current_microfortnight() RETURNS BIGINT
    LANGUAGE SQL STABLE AS $$
        -- Seconds since the Unix epoch divided by 1.2096 seconds per microfortnight.
        SELECT CAST(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000000 / 1209600 AS BIGINT)
    $$;
    SELECT set_integer_now_func('devices', 'current_microfortnight');

Once the replacement for current time has been set up, you can define a continuous aggregate for the devices table.

    CREATE MATERIALIZED VIEW devices_summary
    WITH (timescaledb.continuous) AS
    SELECT time_bucket('5', time) AS bucket,
           avg(cpu_usage) AS avg_cpu,
           avg(disk_usage) AS avg_disk
    FROM devices
    GROUP BY bucket;

You can now insert some rows to check if the aggregation works as expected.

    CREATE EXTENSION tablefunc;
    INSERT INTO devices(time, cpu_usage, disk_usage)
    SELECT time,
           normal_rand(1,70,10) AS cpu_usage,
           normal_rand(1,2,1) * (row_number() over()) AS disk_usage
    FROM generate_series(1,10000) AS time;

TIP: You can use the tablefunc extension to generate a normal distribution and use the row_number function to turn it into a cumulative sequence.

You can now check that the view contains the correct data.

    postgres=# SELECT * FROM devices_summary ORDER BY bucket LIMIT 10;
     bucket |       avg_cpu       |       avg_disk
    --------+---------------------+----------------------
          0 | 63.0000000000000000 |   6.0000000000000000
          5 | 69.8000000000000000 |   9.6000000000000000
         10 | 70.8000000000000000 |  24.0000000000000000
         15 | 75.8000000000000000 |  37.6000000000000000
         20 | 71.6000000000000000 |  26.8000000000000000
         25 | 67.6000000000000000 |  56.0000000000000000
         30 | 68.8000000000000000 |  90.2000000000000000
         35 | 71.6000000000000000 |  88.8000000000000000
         40 | 66.4000000000000000 |  81.2000000000000000
         45 | 68.2000000000000000 | 106.0000000000000000
    (10 rows)

Best Practices

Modifying the Materialization Hypertable: Advanced users may need to modify certain properties of the materialization hypertable (e.g. chunk size) or to create further indexes. To do so, first find the name of the materialization hypertable in the timescaledb_information.continuous_aggregates view. The materialization hypertable can then be modified as if it were a normal hypertable. For instance, to set its chunk_time_interval to something other than the default, run set_chunk_time_interval on the materialization hypertable.

Creating Indexes on the Materialization Hypertable: By default, the database automatically creates a composite index for each column specified in the GROUP BY, combined with the time_bucket column. In our example, because the continuous aggregate view is defined with GROUP BY device, bucket, a composite index on {device, bucket} is created automatically. If we had grouped by further columns (e.g., GROUP BY device, foo, bar, bucket), additional indexes would be created as well ({foo, bucket} and {bar, bucket}). Setting timescaledb.create_group_indexes to false when creating the view prevents this. If we want to create additional indexes or drop some of the default ones, we can do so by creating or dropping the appropriate indexes on the materialization hypertable directly.

TIP: You can find the names of all the materialization hypertables by querying timescaledb_information.continuous_aggregates.

    SELECT view_name, materialization_hypertable
    FROM timescaledb_information.continuous_aggregates;

             view_name         |            materialization_hypertable
    ---------------------------+---------------------------------------------------
     conditions_summary_hourly | _timescaledb_internal._materialized_hypertable_30
     conditions_summary_daily  | _timescaledb_internal._materialized_hypertable_31
    (2 rows)
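With the name of a materialization hypertable in hand, changing its chunk interval might look like the sketch below. The hypertable name is the one from the sample listing and will differ on your system; the 30-day interval is an arbitrary example value.

```sql
-- Name taken from the sample listing above; look up your own before running this.
SELECT set_chunk_time_interval(
    '_timescaledb_internal._materialized_hypertable_30',
    INTERVAL '30 days');
```

The new interval applies to chunks created after the call; existing chunks keep their size.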

Choosing an appropriate bucket interval: The materialization of a continuous aggregate stores partials, which are then used to calculate the final aggregations at query time. This means that there is a base amount of overhead for any query, which becomes a greater factor for smaller intervals. For smaller intervals, it can be more performant to run an aggregate query on the raw data in the hypertable, so test both methods to determine what is best for your data set and desired bucket interval.
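One simple way to run such a test is to compare the plans and timings of the two approaches with EXPLAIN ANALYZE. The sketch below uses the conditions hypertable and the hourly view from earlier; the one-month range is an arbitrary choice.

```sql
-- Aggregate directly on the raw hypertable.
EXPLAIN ANALYZE
SELECT device,
       time_bucket(INTERVAL '1 hour', time) AS bucket,
       AVG(temperature)
FROM conditions
WHERE time >= '2020-03-01' AND time < '2020-04-01'
GROUP BY device, bucket;

-- The same range read through the continuous aggregate.
EXPLAIN ANALYZE
SELECT device, bucket, avg
FROM conditions_summary_hourly
WHERE bucket >= '2020-03-01' AND bucket < '2020-04-01';
```

Running each statement a few times gives a fairer comparison, since the first run may be dominated by cold caches.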

Dealing with Timezones: Functions that depend on a local timezone setting inside a continuous aggregate are not supported. We cannot cast to a local time because the timezone setting will change from user to user. So attempting to create a continuous aggregate like:

    CREATE MATERIALIZED VIEW device_summary
    WITH (timescaledb.continuous)
    AS
    SELECT
        time_bucket('1 hour', observation_time) AS bucket,
        min(observation_time::timestamp) AS min_time,
        device_id,
        avg(metric) AS metric_avg,
        max(metric) - min(metric) AS metric_spread
    FROM
        device_readings
    GROUP BY bucket, device_id;

will fail.

Instead, we can use explicit timezones in our view definition like:

    CREATE MATERIALIZED VIEW device_summary
    WITH (timescaledb.continuous)
    AS
    SELECT
        time_bucket('1 hour', observation_time) AS bucket,
        min(observation_time AT TIME ZONE 'EST') AS min_time,
        device_id,
        avg(metric) AS metric_avg,
        max(metric) - min(metric) AS metric_spread
    FROM
        device_readings
    GROUP BY bucket, device_id;

Or we can cast to a timestamp on the way out of the view:

    SELECT min_time::timestamp FROM device_summary;