Overview

GreptimeDB provides a continuous aggregation feature that aggregates data in real time. It is useful when you need to calculate and query sums, averages, or other aggregations on the fly. Continuous aggregation is provided by the Flow engine, which continuously updates the aggregated data as new data arrives and materializes the result. You can think of it as a smart materialized view that knows when to update the result table and how to update it with minimal effort. Common use cases include:

  • Downsampling data points, for example by averaging, to reduce the amount of data kept for storage and analysis
  • Real-time analytics that provide actionable information in near real-time

When you insert data into the source table, the data is also sent to and stored in the Flow engine. The Flow engine calculates the aggregation by time window and stores the result in the sink table. The entire process is illustrated in the following image:

[Image: Continuous Aggregation]
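
To make the idea of a fixed time window concrete, the following Python sketch shows one way to truncate a timestamp to the start of the window that contains it. It only illustrates the bucketing arithmetic and is not the Flow engine's implementation; the helper name `window_start` and the choice to align windows to the Unix epoch are assumptions made for this example.

```python
from datetime import datetime, timedelta, timezone

# Assumption for this sketch: windows are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, width: timedelta) -> datetime:
    """Return the start of the fixed window of size `width` that contains `ts`."""
    elapsed = ts - EPOCH
    # Floor-dividing two timedeltas gives the number of whole windows elapsed.
    return EPOCH + (elapsed // width) * width


ts = datetime(2021, 7, 1, 0, 0, 30, 500000, tzinfo=timezone.utc)
print(window_start(ts, timedelta(minutes=1)))
```

Every row whose timestamp maps to the same window start falls into the same group, so the window start can serve as a grouping key.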

Quick start with an example

Here is a complete example of what a continuous aggregation query looks like.

This use case calculates, for access logs, the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with a size greater than 550 for each status code in a 1-minute fixed window. First, create a source table ngx_access_log and a sink table ngx_statistics with the following statements:

  CREATE TABLE `ngx_access_log` (
    `client` STRING NULL,
    `ua_platform` STRING NULL,
    `referer` STRING NULL,
    `method` STRING NULL,
    `endpoint` STRING NULL,
    `trace_id` STRING NULL FULLTEXT,
    `protocol` STRING NULL,
    `status` SMALLINT UNSIGNED NULL,
    `size` DOUBLE NULL,
    `agent` STRING NULL,
    `access_time` TIMESTAMP(3) NOT NULL,
    TIME INDEX (`access_time`)
  )
  WITH(
    append_mode = 'true'
  );

  CREATE TABLE `ngx_statistics` (
    `status` SMALLINT UNSIGNED NULL,
    `total_logs` BIGINT NULL,
    `min_size` DOUBLE NULL,
    `max_size` DOUBLE NULL,
    `avg_size` DOUBLE NULL,
    `high_size_count` DOUBLE NULL,
    `time_window` TIMESTAMP TIME INDEX,
    `update_at` TIMESTAMP NULL,
    PRIMARY KEY (`status`)
  );

Then create the flow ngx_aggregation to compute a set of aggregates: the count of logs, the min, max, and avg of the size column, and the number of packets whose size is greater than 550. The aggregation is calculated over 1-minute fixed windows of the access_time column and is grouped by the status column. This lets you learn about packet sizes in real time and act on them. For example, if high_size_count becomes too high at some point, you can examine whether something has gone wrong, and if max_size suddenly spikes in a 1-minute window, you can try to locate that packet and inspect it further.

  CREATE FLOW ngx_aggregation
  SINK TO ngx_statistics
  AS
  SELECT
    status,
    count(client) AS total_logs,
    min(size) AS min_size,
    max(size) AS max_size,
    avg(size) AS avg_size,
    -- Count packets larger than 550 by summing 1 for each match.
    sum(case when `size` > 550::double then 1::double else 0::double end) AS high_size_count,
    -- Truncate access_time to the start of its 1-minute window.
    date_bin(INTERVAL '1 minutes', access_time) AS time_window
  FROM ngx_access_log
  GROUP BY
    status,
    time_window;

To observe the outcome of the continuous aggregation in the ngx_statistics table, insert some data into the source table ngx_access_log.

  INSERT INTO ngx_access_log
  VALUES
    ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 1000, "agent", "2021-07-01 00:00:01.000"),
    ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:00:30.500"),
    ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 600, "agent", "2021-07-01 00:01:01.000"),
    ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 700, "agent", "2021-07-01 00:01:01.500");

The sink table ngx_statistics is then incrementally updated and contains the following data:

  SELECT * FROM ngx_statistics;

   status | total_logs | min_size | max_size | avg_size | high_size_count |        time_window         |         update_at
  --------+------------+----------+----------+----------+-----------------+----------------------------+----------------------------
      200 |          2 |      500 |     1000 |      750 |               1 | 2021-07-01 00:00:00.000000 | 2024-07-24 08:36:17.439000
      200 |          1 |      600 |      600 |      600 |               1 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:17.439000
      404 |          1 |      700 |      700 |      700 |               1 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:17.439000
  (3 rows)
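
To see where these numbers come from, the following Python sketch recomputes the same aggregates from the four rows of the first INSERT statement. It is a plain batch recomputation written for illustration, not a description of how the Flow engine works internally; the function name `aggregate` and the tuple layout are assumptions of this example.

```python
from collections import defaultdict
from datetime import datetime

# (status, size, access_time) taken from the first INSERT statement.
rows = [
    (200, 1000.0, "2021-07-01 00:00:01.000"),
    (200, 500.0, "2021-07-01 00:00:30.500"),
    (200, 600.0, "2021-07-01 00:01:01.000"),
    (404, 700.0, "2021-07-01 00:01:01.500"),
]


def aggregate(rows):
    """Group rows by (status, 1-minute window) and compute the flow's aggregates."""
    groups = defaultdict(list)
    for status, size, access_time in rows:
        ts = datetime.strptime(access_time, "%Y-%m-%d %H:%M:%S.%f")
        # Dropping seconds and microseconds gives the start of the 1-minute window.
        window = ts.replace(second=0, microsecond=0)
        groups[(status, window)].append(size)

    result = {}
    for key, sizes in groups.items():
        result[key] = {
            # Every sample row has a non-null client, so the row count stands in for count(client).
            "total_logs": len(sizes),
            "min_size": min(sizes),
            "max_size": max(sizes),
            "avg_size": sum(sizes) / len(sizes),
            "high_size_count": float(sum(1 for s in sizes if s > 550)),
        }
    return result


for (status, window), agg in sorted(aggregate(rows).items()):
    print(status, window, agg)
```

The three groups it prints correspond to the three rows of the result table: two windows for status 200 and one window for status 404.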

Now insert more data into the ngx_access_log table:

  INSERT INTO ngx_access_log
  VALUES
    ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:01:01.000"),
    ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 800, "agent", "2021-07-01 00:01:01.500");

The corresponding rows in the sink table ngx_statistics are now updated. Note how total_logs, min_size, max_size, avg_size, and high_size_count change for the 00:01:00 time window:

  SELECT * FROM ngx_statistics;

   status | total_logs | min_size | max_size | avg_size | high_size_count |        time_window         |         update_at
  --------+------------+----------+----------+----------+-----------------+----------------------------+----------------------------
      200 |          2 |      500 |     1000 |      750 |               1 | 2021-07-01 00:00:00.000000 | 2024-07-24 08:36:17.439000
      200 |          2 |      500 |      600 |      550 |               1 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:46.495000
      404 |          2 |      700 |      800 |      750 |               2 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:46.495000
  (3 rows)
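
The update to the 00:01:00 window can be reproduced with a small amount of running state per group. The Python sketch below shows one possible way to fold new rows into existing aggregates without rescanning earlier rows. It is an assumption about how incremental aggregation can work in general, not a description of the Flow engine's internals; `WindowState` and `ingest` are hypothetical names used only here.

```python
from dataclasses import dataclass


@dataclass
class WindowState:
    """Running aggregates for one (status, time_window) group."""
    count: int = 0
    total: float = 0.0
    min_size: float = float("inf")
    max_size: float = float("-inf")
    high_size_count: float = 0.0

    def update(self, size: float) -> None:
        # Fold one new row into the state; earlier rows are not revisited.
        self.count += 1
        self.total += size
        self.min_size = min(self.min_size, size)
        self.max_size = max(self.max_size, size)
        if size > 550:
            self.high_size_count += 1

    @property
    def avg_size(self) -> float:
        # The average is derived from the running sum and count.
        return self.total / self.count


state = {}


def ingest(batch):
    """Apply a batch of (status, window, size) rows to the running state."""
    for status, window, size in batch:
        state.setdefault((status, window), WindowState()).update(size)


# Rows of the 00:01:00 window from the first and the second INSERT statement.
ingest([(200, "00:01:00", 600.0), (404, "00:01:00", 700.0)])
ingest([(200, "00:01:00", 500.0), (404, "00:01:00", 800.0)])

for key, s in sorted(state.items()):
    print(key, s.count, s.min_size, s.max_size, s.avg_size, s.high_size_count)
```

Keeping the sum and count rather than the average itself is what allows avg_size to be updated from new rows alone.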

The columns in the ngx_statistics table are:

  • status: The status code of the HTTP response.
  • total_logs: The total number of logs with the same status code in the time window.
  • min_size: The minimum size of the packets with the same status code in the time window.
  • max_size: The maximum size of the packets with the same status code in the time window.
  • avg_size: The average size of the packets with the same status code in the time window.
  • high_size_count: The number of packets with a size greater than 550 in the time window.
  • time_window: The start of the 1-minute time window of the aggregation.
  • update_at: The time when the aggregation was last updated.
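
As a follow-up to the monitoring idea in this example, the Python sketch below flags the windows whose high_size_count exceeds a threshold. The rows are copied from the final ngx_statistics result; the threshold value and the function name `windows_to_inspect` are hypothetical and chosen only for illustration. How the rows are fetched from GreptimeDB is left out.

```python
# Rows as they appear in the final ngx_statistics result.
stats = [
    {"status": 200, "time_window": "2021-07-01 00:00:00", "max_size": 1000.0, "high_size_count": 1.0},
    {"status": 200, "time_window": "2021-07-01 00:01:00", "max_size": 600.0, "high_size_count": 1.0},
    {"status": 404, "time_window": "2021-07-01 00:01:00", "max_size": 800.0, "high_size_count": 2.0},
]

# Hypothetical alert threshold chosen only for this example.
HIGH_SIZE_THRESHOLD = 1.0


def windows_to_inspect(rows, threshold):
    """Return the (status, time_window) pairs whose high_size_count exceeds the threshold."""
    return [
        (row["status"], row["time_window"])
        for row in rows
        if row["high_size_count"] > threshold
    ]


print(windows_to_inspect(stats, HIGH_SIZE_THRESHOLD))
```

With the sample rows and this threshold, only the 00:01:00 window for status 404 is flagged.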

Next Steps

Congratulations, you now have a preliminary understanding of the continuous aggregation feature. Refer to the following sections to learn more:

  • Manage Flows describes how to create, update, and delete a flow. Each continuous aggregation query is a flow.
  • Write a Query describes how to write a continuous aggregation query.
  • Define Time Window describes how to define the time window for the continuous aggregation. The time window is an important attribute of a continuous aggregation query: it defines the time interval over which data is aggregated.
  • Expression is a reference for the expressions available in a continuous aggregation query.