介绍

SQL Gateway 服务支持并发执行从多个client提交的 SQL。它提供了一种简单的方法来提交 Flink 作业、查找元数据和在线分析数据。

SQL Gateway 由插件化的 endpoint 和 SqlGatewayService 组成。多个 endpoint 可以复用 SqlGatewayService 处理请求。endpoint 是用户连接的入口。 用户可以使用不同的工具连接不同类型的 endpoint。

SQL Gateway Architecture

开始

这个章节描述如何通过命令行启动和执行你的第一个 Flink SQL 作业。 SQL Gateway 和 Flink 版本一起发布,开箱即用。它只需要一个正在运行的 Flink 集群,可以执行 Flink SQL 作业。 更多启动 Flink 集群的信息可以查看 Cluster & Deployment。 如果你只是想简单尝试 SQL Client,你也可以使用以下命令启动只有一个 worker 的本地集群。

  1. $ ./bin/start-cluster.sh

Starting the SQL Gateway

SQL Gateway 脚本也在 Flink 二进制包的目录中。用户通过以下命令启动:

  1. $ ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

这个命令启动 SQL Gateway 和 REST Endpoint,监听 localhost:8083 地址。你可以使用 curl 命令检查 REST Endpoint 是否存活。

  1. $ curl http://localhost:8083/v1/info
  2. {"productName":"Apache Flink","version":"1.19.0"}

执行 SQL 查询

你可以通过以下步骤来验证集群配置和连接。

Step 1: Open a session

  1. $ curl --request POST http://localhost:8083/v1/sessions
  2. {"sessionHandle":"..."}

SQL Gateway 返回结果中的 sessionHandle 用来唯一标识每个活跃用户。

Step 2: Execute a query

  1. $ curl --request POST http://localhost:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'
  2. {"operationHandle":"..."}

SQL Gateway 返回结果中的 operationHandle 用来唯一标识提交的 SQL。

Step 3: Fetch results

通过上述 sessionHandleoperationHandle,你能获取相应的结果。

  1. $ curl --request GET http://localhost:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0
  2. {
  3. "results": {
  4. "columns": [
  5. {
  6. "name": "EXPR$0",
  7. "logicalType": {
  8. "type": "INTEGER",
  9. "nullable": false
  10. }
  11. }
  12. ],
  13. "data": [
  14. {
  15. "kind": "INSERT",
  16. "fields": [
  17. 1
  18. ]
  19. }
  20. ]
  21. },
  22. "resultType": "PAYLOAD",
  23. "nextResultUri": "..."
  24. }

结果中的 nextResultUri 不是null时,用于获取下一批结果。

  1. $ curl --request GET ${nextResultUri}

配置

SQL Gateway 启动参数

目前 SQL Gateway 有以下可选命令,它们将在下文详细讨论。

  1. $ ./bin/sql-gateway.sh --help
  2. Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]
  3. commands:
  4. start - Run a SQL Gateway as a daemon
  5. start-foreground - Run a SQL Gateway as a console application
  6. stop - Stop the SQL Gateway daemon
  7. stop-all - Stop all the SQL Gateway daemons
  8. -h | --help - Show this help message

“start” 或者 “start-foreground” 命令可以使你在 CLI 中配置 SQL Gateway。

  1. $ ./bin/sql-gateway.sh start --help
  2. Start the Flink SQL Gateway as a daemon to submit Flink SQL.
  3. Syntax: start [OPTIONS]
  4. -D <property=value> Use value for given property
  5. -h,--help Show the help message with descriptions of all
  6. options.

SQL Gateway 配置

你可以通过以下方式在启动时配置 SQL Gateway,或者任意合法的 Flink configuration 配置:

  1. $ ./sql-gateway -Dkey=value
KeyDefaultTypeDescription
sql-gateway.session.check-interval
1 minDuration定时检查空闲 session 是否超时的间隔时间,设置为 0 时关闭检查。
sql-gateway.session.idle-timeout
10 minDurationsession 超时时间,在这个时间区间内没有被访问过的 session 会被关闭。如果设置为 0,session 将不会被关闭。
sql-gateway.session.max-num
1000000IntegerSQL Gateway 服务中存活 session 的最大数量。
sql-gateway.session.plan-cache.enabled
falseBoolean设置为 true 的时候,SQL Gateway 会在一个 session 内部缓存并复用 plan。
sql-gateway.session.plan-cache.size
100IntegerPlan cache 的大小, 当且仅当 table.optimizer.plan-cache.enabled 为 true 的时候生效。
sql-gateway.session.plan-cache.ttl
1 hourDurationPlan cache 的 TTL, 控制 cache 在写入之后多久过期, 当且仅当 table.optimizer.plan-cache.enabled 为 true 的时候生效。
sql-gateway.worker.keepalive-time
5 minDuration空闲工作线程的存活时间。当工作线程数量超过了配置的最小值,超过存活时间的多余空闲工作线程会被杀掉。
sql-gateway.worker.threads.max
500IntegerSQL Gateway 服务中工作线程的最大数量。
sql-gateway.worker.threads.min
5IntegerSQL Gateway 服务中工作线程的最小数量。

已支持的 Endpoints

Flink 原生支持 REST EndpointHiveServer2 Endpoint。 SQL Gateway 默认集成 REST Endpoint。由于架构的可扩展性,用户可以通过指定 endpoint 来启动 SQL Gateway。

  1. $ ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2

或者在 Flink 配置文件 中增加如下配置:

  1. sql-gateway.endpoint.type: hiveserver2

Notice: 如果 CLI 命令和 Flink 配置文件 都有 sql-gateway.endpoint.type,CLI 的优先级比 Flink 配置文件 更高。

具体的 endpoint 请参考相应页面。