Write a Query

This chapter describes how to write a continuous aggregation query in GreptimeDB. The query should be a SELECT statement that uses either aggregate functions or non-aggregate (i.e., scalar) functions.

Generally speaking, the SQL part of a flow is just like a normal SELECT statement, with a few differences. The grammar of the query is as follows:

  SELECT AGGR_FUNCTION(column1, column2,..) FROM <source_table> GROUP BY TIME_WINDOW_FUNCTION();

Only two kinds of expressions are allowed after the SELECT keyword:

  • Aggregate functions: see the reference in Expression for details.
  • Scalar functions and expressions: such as col, to_lowercase(col), or col + 1. This part works the same as in a normal SELECT clause in GreptimeDB.

The query should have a FROM clause to identify the source table. Because JOIN clauses are not currently supported, the query can only aggregate columns from a single table.

The GROUP BY clause works as in a normal query: it groups the data by the specified columns. The one special case is the time window functions hop() and tumble(), described in the Define Time Window part, which are used in the GROUP BY clause to define the time window for the aggregation. Other expressions in GROUP BY can be literals, columns, or scalar expressions.

Other clauses, such as ORDER BY, LIMIT, and OFFSET, are not supported in a continuous aggregation query.
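Since sorting and pagination cannot appear in the flow query itself, one option is to apply them when reading from the sink table instead. The following is only a sketch: the sink table name my_sink_table and its columns are hypothetical placeholders for whatever your own flow writes.

```sql
-- Hypothetical sink table and columns; substitute the sink of your own flow.
-- ORDER BY and LIMIT are applied here, on the sink table, not in the flow query.
SELECT
    status,
    time_window,
    total_logs
FROM my_sink_table
ORDER BY time_window DESC, total_logs DESC
LIMIT 10;
```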

Rewrite an existing query to a continuous aggregation query

Some simple existing aggregation queries can be used directly as continuous aggregation queries. For example, the query from the overview works both as a standard SQL query and as a continuous aggregation query, since it is a valid SQL query without any flow-specific syntax or functions:

  SELECT
      status,
      count(client) AS total_logs,
      min(size) AS min_size,
      max(size) AS max_size,
      avg(size) AS avg_size,
      sum(case when `size` > 550::double then 1::double else 0::double end) AS high_size_count,
      date_bin(INTERVAL '1 minutes', access_time) AS time_window
  FROM ngx_access_log
  GROUP BY
      status,
      time_window;
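To run the query above as a continuous aggregation, it can be wrapped in a CREATE FLOW statement. The sketch below assumes a flow named ngx_aggregation and a sink table named ngx_statistics. Both names are illustrative and not defined in this chapter, and creating the sink table is not shown.

```sql
-- Flow name and sink table name are assumptions for illustration.
CREATE FLOW ngx_aggregation
SINK TO ngx_statistics
AS
SELECT
    status,
    count(client) AS total_logs,
    min(size) AS min_size,
    max(size) AS max_size,
    avg(size) AS avg_size,
    sum(case when `size` > 550::double then 1::double else 0::double end) AS high_size_count,
    -- One-minute windows over the access_time column, as in the original query.
    date_bin(INTERVAL '1 minutes', access_time) AS time_window
FROM ngx_access_log
GROUP BY
    status,
    time_window;
```

The SELECT part is unchanged from the standalone query. Only the CREATE FLOW ... SINK TO ... AS header is added.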

However, other types of queries cannot be used directly as continuous aggregation queries. For example, it would be unwise for a query that computes percentiles to recalculate the percentile for each time window every time a new batch of data arrives. In this case, you can pre-aggregate the data into buckets of the desired size, and then calculate the percentile from the sink table using standard SQL when needed. The original SQL might be:

  SELECT
      status,
      percentile_approx(size, 0.5) AS median_size,
      date_bin(INTERVAL '1 minutes', access_time) AS time_window
  FROM ngx_access_log
  GROUP BY
      status,
      time_window;

The above query can be rewritten to first aggregate the data into buckets of size 10, and then calculate the percentile in the sink table. The flow query would be:

  CREATE FLOW calc_ngx_distribution
  SINK TO ngx_distribution
  AS
  SELECT
      status,
      trunc(size, -1) AS bucket,
      count(client) AS total_logs,
      date_bin(INTERVAL '1 minutes', access_time) AS time_window
  FROM ngx_access_log
  GROUP BY
      status,
      time_window,
      bucket;

You can then calculate the percentile from the sink table using standard SQL:

  SELECT
      outer.status,
      outer.time_window,
      outer.bucket,
      SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile
  FROM ngx_distribution AS outer
  JOIN ngx_distribution AS in1
      ON in1.status = outer.status
      AND in1.time_window = outer.time_window
  GROUP BY
      outer.status,
      outer.time_window,
      outer.bucket
  ORDER BY status, time_window, bucket;

The SQL query groups the data by status, time_window, and bucket. The percentile column is the cumulative percentage within each group: the sum of the counts of all buckets not greater than the current bucket, divided by the total count of all logs with the same status and time window. The result would be something like this:

   status |        time_window         | bucket | percentile
  --------+----------------------------+--------+------------
      404 | 1970-01-01 00:00:00.000000 |      0 |         22
      404 | 1970-01-01 00:00:00.000000 |      1 |         55
      404 | 1970-01-01 00:00:00.000000 |      2 |         66
      404 | 1970-01-01 00:00:00.000000 |      3 |        100
  (4 rows)
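To turn the cumulative percentages into an approximate median, one possible approach is to pick, for each status and time window, the smallest bucket whose percentile reaches 50. The following is a sketch rather than a definitive method. The aliases cur, prev, and cumulative and the output column approx_median_bucket are illustrative names, and the result is only as precise as the bucket size.

```sql
-- Sketch: approximate median bucket per status and time window.
-- The inner query recomputes the cumulative percentage for every bucket;
-- the outer query keeps the smallest bucket that reaches 50 percent.
SELECT
    status,
    time_window,
    MIN(bucket) AS approx_median_bucket
FROM (
    SELECT
        cur.status AS status,
        cur.time_window AS time_window,
        cur.bucket AS bucket,
        SUM(CASE WHEN prev.bucket <= cur.bucket THEN prev.total_logs ELSE 0 END) * 100
            / SUM(prev.total_logs) AS percentile
    FROM ngx_distribution AS cur
    JOIN ngx_distribution AS prev
        ON prev.status = cur.status
        AND prev.time_window = cur.time_window
    GROUP BY cur.status, cur.time_window, cur.bucket
) AS cumulative
WHERE percentile >= 50
GROUP BY status, time_window;
```

Applied to the sample rows above, the smallest bucket for status 404 with a percentile of at least 50 is bucket 1, whose percentile is 55.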