Introduction

The SQL Gateway is a service that lets multiple remote clients execute SQL concurrently. It provides an easy way to submit Flink jobs, look up metadata, and analyze data online.

The SQL Gateway is composed of pluggable endpoints and the SqlGatewayService. The SqlGatewayService is a processor that the endpoints reuse to handle requests. An endpoint is the entry point that users connect to. Depending on the endpoint type, users can connect with different tools.

Figure: SQL Gateway Architecture
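The split between endpoints and a shared processor can be pictured with a toy model. The sketch below is purely illustrative Python and is not Flink's implementation: the class names ToyGatewayService, ToyJsonEndpoint and ToyLineEndpoint, the "action" request shape, and the one-line text protocol are all hypothetical. It only shows two different entry points delegating to one service instance.

```python
import uuid
from typing import Dict, List


class ToyGatewayService:
    """Stand-in for the shared request processor (not the real SqlGatewayService)."""

    def __init__(self) -> None:
        self.statements_by_session: Dict[str, List[str]] = {}

    def open_session(self) -> str:
        handle = uuid.uuid4().hex  # hypothetical handle format
        self.statements_by_session[handle] = []
        return handle

    def execute_statement(self, session_handle: str, statement: str) -> str:
        self.statements_by_session[session_handle].append(statement)
        return uuid.uuid4().hex  # hypothetical operation handle


class ToyJsonEndpoint:
    """Entry point that accepts dict-shaped (JSON-like) requests."""

    def __init__(self, service: ToyGatewayService) -> None:
        self.service = service

    def handle(self, request: dict) -> dict:
        if request["action"] == "open_session":
            return {"sessionHandle": self.service.open_session()}
        if request["action"] == "execute":
            operation = self.service.execute_statement(
                request["sessionHandle"], request["statement"]
            )
            return {"operationHandle": operation}
        raise ValueError(f"unsupported action: {request['action']}")


class ToyLineEndpoint:
    """Entry point that accepts a text line: 'OPEN' or 'EXEC <handle> <sql>'."""

    def __init__(self, service: ToyGatewayService) -> None:
        self.service = service

    def handle(self, line: str) -> str:
        verb, _, rest = line.partition(" ")
        if verb == "OPEN":
            return self.service.open_session()
        if verb == "EXEC":
            session_handle, _, statement = rest.partition(" ")
            return self.service.execute_statement(session_handle, statement)
        raise ValueError(f"unsupported verb: {verb}")
```

Because both endpoints hold the same service object, a session opened through one entry point is visible through the other, which is the reuse described above.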

Getting Started

This section describes how to set up and run your first Flink SQL program from the command line.

The SQL Gateway is bundled with the regular Flink distribution and runs out of the box. It requires only a running Flink cluster on which table programs can be executed. For more information about setting up a Flink cluster, see the Cluster & Deployment part. If you simply want to try out the SQL Gateway, you can also start a local cluster with one worker using the following command:

    $ ./bin/start-cluster.sh

Starting the SQL Gateway

The SQL Gateway scripts are also located in the bin directory of Flink. Start the gateway by running:

    $ ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

The command starts the SQL Gateway with the REST Endpoint, which listens on localhost:8083. You can use curl to check whether the REST Endpoint is available.

    $ curl http://localhost:8083/v1/info
    {"productName":"Apache Flink","version":"1.19.0"}
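The same availability check can be scripted. The following is a minimal sketch using only the Python standard library; it assumes the gateway address from the example above, and the function names (info_url, parse_info, gateway_is_up) and the 5-second timeout are illustrative choices rather than part of Flink.

```python
import json
from urllib.request import urlopen


def info_url(base_url: str) -> str:
    """Build the URL of the info resource used in the curl example."""
    return base_url.rstrip("/") + "/v1/info"


def parse_info(body: str):
    """Extract (productName, version) from the JSON body of the info response."""
    payload = json.loads(body)
    return payload["productName"], payload["version"]


def gateway_is_up(base_url: str = "http://localhost:8083", timeout: float = 5.0) -> bool:
    """Return True if the info resource answers with the expected JSON fields."""
    try:
        with urlopen(info_url(base_url), timeout=timeout) as response:
            parse_info(response.read().decode("utf-8"))
        return True
    except (OSError, ValueError, KeyError):
        # OSError covers connection and HTTP errors raised by urllib;
        # ValueError and KeyError cover a body that is not the expected JSON.
        return False
```

Calling gateway_is_up() after starting the gateway should return True once the REST Endpoint accepts connections; until then it returns False instead of raising.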

Running SQL Queries

To validate your setup and cluster connection, work through the following steps.

Step 1: Open a session

    $ curl --request POST http://localhost:8083/v1/sessions
    {"sessionHandle":"..."}

The sessionHandle in the response is used by the SQL Gateway to uniquely identify each active user session.

Step 2: Execute a query

    $ curl --request POST http://localhost:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'
    {"operationHandle":"..."}

The operationHandle in the response is used by the SQL Gateway to uniquely identify the submitted SQL statement.
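Steps 1 and 2 can also be driven from a script. The sketch below mirrors the two curl calls with the Python standard library. The helper names and the 30-second timeout are hypothetical, and the explicit Content-Type: application/json header is an assumption that the curl example does not set.

```python
import json
from urllib.parse import quote
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8083"  # address used in the curl examples


def open_session_request(base_url: str = BASE_URL) -> Request:
    """POST /v1/sessions without a body, like the curl call in step 1."""
    return Request(f"{base_url}/v1/sessions", method="POST")


def statement_request(session_handle: str, statement: str,
                      base_url: str = BASE_URL) -> Request:
    """POST the statement as a JSON body, like the curl call in step 2."""
    body = json.dumps({"statement": statement}).encode("utf-8")
    url = f"{base_url}/v1/sessions/{quote(session_handle, safe='')}/statements/"
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},  # assumption, see lead-in
        method="POST",
    )


def send(request: Request, timeout: float = 30.0) -> dict:
    """Send a prepared request and decode the JSON response."""
    with urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def run_statement(statement: str, base_url: str = BASE_URL):
    """Open a session, submit one statement, and return both handles."""
    session_handle = send(open_session_request(base_url))["sessionHandle"]
    operation_handle = send(
        statement_request(session_handle, statement, base_url)
    )["operationHandle"]
    return session_handle, operation_handle
```

With a gateway running, run_statement("SELECT 1") would return the pair of handles needed to fetch results. Building the requests separately from sending them keeps the URL and body construction easy to inspect without a live gateway.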

Step 3: Fetch results

With the sessionHandle and operationHandle above, you can fetch the corresponding results.

    $ curl --request GET http://localhost:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0
    {
      "results": {
        "columns": [
          {
            "name": "EXPR$0",
            "logicalType": {
              "type": "INTEGER",
              "nullable": false
            }
          }
        ],
        "data": [
          {
            "kind": "INSERT",
            "fields": [
              1
            ]
          }
        ]
      },
      "resultType": "PAYLOAD",
      "nextResultUri": "..."
    }

If nextResultUri in the response is not null, use it to fetch the next batch of results.

    $ curl --request GET ${nextResultUri}
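Following nextResultUri until it becomes null is a simple loop. The sketch below is one way to write it with the Python standard library. The names http_get_json and iter_rows, the injectable fetch parameter, and the max_pages safety cap are assumptions made for illustration. The text above does not say whether nextResultUri is absolute or relative, so urljoin is used because it handles both.

```python
import json
from typing import Callable, Iterator, List
from urllib.parse import urljoin
from urllib.request import urlopen


def http_get_json(url: str) -> dict:
    """GET a URL and decode the JSON body."""
    with urlopen(url, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


def iter_rows(base_url: str, first_uri: str,
              fetch: Callable[[str], dict] = http_get_json,
              max_pages: int = 1000) -> Iterator[List]:
    """Yield the "fields" list of every row, following nextResultUri.

    Stops when nextResultUri is null or missing, or after max_pages requests
    (a hypothetical cap so a misbehaving server cannot loop forever).
    """
    next_uri = first_uri
    for _ in range(max_pages):
        if not next_uri:
            return
        page = fetch(urljoin(base_url, next_uri))
        results = page.get("results") or {}
        for row in results.get("data", []):
            yield row["fields"]
        next_uri = page.get("nextResultUri")
```

A caller would pass the result URI from step 3, for example iter_rows("http://localhost:8083", f"/v1/sessions/{session_handle}/operations/{operation_handle}/result/0"), and iterate over the rows. A batch may contain no rows, so a production client would likely pause between requests; that is left out here.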

Configuration

SQL Gateway startup options

Currently, the SQL Gateway script supports the following commands. They are discussed in detail in the subsequent paragraphs.

    $ ./bin/sql-gateway.sh --help
    Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]
      commands:
        start               - Run a SQL Gateway as a daemon
        start-foreground    - Run a SQL Gateway as a console application
        stop                - Stop the SQL Gateway daemon
        stop-all            - Stop all the SQL Gateway daemons
        -h | --help         - Show this help message

For the "start" and "start-foreground" commands, you can configure the SQL Gateway from the CLI.

    $ ./bin/sql-gateway.sh start --help
    Start the Flink SQL Gateway as a daemon to submit Flink SQL.
      Syntax: start [OPTIONS]
         -D <property=value>   Use value for given property
         -h,--help             Show the help message with descriptions of all
                               options.

SQL Gateway Configuration

You can configure the SQL Gateway at startup by passing any of the options below, or any other valid Flink configuration entry, as a -D property:

    $ ./bin/sql-gateway.sh start -Dkey=value

| Key | Default | Type | Description |
|---|---|---|---|
| sql-gateway.session.check-interval | 1 min | Duration | The check interval for idle session timeout, which can be disabled by setting to zero. |
| sql-gateway.session.idle-timeout | 10 min | Duration | Timeout interval for closing the session when the session hasn’t been accessed during the interval. If setting to zero, the session will not be closed. |
| sql-gateway.session.max-num | 1000000 | Integer | The maximum number of the active session for sql gateway service. |
| sql-gateway.session.plan-cache.enabled | false | Boolean | When it is true, sql gateway will cache and reuse plans for queries per session. |
| sql-gateway.session.plan-cache.size | 100 | Integer | Plan cache size, it takes effect iff table.optimizer.plan-cache.enabled is true. |
| sql-gateway.session.plan-cache.ttl | 1 hour | Duration | TTL for plan cache, it controls how long will the cache expire after write, it takes effect iff table.optimizer.plan-cache.enabled is true. |
| sql-gateway.worker.keepalive-time | 5 min | Duration | Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval. |
| sql-gateway.worker.threads.max | 500 | Integer | The maximum number of worker threads for sql gateway service. |
| sql-gateway.worker.threads.min | 5 | Integer | The minimum number of worker threads for sql gateway service. |
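When the gateway is launched from a script, the -D properties can be generated from a dictionary of options. The following is a small sketch under stated assumptions: the helper names render_value and gateway_start_args are hypothetical, the script path matches the one used earlier in this section, and Python booleans are rendered as lowercase true/false to match the defaults listed in the table.

```python
import shlex
from typing import Dict, List, Union

OptionValue = Union[str, int, bool]


def render_value(value: OptionValue) -> str:
    """Render a Python value the way the option table writes it."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def gateway_start_args(options: Dict[str, OptionValue],
                       script: str = "./bin/sql-gateway.sh") -> List[str]:
    """Build the argument list for `sql-gateway.sh start -Dkey=value ...`."""
    args = [script, "start"]
    for key in sorted(options):  # sorted only to make the output deterministic
        args.append(f"-D{key}={render_value(options[key])}")
    return args


# Example: cap the number of sessions and enable the per-session plan cache.
example_args = gateway_start_args({
    "sql-gateway.session.max-num": 100,
    "sql-gateway.session.plan-cache.enabled": True,
})
# shlex.join quotes arguments where needed, so the result is safe to paste into a shell.
example_command = shlex.join(example_args)
```

The argument list can be handed to subprocess.run directly, which avoids shell quoting issues for values that contain spaces, such as durations.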

Supported Endpoints

Flink natively supports the REST Endpoint and the HiveServer2 Endpoint. The SQL Gateway is bundled with the REST Endpoint by default. Thanks to the flexible architecture, users can start the SQL Gateway with a specified endpoint by running:

    $ ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2

or by adding the following entry to the Flink configuration file:

    sql-gateway.endpoint.type: hiveserver2

Note: If the Flink configuration file also sets sql-gateway.endpoint.type, the value passed on the command line takes precedence.
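The precedence rule amounts to a dictionary merge in which command-line values are applied last. The sketch below is only a model of that rule, not Flink's configuration loader. The function name effective_options is hypothetical, and the value "rest" for the file entry is an assumed identifier for the REST Endpoint.

```python
from typing import Dict


def effective_options(file_config: Dict[str, str],
                      cli_overrides: Dict[str, str]) -> Dict[str, str]:
    """Merge options so that -D values from the CLI override the config file."""
    merged = dict(file_config)    # start from the configuration file
    merged.update(cli_overrides)  # CLI entries replace file entries with the same key
    return merged


# The file selects one endpoint type, the command line selects another.
from_file = {"sql-gateway.endpoint.type": "rest"}  # "rest" is an assumed value
from_cli = {"sql-gateway.endpoint.type": "hiveserver2"}
resolved = effective_options(from_file, from_cli)
# resolved["sql-gateway.endpoint.type"] is "hiveserver2"
```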

For details on a specific endpoint, refer to the corresponding page.