Queries

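SELECT and VALUES statements are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SELECT statement (or the VALUES statement) as a Table. A Table can be used in subsequent SQL and Table API queries, converted into a DataStream, or written to a TableSink. SQL and Table API queries can be seamlessly mixed; they are holistically optimized and translated into a single program.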

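To access a table in a SQL query, the table must be registered in the TableEnvironment. A table can be registered from a TableSource, a Table, a CREATE TABLE statement, or a DataStream. Alternatively, you can register catalogs in a TableEnvironment to specify the location of the data sources.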

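For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns that name. As a result, Table objects can be inlined directly into SQL queries, as the examples below show.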
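The mechanism can be pictured with a small toy model in plain Python. ToyTableEnvironment and ToyTable are hypothetical stand-ins, not PyFlink classes; the UnnamedTable$<n> naming scheme and the caching of the name after the first call are assumptions of the model. The point is only that turning the object into a string registers it under a fresh name, so the string can be spliced into SQL text.

```python
import itertools


class ToyTableEnvironment:
    """Hypothetical stand-in for a TableEnvironment that only keeps a name -> table catalog."""

    def __init__(self):
        self.catalog = {}
        self._ids = itertools.count()

    def register_unique(self, table):
        # Pick a name that cannot collide with earlier registrations.
        name = f"UnnamedTable${next(self._ids)}"
        self.catalog[name] = table
        return name


class ToyTable:
    """Hypothetical stand-in for a Table; __str__ mimics the described toString() behavior."""

    def __init__(self, env, rows):
        self.env = env
        self.rows = rows
        self._name = None

    def __str__(self):
        # Register on first use only (assumed), then keep returning the same name.
        if self._name is None:
            self._name = self.env.register_unique(self)
        return self._name


env = ToyTableEnvironment()
orders = ToyTable(env, [(1, "Rubber Duck", 3)])
# The f-string calls str(orders), which registers the table and yields its name.
query = f"SELECT SUM(amount) FROM {orders} WHERE product LIKE '%Rubber%'"
print(query)  # SELECT SUM(amount) FROM UnnamedTable$0 WHERE product LIKE '%Rubber%'
```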

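Note: A query that uses an unsupported SQL feature causes a TableException. The SQL features supported for batch and streaming tables are listed in the following sections.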

Specifying a Query

The following examples show how to specify a SQL query on registered and inlined tables.

Java

  import static org.apache.flink.table.api.Expressions.$;

  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.DataTypes;
  import org.apache.flink.table.api.FormatDescriptor;
  import org.apache.flink.table.api.Schema;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.TableDescriptor;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // ingest a DataStream from an external source
  DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

  // SQL query with an inlined (unregistered) table:
  // string concatenation calls Table.toString(), which registers the table under a unique name
  Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
  Table result = tableEnv.sqlQuery(
      "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

  // SQL query with a registered table
  // register the DataStream as view "Orders"
  tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
  // run a SQL query on the Table and retrieve the result as a new Table
  Table result2 = tableEnv.sqlQuery(
      "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

  // create and register a TableSink
  final Schema schema = Schema.newBuilder()
      .column("product", DataTypes.STRING())
      .column("amount", DataTypes.INT())
      .build();
  final TableDescriptor sinkDescriptor = TableDescriptor.forConnector("filesystem")
      .schema(schema)
      .option("path", "/path/to/file")
      .format(FormatDescriptor.forFormat("csv")
          .option("field-delimiter", ",")
          .build())
      .build();
  tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor);

  // run an INSERT SQL on the Table and emit the result to the TableSink
  tableEnv.executeSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

Scala

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api._
  import org.apache.flink.table.api.bridge.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = StreamTableEnvironment.create(env)

  // read a DataStream from an external source
  val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

  // SQL query with an inlined (unregistered) table
  val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
  val result = tableEnv.sqlQuery(
    s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

  // SQL query with a registered table
  // register the DataStream under the name "Orders"
  tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
  // run a SQL query on the Table and retrieve the result as a new Table
  val result2 = tableEnv.sqlQuery(
    "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

  // create and register a TableSink (the filesystem connector requires a path)
  val schema = Schema.newBuilder()
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.INT())
    .build()
  val sinkDescriptor = TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/path/to/file")
    .format(FormatDescriptor.forFormat("csv")
      .option("field-delimiter", ",")
      .build())
    .build()
  tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor)

  // run an INSERT SQL on the Table and emit the result to the TableSink
  tableEnv.executeSql(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Python

  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.table import (DataTypes, FormatDescriptor, Schema,
                             StreamTableEnvironment, TableDescriptor)

  env = StreamExecutionEnvironment.get_execution_environment()
  table_env = StreamTableEnvironment.create(env)

  # SQL query with an inlined (unregistered) table
  # elements data type: BIGINT, STRING, BIGINT
  table = table_env.from_elements(..., ['user', 'product', 'amount'])
  result = table_env \
      .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)

  # register the Table as view "Orders" so that the INSERT below can refer to it
  table_env.create_temporary_view("Orders", table)

  # create and register a TableSink
  # (amount is BIGINT to match the type of the source elements)
  schema = (Schema.new_builder()
            .column("product", DataTypes.STRING())
            .column("amount", DataTypes.BIGINT())
            .build())
  sink_descriptor = (TableDescriptor.for_connector("filesystem")
                     .schema(schema)
                     .option("path", "/path/to/file")
                     .format(FormatDescriptor.for_format("csv")
                             .option("field-delimiter", ",")
                             .build())
                     .build())
  table_env.create_temporary_table("RubberOrders", sink_descriptor)

  # run an INSERT SQL on the Table and emit the result to the TableSink
  table_env \
      .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Executing a Query

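A SELECT or VALUES statement can be executed, and its results collected to the local client, with the TableEnvironment.executeSql() method. The method returns the result of the SELECT statement (or the VALUES statement) as a TableResult. Similarly, a Table object can be executed with the Table.execute() method to collect the query results to the local client. TableResult.collect() returns a closeable row iterator. The SELECT job does not finish until all result data has been collected, so you should close the job actively via CloseableIterator#close() to avoid resource leaks. TableResult.print() prints the SELECT result to the client console. The result data of a TableResult can be accessed only once, so for a given TableResult call either collect() or print(), not both.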
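Why closing matters can be shown without a cluster. In the plain-Python toy below, a generator stands in for the row iterator of an unbounded SELECT, and its finally block stands in for cancelling the job; both toy_result_rows and cleanup_log are hypothetical names, not PyFlink API. contextlib.closing plays the role of Java's try-with-resources, guaranteeing that close() runs even when the consumer stops reading early.

```python
from contextlib import closing

cleanup_log = []


def toy_result_rows():
    """Hypothetical stand-in for the row iterator of an unbounded SELECT."""
    try:
        n = 0
        while True:  # an unbounded query never runs out of rows on its own
            yield ("user", n)
            n += 1
    finally:
        # Runs when close() is called; stands in for cancelling the job.
        cleanup_log.append("job cancelled")


collected = []
# closing() calls close() on exit, like try-with-resources in the Java example.
with closing(toy_result_rows()) as rows:
    for row in rows:
        collected.append(row)
        if len(collected) == 3:
            break  # stop consuming early; closing() still closes the iterator

print(collected)    # [('user', 0), ('user', 1), ('user', 2)]
print(cleanup_log)  # ['job cancelled']
```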

TableResult.collect() and TableResult.print() behave slightly differently depending on the checkpointing setting (to enable checkpointing for a streaming job, see the checkpointing configuration).

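  • For batch jobs, or streaming jobs without checkpointing, TableResult.collect() and TableResult.print() provide neither exactly-once nor at-least-once guarantees. Query results are accessible to the client as soon as they are produced, but an exception is thrown if the job fails and restarts.
  • For streaming jobs with exactly-once checkpointing, TableResult.collect() and TableResult.print() guarantee end-to-end exactly-once record delivery. A result becomes accessible to the client only after its corresponding checkpoint completes.
  • For streaming jobs with at-least-once checkpointing, TableResult.collect() and TableResult.print() guarantee end-to-end at-least-once record delivery. Query results are accessible to the client as soon as they are produced, but the same result may be delivered more than once.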
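The three cases above map onto checkpointing configuration. A minimal sketch, assuming the string keys used in the Python example of this section (execution.checkpointing.mode and execution.checkpointing.interval) and the mode values EXACTLY_ONCE and AT_LEAST_ONCE; the helper name checkpoint_settings and its guarantee labels are hypothetical.

```python
def checkpoint_settings(guarantee, interval="10s"):
    """Map a desired collect()/print() guarantee to checkpointing config entries."""
    modes = {"exactly-once": "EXACTLY_ONCE", "at-least-once": "AT_LEAST_ONCE"}
    if guarantee == "none":
        # No checkpointing: results are visible immediately, with no delivery guarantee.
        return {}
    if guarantee not in modes:
        raise ValueError(f"unknown guarantee: {guarantee!r}")
    return {
        "execution.checkpointing.mode": modes[guarantee],
        "execution.checkpointing.interval": interval,
    }


print(checkpoint_settings("exactly-once"))
```

Each returned pair could then be applied with table_env.get_config().set(key, value); an empty dict leaves checkpointing disabled, which corresponds to the first case.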

Java

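  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableResult;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;
  import org.apache.flink.util.CloseableIterator;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
  tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

  // execute SELECT statement
  TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
  // use try-with-resources statement to make sure the iterator will be closed automatically
  // (close() may throw Exception, so the enclosing method must handle or declare it)
  try (CloseableIterator<Row> it = tableResult1.collect()) {
      while (it.hasNext()) {
          Row row = it.next();
          // handle row
      }
  }

  // execute Table
  TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
  tableResult2.print();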

Scala

  import java.time.Duration

  import org.apache.flink.configuration.CheckpointingOptions
  import org.apache.flink.core.execution.CheckpointingMode
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api._
  import org.apache.flink.table.api.bridge.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings = EnvironmentSettings.inStreamingMode()
  val tableEnv = StreamTableEnvironment.create(env, settings)

  // enable checkpointing
  tableEnv.getConfig
    .set(CheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
  tableEnv.getConfig
    .set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

  tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

  // execute SELECT statement
  val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
  val it = tableResult1.collect()
  try {
    while (it.hasNext) {
      val row = it.next()
      // handle row
    }
  } finally {
    it.close() // close the iterator to avoid resource leak
  }

  // execute Table
  val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
  tableResult2.print()

Python

  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.table import EnvironmentSettings, StreamTableEnvironment

  env = StreamExecutionEnvironment.get_execution_environment()
  settings = EnvironmentSettings.in_streaming_mode()
  table_env = StreamTableEnvironment.create(env, settings)
  # enable checkpointing
  table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
  table_env.get_config().set("execution.checkpointing.interval", "10s")
  table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  # execute SELECT statement
  table_result1 = table_env.execute_sql("SELECT * FROM Orders")
  table_result1.print()
  # execute Table
  table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
  table_result2.print()

Syntax

Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.

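The following BNF grammar describes the superset of SQL features supported in batch and streaming queries. The Operations section shows examples of the supported features and indicates which features are supported only for batch or only for streaming queries.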

Grammar

  query:
      values
    | WITH withItem [ , withItem ]* query
    | {
          select
        | selectWithoutFrom
        | query UNION [ ALL ] query
        | query EXCEPT query
        | query INTERSECT query
      }
      [ ORDER BY orderItem [, orderItem ]* ]
      [ LIMIT { count | ALL } ]
      [ OFFSET start { ROW | ROWS } ]
      [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

  withItem:
      name
      [ '(' column [, column ]* ')' ]
      AS '(' query ')'

  orderItem:
      expression [ ASC | DESC ]

  select:
      SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
      FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

  selectWithoutFrom:
      SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }

  projectItem:
      expression [ [ AS ] columnAlias ]
    | tableAlias . *

  tableExpression:
      tableReference [, tableReference ]*
    | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

  joinCondition:
      ON booleanExpression
    | USING '(' column [, column ]* ')'

  tableReference:
      tablePrimary
      [ matchRecognize ]
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

  tablePrimary:
      [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
    | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
    | [ LATERAL ] '(' query ')'
    | UNNEST '(' expression ')'

  tablePath:
      [ [ catalogName . ] databaseName . ] tableName

  systemTimePeriod:
      FOR SYSTEM_TIME AS OF dateTimeExpression

  dynamicTableOptions:
      /*+ OPTIONS(key=val [, key=val]*) */

  key:
      stringLiteral

  val:
      stringLiteral

  values:
      VALUES expression [, expression ]*

  groupItem:
      expression
    | '(' ')'
    | '(' expression [, expression ]* ')'
    | CUBE '(' expression [, expression ]* ')'
    | ROLLUP '(' expression [, expression ]* ')'
    | GROUPING SETS '(' groupItem [, groupItem ]* ')'

  windowRef:
      windowName
    | windowSpec

  windowSpec:
      [ windowName ]
      '('
      [ ORDER BY orderItem [, orderItem ]* ]
      [ PARTITION BY expression [, expression ]* ]
      [
          RANGE numericOrIntervalExpression {PRECEDING}
        | ROWS numericExpression {PRECEDING}
      ]
      ')'

  matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

  measureColumn:
      expression AS alias

  pattern:
      patternTerm [ '|' patternTerm ]*

  patternTerm:
      patternFactor [ patternFactor ]*

  patternFactor:
      variable [ patternQuantifier ]

  patternQuantifier:
      '*'
    | '*?'
    | '+'
    | '+?'
    | '?'
    | '??'
    | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
    | '{' repeat '}'
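As an illustration of the dynamicTableOptions production, in which both key and val are string literals, the hypothetical helper below renders an options hint from a Python dict. The key scan.startup.mode is a Kafka connector option used purely as an example value; nothing here checks whether a particular table accepts it.

```python
def options_hint(options):
    """Render /*+ OPTIONS(key=val [, key=val]*) */ from a dict of string keys and values."""
    if not options:
        # The grammar requires at least one key=val pair.
        raise ValueError("at least one option is required")

    def literal(s):
        # key and val are string literals: single-quoted, embedded quotes doubled
        return "'" + s.replace("'", "''") + "'"

    pairs = ", ".join(f"{literal(k)}={literal(v)}" for k, v in options.items())
    return f"/*+ OPTIONS({pairs}) */"


# The hint follows the table path, as in the tablePrimary production.
query = "SELECT * FROM Orders " + options_hint({"scan.startup.mode": "earliest-offset"})
print(query)  # SELECT * FROM Orders /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
```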

Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) similar to Java:

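  • The case of identifiers is preserved whether or not they are quoted.
  • Identifiers are then matched case-sensitively.
  • Unlike Java, back-ticks (`) allow identifiers to contain non-alphanumeric characters (e.g. SELECT a AS `my field` FROM t).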
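When identifiers come from program data, a small helper can apply the back-tick rule consistently. This is a sketch: quote_identifier and table_path are hypothetical names, and writing an embedded back-tick as two back-ticks is an assumption based on Calcite's quoting convention, not something stated on this page. table_path follows the tablePath production of the grammar.

```python
def quote_identifier(name):
    """Wrap an identifier in back-ticks so it may contain non-alphanumeric characters."""
    if not name:
        raise ValueError("identifier must not be empty")
    # Assumption: an embedded back-tick is escaped by doubling it.
    return "`" + name.replace("`", "``") + "`"


def table_path(*parts):
    """Build [ [ catalogName . ] databaseName . ] tableName with every part quoted."""
    if not 1 <= len(parts) <= 3:
        raise ValueError("expected 1 to 3 path parts")
    return ".".join(quote_identifier(p) for p in parts)


query = f"SELECT a AS {quote_identifier('my field')} FROM {table_path('my_catalog', 'my_db', 't')}"
print(query)  # SELECT a AS `my field` FROM `my_catalog`.`my_db`.`t`
```

Because case is preserved and matching is case-sensitive, quoting does not change case: `Orders` and `orders` still name different tables.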

String literals must be enclosed in single quotes (e.g. SELECT 'Hello World'). A single quote inside a literal is escaped by doubling it (e.g. SELECT 'It''s me').

  Flink SQL> SELECT 'Hello World', 'It''s me';
  +-------------+---------+
  |      EXPR$0 |  EXPR$1 |
  +-------------+---------+
  | Hello World | It's me |
  +-------------+---------+
  1 row in set
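The escaping rule is mechanical, so it is easy to apply when SQL text is assembled in code. A minimal sketch; sql_string_literal is a hypothetical helper name, not part of any Flink API.

```python
def sql_string_literal(value):
    """Quote text as a Flink SQL string literal: enclose in single quotes, double embedded ones."""
    return "'" + value.replace("'", "''") + "'"


query = "SELECT " + ", ".join(sql_string_literal(s) for s in ["Hello World", "It's me"])
print(query)  # SELECT 'Hello World', 'It''s me'
```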

String literals may contain Unicode characters. To specify a Unicode code point explicitly, use one of the following forms:

  • Use a backslash (\) as the escape character (the default): SELECT U&'\263A'
  • Use a custom escape character: SELECT U&'#263A' UESCAPE '#'
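To build such literals programmatically, the hypothetical helper below writes every non-ASCII character as the escape character followed by four hex digits. It is a sketch under stated assumptions: it handles only characters in the Basic Multilingual Plane, and it writes a literal occurrence of the escape character by doubling it, following the SQL standard's Unicode-escape rules, which are assumed (not confirmed on this page) to apply in Flink.

```python
def unicode_literal(text, escape="\\"):
    """Render text as U&'...' with non-ASCII characters written as <escape>XXXX."""
    # Hex digits, '+', quotes and whitespace would be ambiguous as escape characters.
    if len(escape) != 1 or escape in "0123456789abcdefABCDEF+'\"" or escape.isspace():
        raise ValueError(f"invalid escape character: {escape!r}")
    parts = []
    for ch in text:
        code = ord(ch)
        if ch == escape:
            parts.append(escape * 2)  # assumption: a literal escape character is doubled
        elif ch == "'":
            parts.append("''")  # single quotes are still doubled
        elif code < 128:
            parts.append(ch)  # plain ASCII is kept as-is
        elif code <= 0xFFFF:
            parts.append(f"{escape}{code:04X}")  # BMP code point as 4 hex digits
        else:
            raise ValueError("characters outside the BMP are not handled in this sketch")
    literal = "U&'" + "".join(parts) + "'"
    if escape != "\\":
        literal += f" UESCAPE '{escape}'"
    return literal


print("SELECT " + unicode_literal("\u263a"))              # SELECT U&'\263A'
print("SELECT " + unicode_literal("\u263a", escape="#"))  # SELECT U&'#263A' UESCAPE '#'
```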

Operations