编写查询语句

本章节描述如何在 GreptimeDB 中编写持续聚合查询。 查询语句应该是一个带有聚合函数或非聚合函数(即 scalar 函数)的 SELECT 语句。

一般来说,Flow 的 SQL 部分就像一个普通的 SELECT 子句,只是稍微有一些不同。 具体语法如下:

  1. SELECT AGGR_FUNCTION(column1, column2,..) FROM <source_table> GROUP BY TIME_WINDOW_FUNCTION();

SELECT 关键字后只允许两种表达式:

  • 聚合函数:详细信息请参考 表达式 部分。
  • 标量函数:如 colto_lowercase(col)col + 1 等。这部分与 GreptimeDB 中的普通 SELECT 子句相同。

查询应该有一个 FROM 子句来标识 source 表。由于不支持 join 子句,目前只能从单个表中聚合列。

GROUP BY 子句与普通查询中的工作方式相同。 它根据指定的列对数据进行分组。 GROUP BY 子句中使用的时间窗口函数 hop()tumble()定义时间窗口 部分中有描述。 它们用于在聚合中定义时间窗口。 GROUP BY 中的其他表达式可以是 literal、列名或 scalar 表达式。

持续聚合查询不支持 ORDER BYLIMITOFFSET 等其他操作。

将现有查询重写为持续聚合查询

一些简单的现有聚合查询可以直接用作持续聚合查询。例如,概述 部分中的示例可以用于标准 SQL 查询和持续聚合查询,因为它也是一个有效的 SQL 查询,没有任何特定于流的语法或函数:

  1. SELECT
  2. status,
  3. count(client) AS total_logs,
  4. min(size) as min_size,
  5. max(size) as max_size,
  6. avg(size) as avg_size,
  7. sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count,
  8. date_bin(INTERVAL '1 minutes', access_time) as time_window,
  9. FROM ngx_access_log
  10. GROUP BY
  11. status,
  12. time_window;

然而,还有其他类型的查询不能直接用作持续聚合查询。 例如,需要计算百分位数的查询不应该在每次新数据到达时重复计算每个时间窗口的百分位数。 在这种情况下,您可以将数据预聚合到所需大小的桶中,然后在需要时使用标准 SQL 在 sink 表中计算百分位数。原始 SQL 可能是:

  1. SELECT
  2. status,
  3. percentile_approx(size, 0.5) as median_size,
  4. date_bin(INTERVAL '1 minutes', access_time) as time_window,
  5. FROM ngx_access_log
  6. GROUP BY
  7. status,
  8. time_window;

上述查询可以重写为首先将数据聚合到大小为 10 的桶中,然后在 sink 表中计算百分位数。 流查询将是:

  1. CREATE FLOW calc_ngx_distribution
  2. SINK TO ngx_distribution
  3. AS
  4. SELECT
  5. status,
  6. trunc(size, -1) as bucket,
  7. count(client) AS total_logs,
  8. date_bin(INTERVAL '1 minutes', access_time) as time_window,
  9. FROM ngx_access_log
  10. GROUP BY
  11. status,
  12. time_window,
  13. bucket;

接下来,您可以使用标准 SQL 在 sink 表中计算百分位数:

  1. SELECT
  2. outer.status,
  3. outer.time_window,
  4. outer.bucket,
  5. SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile
  6. FROM ngx_distribution AS outer
  7. JOIN ngx_distribution AS in1
  8. ON in1.status = outer.status
  9. AND in1.time_window = outer.time_window
  10. GROUP BY
  11. status,
  12. time_window,
  13. bucket
  14. ORDER BY status, time_window, bucket;

上述 SQL 查询按 status、time_window 和 bucket 对数据进行分组。percentile 列通过计算小于或等于当前 bucket 的所有 bucket 的总和,并将其除以所有日志的总数来计算每个组内的百分比。结果可能如下所示:

  1. status | time_window | bucket | percentile
  2. --------+----------------------------+--------+------------
  3. 404 | 1970-01-01 00:00:00.000000 | 0 | 22
  4. 404 | 1970-01-01 00:00:00.000000 | 1 | 55
  5. 404 | 1970-01-01 00:00:00.000000 | 2 | 66
  6. 404 | 1970-01-01 00:00:00.000000 | 3 | 100
  7. (4 rows)