SQL Client

Flink’s Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers.

The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

Animated demo of the Flink SQL Client CLI running table programs on a cluster

Getting Started

This section describes how to set up and run your first Flink SQL program from the command line.

The SQL Client is bundled in the regular Flink distribution and thus runnable out-of-the-box. It requires only a running Flink cluster where table programs can be executed. For more information about setting up a Flink cluster see the Cluster & Deployment part. If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command:

  ./bin/start-cluster.sh

Starting the SQL Client CLI

The SQL Client scripts are also located in the binary directory of Flink. Users have two options for starting the SQL Client CLI: either starting an embedded standalone process or connecting to a remote SQL Gateway. The SQL Client's default mode is embedded.

You can start the CLI in embedded mode by calling:

  ./bin/sql-client.sh

or explicitly use embedded mode:

  ./bin/sql-client.sh embedded

For gateway mode, you can start the CLI by calling:

  ./bin/sql-client.sh gateway --endpoint <gateway address>

In gateway mode, the CLI submits the SQL statements to the specified remote gateway, which executes them.

The <gateway address> can be provided in two formats: as a host:port combination or as a full URL.
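
For illustration, both forms are sketched below against a hypothetical gateway address; substitute your own host, port, or URL (in a standard setup the SQL Gateway's REST endpoint listens on port 8083 by default):

  # host:port form (hypothetical address)
  ./bin/sql-client.sh gateway --endpoint sql-gateway.example.com:8083

  # full URL form (hypothetical address)
  ./bin/sql-client.sh gateway --endpoint http://sql-gateway.example.com:8083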

If you need to pass custom HTTP headers, you can do so by setting the FLINK_REST_CLIENT_HEADERS environment variable. For example:

  export FLINK_REST_CLIENT_HEADERS="Cookie:myauthcookie=foobar;othercookie=baz"
  ./bin/sql-client.sh gateway --endpoint https://your-sql-gateway.endpoint.com/authenticated/sql

For multiple headers, separate them with a newline:

  export FLINK_REST_CLIENT_HEADERS=$(cat << EOF
  Cookie:myauthcookie=foobar
  Cache-Control: no-cache
  EOF)

By default, the SQL Client will use the truststore configured using the security.ssl.rest.truststore and security.ssl.rest.truststore-password properties in the Flink configuration file on the SQL client side. If these properties aren’t explicitly configured, the client will use the default certificate stores provided by the JDK.
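
As a minimal sketch, assuming the legacy flat flink-conf.yaml layout on the SQL Client side, the two properties might look like this (the truststore path and password are placeholders):

  # Flink configuration on the SQL Client side (placeholder values)
  security.ssl.rest.truststore: /path/to/rest.truststore
  security.ssl.rest.truststore-password: changeit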

Note SQL Client only supports connecting to the REST Endpoint since version v2.

See SQL Client startup options below for more details.

Running SQL Queries

For validating your setup and cluster connection, you can enter the simple query below and press Enter to execute it.

  SET 'sql-client.execution.result-mode' = 'tableau';
  SET 'execution.runtime-mode' = 'batch';

  SELECT
    name,
    COUNT(*) AS cnt
  FROM
    (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
  GROUP BY name;

The SQL Client will retrieve the results from the cluster and visualize them (you can close the result view by pressing the Q key):

  +-------+-----+
  |  name | cnt |
  +-------+-----+
  | Alice |   1 |
  |   Bob |   2 |
  |  Greg |   1 |
  +-------+-----+

The SET command allows you to tune the job execution and the SQL Client behavior. See SQL Client Configuration below for more details.

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The configuration section explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

Key-strokes

The following key-strokes are available in the SQL Client:

Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description
-------------------------------- | ---------------- | -----------
alt-b, ctrl+⍇                    | Esc-b            | Backward word
alt-f, ctrl+⍈                    | Esc-f            | Forward word
alt-c                            | Esc-c            | Capitalize word
alt-l                            | Esc-l            | Lowercase word
alt-u                            | Esc-u            | Uppercase word
alt-d                            | Esc-d            | Kill word
alt-n                            | Esc-n            | History search forward (behaves same as down line from history in case of empty input)
alt-p                            | Esc-p            | History search backward (behaves same as up line from history in case of empty input)
alt-t                            | Esc-t            | Transpose words
ctrl-a                           | ⌘-a              | To the beginning of line
ctrl-e                           | ⌘-e              | To the end of line
ctrl-b                           | ⌘-b              | Backward char
ctrl-f                           | ⌘-f              | Forward char
ctrl-d                           | ⌘-d              | Delete char
ctrl-h                           | ⌘-h              | Backward delete char
ctrl-t                           | ⌘-t              | Transpose chars
ctrl-i                           | ⌘-i              | Invoke completion
ctrl-j                           | ⌘-j              | Submit a query
ctrl-m                           | ⌘-m              | Submit a query
ctrl-k                           | ⌘-k              | Kill the line to the right from the cursor
ctrl-w                           | ⌘-w              | Kill the line to the left from the cursor
ctrl-u                           | ⌘-u              | Kill the whole line
ctrl-l                           | ⌘-l              | Clear screen
ctrl-n                           | ⌘-n              | Down line from history
ctrl-p                           | ⌘-p              | Up line from history
ctrl-r                           | ⌘-r              | History incremental search backward
ctrl-s                           | ⌘-s              | History incremental search forward

Getting help

The documentation of the SQL Client commands can be accessed by typing the HELP command.

See also the general SQL documentation.

Configuration

SQL Client startup options

The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs.

  ./sql-client [MODE] [OPTIONS]

  The following options are available:

  Mode "embedded" (default) submits Flink jobs from the local machine.

    Syntax: [embedded] [OPTIONS]
    "embedded" mode options:
       -D <session dynamic config key=val>
              The dynamic config key=val for a session.
       -f,--file <script file>
              Script file that should be executed. In this mode, the client
              will not open an interactive terminal.
       -h,--help
              Show the help message with descriptions of all options.
       -hist,--history <History file path>
              The file which you want to save the command history into. If
              not specified, we will auto-generate one under your user's
              home directory.
       -i,--init <initialization file>
              Script file that used to init the session context. If get
              error in execution, the sql client will exit. Notice it's not
              allowed to add query or insert into the init file.
       -j,--jar <JAR file>
              A JAR file to be imported into the session. The file might
              contain user-defined classes needed for the execution of
              statements such as functions, table sources, or sinks. Can be
              used multiple times.
       -l,--library <JAR directory>
              A JAR file directory with which every new session is
              initialized. The files might contain user-defined classes
              needed for the execution of statements such as functions,
              table sources, or sinks. Can be used multiple times.
       -pyarch,--pyArchives <arg>
              Add python archive files for job. The archive files will be
              extracted to the working directory of python UDF worker. For
              each archive file, a target directory be specified. If the
              target directory name is specified, the archive file will be
              extracted to a directory with the specified name. Otherwise,
              the archive file will be extracted to a directory with the
              same name of the archive file. The files uploaded via this
              option are accessible via relative path. '#' could be used as
              the separator of the archive file path and the target
              directory name. Comma (',') could be used as the separator to
              specify multiple archive files. This option can be used to
              upload the virtual environment, the data files used in Python
              UDF (e.g., --pyArchives
              file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable
              py37.zip/py37/bin/python). The data files could be accessed
              in Python UDF, e.g.: f = open('data/data.txt', 'r').
       -pyclientexec,--pyClientExecutable <arg>
              The path of the Python interpreter used to launch the Python
              process when submitting the Python jobs via "flink run" or
              compiling the Java/Scala jobs containing Python UDFs.
       -pyexec,--pyExecutable <arg>
              Specify the path of the python interpreter used to execute
              the python UDF worker (e.g.: --pyExecutable
              /usr/local/bin/python3). The python UDF worker depends on
              Python 3.8+, Apache Beam (version == 2.43.0), Pip (version >=
              20.3) and SetupTools (version >= 37.0.0). Please ensure that
              the specified environment meets the above requirements.
       -pyfs,--pyFiles <pythonFiles>
              Attach custom files for job. The standard resource file
              suffixes such as .py/.egg/.zip/.whl or directory are all
              supported. These files will be added to the PYTHONPATH of
              both the local client and the remote python UDF worker. Files
              suffixed with .zip will be extracted and added to PYTHONPATH.
              Comma (',') could be used as the separator to specify
              multiple files (e.g., --pyFiles
              file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
       -pyreq,--pyRequirements <arg>
              Specify a requirements.txt file which defines the third-party
              dependencies. These dependencies will be installed and added
              to the PYTHONPATH of the python UDF worker. A directory which
              contains the installation packages of these dependencies
              could be specified optionally. Use '#' as the separator if
              the optional parameter exists (e.g., --pyRequirements
              file:///tmp/requirements.txt#file:///tmp/cached_dir).
       -s,--session <session identifier>
              The identifier for a session. 'default' is the default
              identifier.
       -u,--update <SQL update statement>
              Deprecated Experimental (for testing only!) feature:
              Instructs the SQL Client to immediately execute the given
              update statement after starting up. The process is shut down
              after the statement has been submitted to the cluster and
              returns an appropriate return code. Currently, this feature
              is only supported for INSERT INTO statements that declare the
              target sink table. Please use option -f to submit update
              statement.

  Mode "gateway" mode connects to the SQL gateway for submission.

    Syntax: gateway [OPTIONS]
    "gateway" mode options:
       -D <session dynamic config key=val>
              The dynamic config key=val for a session.
       -e,--endpoint <SQL Gateway address>
              The address of the remote SQL Gateway to connect.
       -f,--file <script file>
              Script file that should be executed. In this mode, the client
              will not open an interactive terminal.
       -h,--help
              Show the help message with descriptions of all options.
       -hist,--history <History file path>
              The file which you want to save the command history into. If
              not specified, we will auto-generate one under your user's
              home directory.
       -i,--init <initialization file>
              Script file that used to init the session context. If get
              error in execution, the sql client will exit. Notice it's not
              allowed to add query or insert into the init file.
       -s,--session <session identifier>
              The identifier for a session. 'default' is the default
              identifier.
       -u,--update <SQL update statement>
              Deprecated Experimental (for testing only!) feature:
              Instructs the SQL Client to immediately execute the given
              update statement after starting up. The process is shut down
              after the statement has been submitted to the cluster and
              returns an appropriate return code. Currently, this feature
              is only supported for INSERT INTO statements that declare the
              target sink table. Please use option -f to submit update
              statement.

SQL Client Configuration

You can configure the SQL Client by setting the options below, or any valid Flink configuration entry:

  SET 'key' = 'value';
sql-client.display.color-schema (Batch, Streaming)
    Default: "DEFAULT"    Type: String
    SQL highlight color schema to be used in the SQL Client. Possible values:
    'default', 'dark', 'light', 'chester', 'vs2010', 'solarized', 'obsidian', 'geshi'.

sql-client.display.max-column-width (Batch, Streaming)
    Default: 30    Type: Integer
    Deprecated, please use table.display.max-column-width instead. When printing the
    query results, this parameter determines the number of characters shown on screen
    before truncating. This only applies to columns with variable-length types (e.g.
    CHAR, VARCHAR, STRING) in streaming mode. Fixed-length types and all types in
    batch mode are printed using a deterministic column width.

sql-client.display.print-time-cost (Batch)
    Default: true    Type: Boolean
    Determines whether to display the time consumed by the query.

sql-client.display.show-line-numbers (Batch, Streaming)
    Default: false    Type: Boolean
    Determines whether line numbers are shown in multiline SQL.

sql-client.execution.max-table-result.rows (Batch, Streaming)
    Default: 1000000    Type: Integer
    The number of rows to cache in table mode. If the number of rows exceeds the
    specified value, rows are retired in FIFO style (the oldest rows are dropped).

sql-client.execution.result-mode (Batch, Streaming)
    Default: TABLE    Type: Enum
    Determines how the query result should be displayed. Possible values:
      • "TABLE": Materializes results in memory and visualizes them in a regular,
        paginated table representation.
      • "CHANGELOG": Visualizes the result stream that is produced by a continuous
        query.
      • "TABLEAU": Displays results directly on the screen in a tableau format.

sql-client.verbose (Batch, Streaming)
    Default: false    Type: Boolean
    Determines whether to print verbose output to the console. If set to true, the
    full exception stack is printed; otherwise, only the error cause is printed.

SQL Client result modes

The CLI supports three modes for maintaining and visualizing results.

The table mode materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI:

  SET 'sql-client.execution.result-mode' = 'table';

The result of a query would then look like this; you can use the keys indicated at the bottom of the screen as well as the arrow keys to navigate and open the various records:

  name    age  isHappy  dob          height
  user1    20  true     1995-12-03   1.7
  user2    30  true     1972-08-02   1.89
  user3    40  false    1983-12-23   1.63
  user4    41  true     1977-11-13   1.72
  user5    22  false    1998-02-20   1.61
  user6    12  true     1969-04-08   1.58
  user7    38  false    1987-12-15   1.6
  user8    62  true     1996-08-05   1.82

  Q Quit                + Inc Refresh         G Goto Page         N Next Page         O Open Row
  R Refresh             - Dec Refresh         L Last Page         P Prev Page

The changelog mode does not materialize results and visualizes the result stream that is produced by a continuous query consisting of insertions (+) and retractions (-).

  SET 'sql-client.execution.result-mode' = 'changelog';

The result of a query would then look like this:

  op  name    age  isHappy  dob          height
  +I  user1    20  true     1995-12-03   1.7
  +I  user2    30  true     1972-08-02   1.89
  +I  user3    40  false    1983-12-23   1.63
  +I  user4    41  true     1977-11-13   1.72
  +I  user5    22  false    1998-02-20   1.61
  +I  user6    12  true     1969-04-08   1.58
  +I  user7    38  false    1987-12-15   1.6
  +I  user8    62  true     1996-08-05   1.82

  Q Quit                + Inc Refresh         O Open Row
  R Refresh             - Dec Refresh

The tableau mode is closer to a traditional display: it prints the results directly on the screen in a tableau format. The displayed content is influenced by the query execution type (execution.runtime-mode).

  SET 'sql-client.execution.result-mode' = 'tableau';

The result of a query would then look like this:

  +----+--------------------------------+-------------+---------+------------+--------------------------------+
  | op |                           name |         age | isHappy |        dob |                         height |
  +----+--------------------------------+-------------+---------+------------+--------------------------------+
  | +I |                          user1 |          20 |    true | 1995-12-03 |                            1.7 |
  | +I |                          user2 |          30 |    true | 1972-08-02 |                           1.89 |
  | +I |                          user3 |          40 |   false | 1983-12-23 |                           1.63 |
  | +I |                          user4 |          41 |    true | 1977-11-13 |                           1.72 |
  | +I |                          user5 |          22 |   false | 1998-02-20 |                           1.61 |
  | +I |                          user6 |          12 |    true | 1969-04-08 |                           1.58 |
  | +I |                          user7 |          38 |   false | 1987-12-15 |                            1.6 |
  | +I |                          user8 |          62 |    true | 1996-08-05 |                           1.82 |
  +----+--------------------------------+-------------+---------+------------+--------------------------------+
  Received a total of 8 rows

Note that when you use this mode with a streaming query, the result is continuously printed on the console. If the input data of the query is bounded, the job terminates after Flink has processed all of the input data, and the printing stops automatically. Otherwise, if you want to terminate a running query, just type CTRL-C; the job and the printing will be stopped.

All these result modes can be useful during the prototyping of SQL queries. In all these modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through bigger results that are only limited by the available main memory and the configured maximum number of rows (sql-client.execution.max-table-result.rows).

Attention Queries that are executed in a batch environment can only be retrieved using the table or tableau result mode.

Initialize Session Using SQL Files

A SQL query needs a configuration environment in which it is executed. SQL Client supports the -i startup option to execute an initialization SQL file that sets up the environment when the SQL Client starts. This so-called initialization SQL file can use DDL to define available catalogs, table sources and sinks, user-defined functions, and other properties required for execution and deployment.

An example of such a file is presented below.

  -- Define available catalogs
  CREATE CATALOG MyCatalog
    WITH (
      'type' = 'hive'
    );

  USE CATALOG MyCatalog;

  -- Define available database
  CREATE DATABASE MyDatabase;

  USE MyDatabase;

  -- Define TABLE
  CREATE TABLE MyTable (
    MyField1 INT,
    MyField2 STRING
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/something',
    'format' = 'csv'
  );

  -- Define VIEW
  CREATE VIEW MyCustomView AS SELECT MyField2 FROM MyTable;

  -- Define user-defined functions here.
  CREATE FUNCTION myUDF AS 'foo.bar.AggregateUDF';

  -- Properties that change the fundamental execution behavior of a table program.
  SET 'execution.runtime-mode' = 'streaming';                -- execution mode either 'batch' or 'streaming'
  SET 'sql-client.execution.result-mode' = 'table';          -- available values: 'table', 'changelog' and 'tableau'
  SET 'sql-client.execution.max-table-result.rows' = '10000';-- optional: maximum number of maintained rows
  SET 'parallelism.default' = '1';                           -- optional: Flink's parallelism (1 by default)
  SET 'pipeline.auto-watermark-interval' = '200';            -- optional: interval for periodic watermarks
  SET 'pipeline.max-parallelism' = '10';                     -- optional: Flink's maximum parallelism
  SET 'table.exec.state.ttl' = '1000';                       -- optional: table program's idle state time
  SET 'restart-strategy.type' = 'fixed-delay';

  -- Configuration options for adjusting and tuning table programs.
  SET 'table.optimizer.join-reorder-enabled' = 'true';
  SET 'table.exec.spill-compression.enabled' = 'true';
  SET 'table.exec.spill-compression.block-size' = '128kb';

This configuration:

  • connects to Hive catalogs and uses MyCatalog as the current catalog with MyDatabase as the current database of the catalog,
  • defines a table MyTable that can read data from a CSV file,
  • defines a view MyCustomView that declares a virtual table using a SQL query,
  • defines a user-defined function myUDF that can be instantiated using the class name,
  • uses streaming mode for running statements and a parallelism of 1,
  • runs exploratory queries in the table result mode,
  • and makes some planner adjustments around join reordering and spilling via configuration options.

When using the -i <init.sql> option to initialize a SQL Client session, the following statements are allowed in an initialization SQL file:

  • DDL(CREATE/DROP/ALTER),
  • USE CATALOG/DATABASE,
  • LOAD/UNLOAD MODULE,
  • SET command,
  • RESET command.

To execute queries or INSERT statements, please enter the interactive mode or use the -f option to submit the SQL statements, for example as sketched below.
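
For example, assuming the hypothetical file names init.sql and statements.sql, the two options can be combined like this:

  # init.sql sets up catalogs, tables, and configuration;
  # statements.sql contains the queries or INSERT statements to run
  ./bin/sql-client.sh -i init.sql -f statements.sql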

Attention If SQL Client receives errors during initialization, SQL Client will exit with error messages.

Dependencies

The SQL Client does not require setting up a Java project using Maven, Gradle, or sbt. Instead, you can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR file separately (using --jar) or define entire library directories (using --library). For connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files can be downloaded for each release from the Maven central repository.
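
For illustration, a sketch of both options (the JAR path and directory are placeholders; pick the bundle matching your Flink version):

  # add a single connector JAR to the session
  ./bin/sql-client.sh --jar /path/to/flink-sql-connector-kafka.jar

  # or initialize every new session with all JARs in a directory
  ./bin/sql-client.sh --library /path/to/sql-jars/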

The full list of offered SQL JARs can be found on the connection to external systems page.

You can refer to the configuration section for information on how to configure connector and format dependencies.

Usage

SQL Client allows users to submit jobs either within the interactive command line or by using the -f option to execute a SQL file.

In both modes, SQL Client supports parsing and executing all types of SQL statements that Flink supports.

Interactive Command Line

On the interactive command line, the SQL Client reads user input and executes each statement terminated by a semicolon (;).

SQL Client will print a success message if the statement is executed successfully. When an error occurs, SQL Client will also print the error message. By default, the error message only contains the error cause. In order to print the full exception stack for debugging, set sql-client.verbose to true with the command SET 'sql-client.verbose' = 'true';.

Execute SQL Files

SQL Client supports executing a SQL script file with the -f option. SQL Client will execute the statements in the script file one by one and print an execution message for each executed statement. Once a statement fails, SQL Client will exit and the remaining statements will not be executed.

An example of such a file is presented below.

  CREATE TEMPORARY TABLE users (
    user_id BIGINT,
    user_name STRING,
    user_level STRING,
    region STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'csv',
    'value.format' = 'avro'
  );

  -- set sync mode
  SET 'table.dml-sync' = 'true';

  -- set the job name
  SET 'pipeline.name' = 'SqlJob';

  -- set the queue that the job submit to
  SET 'yarn.application.queue' = 'root';

  -- set the job parallelism
  SET 'parallelism.default' = '100';

  -- restore from the specific savepoint path
  SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';

  INSERT INTO pageviews_enriched
  SELECT *
  FROM pageviews AS p
  LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
    ON p.user_id = u.user_id;

This configuration:

  • defines a temporal table source users backed by the upsert-kafka connector,
  • sets session properties such as the job name, the YARN queue, and the job parallelism,
  • sets the savepoint path,
  • submits a SQL job that restores its state from the specified savepoint path.

Attention Compared to the interactive mode, SQL Client stops execution and exits when there are errors.

Execute a set of SQL statements

SQL Client executes each INSERT INTO statement as a single Flink job. However, this is sometimes not optimal because some parts of the pipeline can be reused. SQL Client supports the STATEMENT SET syntax to execute a set of SQL statements. This is equivalent to the StatementSet feature in the Table API. The STATEMENT SET syntax encloses one or more INSERT INTO statements. All statements in a STATEMENT SET block are holistically optimized and executed as a single Flink job. Joint optimization and execution allows for reusing common intermediate results and can therefore significantly improve the efficiency of executing multiple queries.

Syntax

  EXECUTE STATEMENT SET
  BEGIN
    -- one or more INSERT INTO statements
    { INSERT INTO|OVERWRITE <select_statement>; }+
  END;

Attention The statements enclosed in a STATEMENT SET must be separated by semicolons (;). The old syntax BEGIN STATEMENT SET; ... END; is deprecated and may be removed in a future version.

SQL CLI

  Flink SQL> CREATE TABLE pageviews (
  >   user_id BIGINT,
  >   page_id BIGINT,
  >   viewtime TIMESTAMP,
  >   proctime AS PROCTIME()
  > ) WITH (
  >   'connector' = 'kafka',
  >   'topic' = 'pageviews',
  >   'properties.bootstrap.servers' = '...',
  >   'format' = 'avro'
  > );
  [INFO] Execute statement succeed.

  Flink SQL> CREATE TABLE pageview (
  >   page_id BIGINT,
  >   cnt BIGINT
  > ) WITH (
  >   'connector' = 'jdbc',
  >   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  >   'table-name' = 'pageview'
  > );
  [INFO] Execute statement succeed.

  Flink SQL> CREATE TABLE uniqueview (
  >   page_id BIGINT,
  >   cnt BIGINT
  > ) WITH (
  >   'connector' = 'jdbc',
  >   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  >   'table-name' = 'uniqueview'
  > );
  [INFO] Execute statement succeed.

  Flink SQL> EXECUTE STATEMENT SET
  > BEGIN
  >
  > INSERT INTO pageview
  > SELECT page_id, count(1)
  > FROM pageviews
  > GROUP BY page_id;
  >
  > INSERT INTO uniqueview
  > SELECT page_id, count(distinct user_id)
  > FROM pageviews
  > GROUP BY page_id;
  >
  > END;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: 6b1af540c0c0bb3fcfcad50ac037c862

SQL File

  CREATE TABLE pageviews (
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    proctime AS PROCTIME()
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'pageviews',
    'properties.bootstrap.servers' = '...',
    'format' = 'avro'
  );

  CREATE TABLE pageview (
    page_id BIGINT,
    cnt BIGINT
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'pageview'
  );

  CREATE TABLE uniqueview (
    page_id BIGINT,
    cnt BIGINT
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'uniqueview'
  );

  EXECUTE STATEMENT SET
  BEGIN

  INSERT INTO pageview
  SELECT page_id, count(1)
  FROM pageviews
  GROUP BY page_id;

  INSERT INTO uniqueview
  SELECT page_id, count(distinct user_id)
  FROM pageviews
  GROUP BY page_id;

  END;

Execute DML statements sync/async

By default, SQL Client executes DML statements asynchronously. That means SQL Client submits a job for the DML statement to the Flink cluster and does not wait for the job to finish, so SQL Client can submit multiple jobs at the same time. This is useful for streaming jobs, which are typically long-running.

SQL Client makes sure that a statement is successfully submitted to the cluster. Once the statement is submitted, the CLI will show information about the Flink job.

  Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
  [INFO] Table update statement has been successfully submitted to the cluster:
  Cluster ID: StandaloneClusterId
  Job ID: 6f922fe5cba87406ff23ae4a7bb79044

Attention The SQL Client does not track the status of the running Flink job after submission. The CLI process can be shut down after submission without affecting the detached query. Flink's restart strategy takes care of fault tolerance. Please use the job statements to monitor the status of the detached query or to stop it.

However, for batch users, it’s more common that the next DML statement requires waiting until the previous DML statement finishes. In order to execute DML statements synchronously, you can set table.dml-sync option to true in SQL Client.

  Flink SQL> SET 'table.dml-sync' = 'true';
  [INFO] Session property has been set.

  Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] Execute statement in sync mode. Please wait for the execution finish...
  [INFO] Complete execution of the SQL update statement.

Attention If you want to terminate the job, just type CTRL-C to cancel the execution.

Start a SQL Job from a savepoint

Flink supports starting a job from a specified savepoint. In SQL Client, you can use the SET command to specify the savepoint path.

  Flink SQL> SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
  [INFO] Session property has been set.

  -- all the following DML statements will be restored from the specified savepoint path
  Flink SQL> INSERT INTO ...

When the path to savepoint is specified, Flink will try to restore the state from the savepoint when executing all the following DML statements.

Because the specified savepoint path affects all of the following DML statements, you can use the RESET command to reset this config option, i.e., disable restoring from the savepoint.

  Flink SQL> RESET execution.savepoint.path;
  [INFO] Session property has been reset.

For more details about creating and managing savepoints, please refer to Job Lifecycle Management.

Define a Custom Job Name

SQL Client supports defining a job name for queries and DML statements through the SET command.

  Flink SQL> SET 'pipeline.name' = 'kafka-to-hive';
  [INFO] Session property has been set.

  -- all the following DML statements will use the specified job name.
  Flink SQL> INSERT INTO ...

Because the specified job name affects all of the following queries and DML statements, you can also use the RESET command to reset this configuration, i.e., use the default job names.

  Flink SQL> RESET pipeline.name;
  [INFO] Session property has been reset.

If the option pipeline.name is not specified, SQL Client will generate a default name for the submitted job, e.g. insert-into_<sink_table_name> for INSERT INTO statements.

Monitoring Job Status

SQL Client supports listing the jobs in the cluster and their status through the SHOW JOBS statement.

  Flink SQL> SHOW JOBS;
  +----------------------------------+---------------+----------+-------------------------+
  |                           job id |      job name |   status |              start time |
  +----------------------------------+---------------+----------+-------------------------+
  | 228d70913eab60dda85c5e7f78b5782c | kafka-to-hive |  RUNNING | 2023-02-11T05:03:51.523 |
  +----------------------------------+---------------+----------+-------------------------+

Terminating a Job

SQL Client supports stopping jobs with or without savepoints through the STOP JOB statement.

  Flink SQL> STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;
  +-----------------------------------------+
  |                          savepoint path |
  +-----------------------------------------+
  | file:/tmp/savepoint-3addd4-0b224d9311e6 |
  +-----------------------------------------+

The savepoint path could be specified with state.savepoints.dir either in the cluster configuration or session configuration (the latter would take precedence).
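
For example, a sketch of overriding the savepoint directory for the current session before stopping a job (the directory and job ID below are placeholders):

  Flink SQL> SET 'state.savepoints.dir' = 'file:///tmp/my-savepoints';
  [INFO] Session property has been set.

  Flink SQL> STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;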

For more details about stopping jobs, please refer to Job Statements.

SQL Syntax highlighting

SQL Client can highlight SQL syntax with several color schemes. A color scheme can be set with sql-client.display.color-schema. Available color schemes: chester, dracula, solarized, vs2010, obsidian, geshi, dark, light, default (no highlighting). If an unknown name is given, the client falls back to default.
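
For example, switching the current session to one of the listed schemes:

  Flink SQL> SET 'sql-client.display.color-schema' = 'obsidian';
  [INFO] Session property has been set.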

Color schema \ Style | Keyword      | Default | Comment        | Hint         | Quoted  | SQL Identifier
-------------------- | ------------ | ------- | -------------- | ------------ | ------- | --------------
Default              | Default      | Default | Default        | Default      | Default | Default
Chester              | Bold blue    | White   | Italic green   | Bold green   | Red     | Cyan
Dark                 | Bold blue    | White   | Italic bright  | Bold bright  | Green   | Cyan
Dracula              | Bold magenta | White   | Italic cyan    | Bold cyan    | Green   | Red
Geshi                | Bold #993333 | White   | Italic #808080 | Bold #808080 | #66CC66 | #000099
Light                | Bold red     | Black   | Italic bright  | Bold bright  | Green   | Cyan
Obsidian             | Bold green   | White   | Italic bright  | Bold bright  | Red     | Magenta
VS2010               | Bold blue    | White   | Italic green   | Bold green   | Red     | Magenta
Solarized            | Bold yellow  | Blue    | Italic bright  | Bold bright  | Green   | Red