EXPLAIN Statements

EXPLAIN statements are used to explain the logical and optimized query plans of a query or an INSERT statement.

Run an EXPLAIN statement

Java

EXPLAIN statements can be executed with the executeSql() method of the TableEnvironment. The executeSql() method returns the explain result for a successful EXPLAIN operation; otherwise, it throws an exception.

The following examples show how to run an EXPLAIN statement in TableEnvironment.

Scala

EXPLAIN statements can be executed with the executeSql() method of the TableEnvironment. The executeSql() method returns the explain result for a successful EXPLAIN operation; otherwise, it throws an exception.

The following examples show how to run an EXPLAIN statement in TableEnvironment.

Python

EXPLAIN statements can be executed with the execute_sql() method of the TableEnvironment. The execute_sql() method returns the explain result for a successful EXPLAIN operation; otherwise, it throws an exception.

The following examples show how to run an EXPLAIN statement in TableEnvironment.

SQL CLI

EXPLAIN statements can be executed in SQL CLI.

The following examples show how to run an EXPLAIN statement in SQL CLI.

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// register tables named "MyTable1" and "MyTable2"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");

// explain SELECT statement through TableEnvironment.explainSql()
String explanation =
    tEnv.explainSql(
        "SELECT `count`, COUNT(word) FROM ("
            + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
            + "UNION ALL "
            + "SELECT `count`, word FROM MyTable2) tmp "
            + "GROUP BY `count`");
System.out.println(explanation);

// explain SELECT statement through TableEnvironment.executeSql()
TableResult tableResult =
    tEnv.executeSql(
        "EXPLAIN PLAN FOR "
            + "SELECT `count`, COUNT(word) FROM ("
            + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
            + "UNION ALL "
            + "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`");
tableResult.print();

TableResult tableResult2 =
    tEnv.executeSql(
        "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN "
            + "SELECT `count`, COUNT(word) FROM ("
            + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
            + "UNION ALL "
            + "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`");
tableResult2.print();

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tEnv = StreamTableEnvironment.create(env)

// register tables named "MyTable1" and "MyTable2"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

// explain SELECT statement through TableEnvironment.explainSql()
val explanation = tEnv.explainSql(
  """
    |SELECT `count`, COUNT(word)
    |FROM (
    |  SELECT `count`, word FROM MyTable1
    |  WHERE word LIKE 'F%'
    |  UNION ALL
    |  SELECT `count`, word FROM MyTable2 ) tmp
    |GROUP BY `count`
    |""".stripMargin)
println(explanation)

// explain SELECT statement through TableEnvironment.executeSql()
val tableResult = tEnv.executeSql(
  """
    |EXPLAIN PLAN FOR
    |SELECT `count`, COUNT(word)
    |FROM (
    |  SELECT `count`, word FROM MyTable1
    |  WHERE word LIKE 'F%'
    |  UNION ALL
    |  SELECT `count`, word FROM MyTable2 ) tmp
    |GROUP BY `count`
    |""".stripMargin)
tableResult.print()

val tableResult2 = tEnv.executeSql(
  """
    |EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN
    |SELECT `count`, COUNT(word)
    |FROM (
    |  SELECT `count`, word FROM MyTable1
    |  WHERE word LIKE 'F%'
    |  UNION ALL
    |  SELECT `count`, word FROM MyTable2 ) tmp
    |GROUP BY `count`
    |""".stripMargin)
tableResult2.print()

Python

t_env = StreamTableEnvironment.create(...)

t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

# explain SELECT statement through TableEnvironment.explain_sql()
explanation1 = t_env.explain_sql(
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
print(explanation1)

# explain SELECT statement through TableEnvironment.execute_sql()
table_result = t_env.execute_sql(
    "EXPLAIN PLAN FOR "
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
table_result.print()

table_result2 = t_env.execute_sql(
    "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN "
    "SELECT `count`, COUNT(word) FROM ("
    "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
    "UNION ALL "
    "SELECT `count`, word FROM MyTable2) tmp GROUP BY `count`")
table_result2.print()

SQL CLI

Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.

Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.

Flink SQL> EXPLAIN PLAN FOR SELECT `count`, COUNT(word) FROM
> ( SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
>   UNION ALL
>   SELECT `count`, word FROM MyTable2 ) tmp GROUP BY `count`;

Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE, JSON_EXECUTION_PLAN SELECT `count`, COUNT(word) FROM
> ( SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
>   UNION ALL
>   SELECT `count`, word FROM MyTable2 ) tmp GROUP BY `count`;

The EXPLAIN result is:

EXPLAIN PLAN

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalUnion(all=[true])
   :- LogicalProject(count=[$0], word=[$1])
   :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
   +- LogicalProject(count=[$0], word=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, 'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

EXPLAIN PLAN WITH DETAILS

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalUnion(all=[true])
   :- LogicalProject(count=[$0], word=[$1])
   :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
   +- LogicalProject(count=[$0], word=[$1])
      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[count], select=[count, COUNT(word) AS EXPR$1], changelogMode=[I,UA]): rowcount = 1.05E8, cumulative cost = {5.2E8 rows, 1.805E10 cpu, 4.0E9 io, 2.1E9 network, 0.0 memory}
+- Exchange(distribution=[hash[count]], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {4.15E8 rows, 1.7945E10 cpu, 4.0E9 io, 2.1E9 network, 0.0 memory}
   +- Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
      :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value).

== Optimized Execution Plan ==
GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])
+- Exchange(distribution=[hash[count]])
   +- Union(all=[true], union=[count, word])
      :- Calc(select=[count, word], where=[LIKE(word, 'F%')])
      :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
      +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 17,
    "type" : "Source: MyTable1[15]",
    "pact" : "Data Source",
    "contents" : "[15]:TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
    "parallelism" : 1
  }, {
    "id" : 18,
    "type" : "Calc[16]",
    "pact" : "Operator",
    "contents" : "[16]:Calc(select=[count, word], where=[LIKE(word, 'F%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 17,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 19,
    "type" : "Source: MyTable2[17]",
    "pact" : "Data Source",
    "contents" : "[17]:TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
    "parallelism" : 1
  }, {
    "id" : 22,
    "type" : "GroupAggregate[20]",
    "pact" : "Operator",
    "contents" : "[20]:GroupAggregate(groupBy=[count], select=[count, COUNT(word) AS EXPR$1])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 18,
      "ship_strategy" : "HASH",
      "side" : "second"
    }, {
      "id" : 19,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  } ]
}

ExplainDetails

ExplainDetail defines the types of details to include in the explain result.

ESTIMATED_COST

Specifying ESTIMATED_COST instructs the optimizer to attach the estimated optimal cost of each physical rel node to the output.

== Optimized Physical Plan ==
TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})

CHANGELOG_MODE

Specifying CHANGELOG_MODE instructs the optimizer to attach the changelog mode (see Dynamic Tables for more details) of each physical rel node to the output.

== Optimized Physical Plan ==
GroupAggregate(..., changelogMode=[I,UA,D])

PLAN_ADVICE

PLAN_ADVICE is supported since Flink version 1.17.

Specifying PLAN_ADVICE instructs the optimizer to analyze the optimized physical plan and provide potential risk warnings and/or optimization advice. In addition, the section title changes from "Optimized Physical Plan" to "Optimized Physical Plan With Advice" to highlight the advice.

Plan advice is categorized by Kind and Scope.

Advice Kind    Description
WARNING        Reveals potential data correctness risks
ADVICE         Suggests potential SQL optimizer tuning configurations

Advice Scope   Description
QUERY_LEVEL    Provides advice from a global view, targeting the entire query
NODE_LEVEL     Provides advice targeting a specific rel node

Flink SQL provides plan advice for the following issues:

If a GroupAggregate that can be optimized to a local-global aggregation is detected, the optimizer tags an advice id onto the GroupAggregate rel node and suggests the configuration changes to apply.

SQL1

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '200';
SET 'table.optimizer.agg-phase-strategy' = 'ONE_PHASE';

CREATE TABLE MyTable (
  a BIGINT,
  b INT NOT NULL,
  c VARCHAR,
  d BIGINT
) WITH (
  'connector' = 'values',
  'bounded' = 'false');

EXPLAIN PLAN_ADVICE
SELECT
  AVG(a) AS avg_a,
  COUNT(*) AS cnt,
  COUNT(b) AS cnt_b,
  MIN(b) AS min_b,
  MAX(c) FILTER (WHERE a > 1) AS max_c
FROM MyTable;

NODE_LEVEL ADVICE

== Optimized Physical Plan With Advice ==
Calc(select=[avg_a, cnt, cnt AS cnt_b, min_b, max_c])
+- GroupAggregate(advice=[1], select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])
   +- Exchange(distribution=[single])
      +- Calc(select=[a, b, c, IS TRUE(>(a, 1)) AS $f3])
         +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])

advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.optimizer.agg-phase-strategy' to 'AUTO').

If a non-deterministic update (NDU) issue is detected, the optimizer appends the warning at the end of the physical plan.

SQL2

CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c STRING,
  d INT,
  `day` AS DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),
  PRIMARY KEY (a, c) NOT ENFORCED
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I,UA,UB,D'
);

CREATE TABLE MySink (
  a INT,
  b BIGINT,
  c STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'values',
  'sink-insert-only' = 'false'
);

EXPLAIN PLAN_ADVICE
INSERT INTO MySink
SELECT a, b, `day`
FROM MyTable
WHERE b > 100;

QUERY_LEVEL WARNING

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, day], upsertMaterialize=[true])
+- Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day], where=[>(b, 100)])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])

advice[1]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.

related rel plan:
Calc(select=[a, b, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], where=[>(b, 100)], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b], changelogMode=[I,UB,UA,D])

If no warning or advice applies, the optimizer appends the notice "No available advice..." at the end of the physical plan.

SQL3

CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c STRING,
  d INT
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I'
);

EXPLAIN PLAN_ADVICE
SELECT * FROM MyTable WHERE b > 100;

NO_AVAILABLE ADVICE

== Optimized Physical Plan With Advice ==
Calc(select=[a, b, c, d], where=[>(b, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])

No available advice...

JSON_EXECUTION_PLAN

Specifying JSON_EXECUTION_PLAN instructs the optimizer to attach the JSON-format execution plan of the program to the output.
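For example, the following sketch reuses the MyTable1 table registered in the earlier examples; its output would contain a "== Physical Execution Plan ==" section with the JSON node graph shown earlier on this page:

```sql
EXPLAIN JSON_EXECUTION_PLAN
SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%';
```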

Syntax

EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>

statement_set:
STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;

For query syntax, please refer to the Queries page. For insert syntax, please refer to the INSERT page.
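To illustrate the statement_set branch of the syntax above, an EXPLAIN over a STATEMENT SET might look like the following sketch; it assumes the MyTable and MySink tables from the PLAN_ADVICE examples are registered:

```sql
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO MySink SELECT a, b, c FROM MyTable WHERE b > 100;
  INSERT INTO MySink SELECT a, b, c FROM MyTable WHERE b <= 100;
END;
```

The explain result then covers the plans of all INSERT statements in the set as a single job graph.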