EXPLAIN Statements

EXPLAIN statements are used to explain the execution logic of a query or an INSERT statement, and to inspect the optimized query plan of a query.
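In its simplest form, EXPLAIN is prefixed to the statement being explained. A minimal sketch, where the table Orders and its amount column are illustrative placeholders that would need to be registered first:

```sql
-- Print the abstract syntax tree, optimized plans, and execution plan
-- for this query (Orders and amount are hypothetical).
EXPLAIN SELECT * FROM Orders WHERE amount > 10;
```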

Run an EXPLAIN Statement

Java

EXPLAIN statements can be executed with the executeSql() method of TableEnvironment. The executeSql() method returns the explain result for a successful EXPLAIN operation; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in TableEnvironment.

Scala

EXPLAIN statements can be executed with the executeSql() method of TableEnvironment. The executeSql() method returns the explain result for a successful EXPLAIN operation; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in TableEnvironment.

Python

EXPLAIN statements can be executed with the execute_sql() method of TableEnvironment. The execute_sql() method returns the explain result for a successful EXPLAIN operation; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in TableEnvironment.

SQL CLI

EXPLAIN statements can be executed in the SQL CLI.

The following example shows how to run an EXPLAIN statement in the SQL CLI.

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// register tables named "MyTable1" and "MyTable2"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");

// explain a SELECT statement through TableEnvironment.explainSql()
String explanation =
        tEnv.explainSql(
                "SELECT `count`, COUNT(word) FROM ("
                        + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                        + "UNION ALL "
                        + "SELECT `count`, word FROM MyTable2) tmp "
                        + "GROUP BY `count`");
System.out.println(explanation);

// explain a SELECT statement through TableEnvironment.executeSql()
TableResult tableResult =
        tEnv.executeSql(
                "EXPLAIN PLAN FOR "
                        + "SELECT `count`, COUNT(word) FROM ("
                        + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                        + "UNION ALL "
                        + "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`");
tableResult.print();

TableResult tableResult2 =
        tEnv.executeSql(
                "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN "
                        + "SELECT `count`, COUNT(word) FROM ("
                        + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                        + "UNION ALL "
                        + "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`");
tableResult2.print();

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tEnv = StreamTableEnvironment.create(env)

// register tables named "MyTable1" and "MyTable2"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

// explain a SELECT statement through TableEnvironment.explainSql()
val explanation = tEnv.explainSql(
  """
    |SELECT `count`, COUNT(word)
    |FROM (
    |  SELECT `count`, word FROM MyTable1
    |  WHERE word LIKE 'F%'
    |  UNION ALL
    |  SELECT `count`, word FROM MyTable2 ) tmp
    |GROUP BY `count`
    |""".stripMargin)
println(explanation)

// explain a SELECT statement through TableEnvironment.executeSql()
val tableResult = tEnv.executeSql(
  """
    |EXPLAIN PLAN FOR
    |SELECT `count`, COUNT(word)
    |FROM (
    |  SELECT `count`, word FROM MyTable1
    |  WHERE word LIKE 'F%'
    |  UNION ALL
    |  SELECT `count`, word FROM MyTable2 ) tmp
    |GROUP BY `count`
    |""".stripMargin)
tableResult.print()

val tableResult2 = tEnv.executeSql(
  """
    |EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN
    |SELECT `count`, COUNT(word)
    |FROM (
    |  SELECT `count`, word FROM MyTable1
    |  WHERE word LIKE 'F%'
    |  UNION ALL
    |  SELECT `count`, word FROM MyTable2 ) tmp
    |GROUP BY `count`
    |""".stripMargin)
tableResult2.print()

Python

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

# explain a SELECT statement through TableEnvironment.explain_sql()
explanation1 = t_env.explain_sql(
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
print(explanation1)

# explain a SELECT statement through TableEnvironment.execute_sql()
table_result = t_env.execute_sql(
    "EXPLAIN PLAN FOR "
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
table_result.print()

table_result2 = t_env.execute_sql(
    "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN "
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
table_result2.print()

SQL CLI

Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.

Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.

Flink SQL> EXPLAIN PLAN FOR SELECT `count`, COUNT(word) FROM
> ( SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2 ) tmp GROUP BY `count`;

Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN SELECT `count`, COUNT(word) FROM
> ( SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2 ) tmp GROUP BY `count`;

The results of the EXPLAIN statements above are as follows:

EXPLAIN PLAN

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalUnion(all=[true])
   :- LogicalProject(count=[$0], word=[$1])
   :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
   +- LogicalProject(count=[$0], word=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, 'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

EXPLAIN PLAN WITH DETAILS

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalUnion(all=[true])
   :- LogicalProject(count=[$0], word=[$1])
   :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
   +- LogicalProject(count=[$0], word=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[count], select=[count, COUNT(word) AS EXPR$1], changelogMode=[I,UA]): rowcount = 1.05E8, cumulative cost = {5.2E8 rows, 1.805E10 cpu, 4.0E9 io, 2.1E9 network, 0.0 memory}
+- Exchange(distribution=[hash[count]], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {4.15E8 rows, 1.7945E10 cpu, 4.0E9 io, 2.1E9 network, 0.0 memory}
   +- Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
      :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value).

== Optimized Execution Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, 'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 17,
    "type" : "Source: MyTable1[15]",
    "pact" : "Data Source",
    "contents" : "[15]:TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
    "parallelism" : 1
  }, {
    "id" : 18,
    "type" : "Calc[16]",
    "pact" : "Operator",
    "contents" : "[16]:Calc(select=[count, word], where=[LIKE(word, 'F%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 17,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 19,
    "type" : "Source: MyTable2[17]",
    "pact" : "Data Source",
    "contents" : "[17]:TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
    "parallelism" : 1
  }, {
    "id" : 22,
    "type" : "GroupAggregate[20]",
    "pact" : "Operator",
    "contents" : "[20]:GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 18,
      "ship_strategy" : "HASH",
      "side" : "second"
    }, {
      "id" : 19,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  } ]
}

ExplainDetails

Print the plan of a statement using the specified ExplainDetail type(s).

ESTIMATED_COST

Specifying ESTIMATED_COST makes the optimizer attach the estimated cost of each physical node to the output. For example:

== Optimized Physical Plan ==
TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
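ESTIMATED_COST can also be requested on its own; a sketch reusing the MyTable1 table created in the examples above:

```sql
EXPLAIN ESTIMATED_COST
SELECT `count`, COUNT(word) FROM MyTable1 GROUP BY `count`;
```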

CHANGELOG_MODE

Specifying CHANGELOG_MODE makes the optimizer attach the changelog mode of each physical node to the output. See Dynamic Tables for more details about changelog mode. For example:

== Optimized Physical Plan ==
GroupAggregate(..., changelogMode=[I,UA,D])
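CHANGELOG_MODE can likewise be requested on its own; a sketch reusing MyTable1 from the examples above, where the aggregation node will report that it emits inserts and updates:

```sql
EXPLAIN CHANGELOG_MODE
SELECT word, SUM(`count`) FROM MyTable1 GROUP BY word;
```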

PLAN_ADVICE

PLAN_ADVICE is supported since Flink 1.17.

Specifying PLAN_ADVICE makes the optimizer analyze the optimized physical plan and provide potential data risk warnings or performance tuning advice. The output header changes from "Optimized Physical Plan" to "Optimized Physical Plan With Advice" as a hint.

The advice on the physical plan is categorized by type and scope.

Advice type:
- WARNING: points out potential data correctness risks
- ADVICE: gives available SQL tuning advice

Advice scope:
- QUERY_LEVEL: advice targeting the whole SQL statement
- NODE_LEVEL: advice tied to a single physical node

Currently, PLAN_ADVICE provides advice on the following issues.

If a group aggregation is detected that could enable two-phase (local-global) optimization but has not, the optimizer attaches the advice id to the GroupAggregate node as an index and appends the advice content at the end of the plan.

SQL1

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '200';
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';

CREATE TABLE MyTable (
  a BIGINT,
  b INT NOT NULL,
  c VARCHAR,
  d BIGINT
) WITH (
  'connector' = 'values',
  'bounded' = 'false');

EXPLAIN PLAN_ADVICE
SELECT
  AVG(a) AS avg_a,
  COUNT(*) AS cnt,
  COUNT(b) AS cnt_b,
  MIN(b) AS min_b,
  MAX(c) FILTER (WHERE a > 1) AS max_c
FROM MyTable;

NODE_LEVEL ADVICE

== Optimized Physical Plan With Advice ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(advice=[1], select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, IS TRUE(>(a, 1)) AS $f3])
         +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').

If a potential non-deterministic update (NDU) problem is detected, the optimizer appends the warning content at the end of the plan.

SQL2

CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c STRING,
  d INT,
  `day` AS DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),
  PRIMARY KEY (a, c) NOT ENFORCED
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I,UA,UB,D'
);

CREATE TABLE MySink (
  a INT,
  b BIGINT,
  c STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'values',
  'sink-insert-only' = 'false'
);

EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, `day`
FROM MyTable
WHERE b > 100;

QUERY_LEVEL WARNING

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, day], upsertMaterialize=[true])
+- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day], where=[>(b, 100)])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])

advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.

related rel plan:
Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], where=[>(b, 100)], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])

If no issue is detected, the optimizer appends "No available advice" at the end of the plan as a hint.

SQL3

CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c STRING,
  d INT
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I'
);

EXPLAIN PLAN_ADVICE
SELECT * FROM MyTable WHERE b > 100;

NO_AVAILABLE ADVICE

== Optimized Physical Plan With Advice ==
Calc(select=[a, b, c, d], where=[>(b, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])

No available advice...

JSON_EXECUTION_PLAN

Generates the execution plan of the program in JSON format.
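For example, a sketch reusing MyTable1 from the examples above, which prints only the JSON execution plan for the query:

```sql
EXPLAIN JSON_EXECUTION_PLAN
SELECT * FROM MyTable1 WHERE word LIKE 'F%';
```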

语法

EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>

statement_set:
EXECUTE STATEMENT SET
BEGIN
  insert_statement;
  ...
  insert_statement;
END;
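The syntax above also covers statement sets. A sketch, assuming the hypothetical sink tables MySink1 and MySink2 exist alongside the MyTable1 source from the earlier examples:

```sql
-- MySink1 and MySink2 are illustrative placeholders.
EXPLAIN ESTIMATED_COST, CHANGELOG_MODE
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO MySink1 SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%';
  INSERT INTO MySink2 SELECT `count`, word FROM MyTable1 WHERE word LIKE 'B%';
END;
```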

For the syntax of queries, see the Queries page. For the syntax of INSERT, see the INSERT page.