ALTER Statements

ALTER statements modify the definition of a table, view, database, or function that is already registered in the catalog, or the definition of a catalog itself.

Flink SQL currently supports the following ALTER statements:

  • ALTER TABLE
  • ALTER VIEW
  • ALTER DATABASE
  • ALTER FUNCTION
  • ALTER CATALOG

Run an ALTER statement

Java

ALTER statements can be executed with the executeSql() method of the TableEnvironment. The executeSql() method returns ‘OK’ for a successful ALTER operation; otherwise it throws an exception.

The following examples show how to run an ALTER statement in TableEnvironment.

Scala

ALTER statements can be executed with the executeSql() method of the TableEnvironment. The executeSql() method returns ‘OK’ for a successful ALTER operation; otherwise it throws an exception.

The following examples show how to run an ALTER statement in TableEnvironment.

Python

ALTER statements can be executed with the execute_sql() method of the TableEnvironment. The execute_sql() method returns ‘OK’ for a successful ALTER operation; otherwise it throws an exception.

The following examples show how to run an ALTER statement in TableEnvironment.

SQL CLI

ALTER statements can be executed in SQL CLI.

The following examples show how to run an ALTER statement in SQL CLI.

Java

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    EnvironmentSettings settings = EnvironmentSettings.newInstance()...
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    // register a table named "Orders"
    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

    // a string array: ["Orders"]
    String[] tables = tableEnv.listTables();
    // or tableEnv.executeSql("SHOW TABLES").print();

    // add a new column `order` to the first position
    tableEnv.executeSql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST");

    // add more columns, primary key and watermark
    tableEnv.executeSql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)");

    // modify column type, column comment and watermark
    tableEnv.executeSql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)");

    // drop watermark
    tableEnv.executeSql("ALTER TABLE Orders DROP WATERMARK");

    // drop columns
    tableEnv.executeSql("ALTER TABLE Orders DROP (amount, ts, category)");

    // rename column
    tableEnv.executeSql("ALTER TABLE Orders RENAME `order` TO order_id");

    // rename "Orders" to "NewOrders"
    tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders");

    // a string array: ["NewOrders"] (reuses the variable declared above)
    tables = tableEnv.listTables();
    // or tableEnv.executeSql("SHOW TABLES").print();

    // register a catalog named "cat2"
    tableEnv.executeSql("CREATE CATALOG cat2 WITH ('type'='generic_in_memory')");

    // add a new property `default-database`
    tableEnv.executeSql("ALTER CATALOG cat2 SET ('default-database'='db')");

Scala

    import org.apache.flink.table.api.TableEnvironment

    val tableEnv = TableEnvironment.create(...)

    // register a table named "Orders"
    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

    // add a new column `order` to the first position
    tableEnv.executeSql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST")

    // add more columns, primary key and watermark
    tableEnv.executeSql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)")

    // modify column type, column comment and watermark
    tableEnv.executeSql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)")

    // drop watermark
    tableEnv.executeSql("ALTER TABLE Orders DROP WATERMARK")

    // drop columns
    tableEnv.executeSql("ALTER TABLE Orders DROP (amount, ts, category)")

    // rename column
    tableEnv.executeSql("ALTER TABLE Orders RENAME `order` TO order_id")

    // a string array: ["Orders"]
    val tables = tableEnv.listTables()
    // or tableEnv.executeSql("SHOW TABLES").print()

    // rename "Orders" to "NewOrders"
    tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders")

    // a string array: ["NewOrders"] (a new name, since a val cannot be redeclared in the same scope)
    val renamedTables = tableEnv.listTables()
    // or tableEnv.executeSql("SHOW TABLES").print()

    // register a catalog named "cat2"
    tableEnv.executeSql("CREATE CATALOG cat2 WITH ('type'='generic_in_memory')")

    // add a new property `default-database`
    tableEnv.executeSql("ALTER CATALOG cat2 SET ('default-database'='db')")

Python

    from pyflink.table import TableEnvironment

    table_env = TableEnvironment.create(...)

    # register a table named "Orders"
    table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

    # a string array: ["Orders"]
    tables = table_env.list_tables()
    # or table_env.execute_sql("SHOW TABLES").print()

    # add a new column `order` to the first position
    table_env.execute_sql("ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST")

    # add more columns, primary key and watermark
    table_env.execute_sql("ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR)")

    # modify column type, column comment and watermark
    table_env.execute_sql("ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts)")

    # drop watermark
    table_env.execute_sql("ALTER TABLE Orders DROP WATERMARK")

    # drop columns
    table_env.execute_sql("ALTER TABLE Orders DROP (amount, ts, category)")

    # rename column
    table_env.execute_sql("ALTER TABLE Orders RENAME `order` TO order_id")

    # rename "Orders" to "NewOrders"
    table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders")

    # a string array: ["NewOrders"]
    tables = table_env.list_tables()
    # or table_env.execute_sql("SHOW TABLES").print()

    # register a catalog named "cat2"
    table_env.execute_sql("CREATE CATALOG cat2 WITH ('type'='generic_in_memory')")

    # add a new property `default-database`
    table_env.execute_sql("ALTER CATALOG cat2 SET ('default-database'='db')")

SQL CLI

    Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
    [INFO] Execute statement succeeded.

    Flink SQL> ALTER TABLE Orders ADD `order` INT COMMENT 'order identifier' FIRST;
    [INFO] Execute statement succeeded.

    Flink SQL> DESCRIBE Orders;
    +---------+--------+------+-----+--------+-----------+------------------+
    |    name |   type | null | key | extras | watermark |          comment |
    +---------+--------+------+-----+--------+-----------+------------------+
    |   order |    INT | TRUE |     |        |           | order identifier |
    |    user | BIGINT | TRUE |     |        |           |                  |
    | product | STRING | TRUE |     |        |           |                  |
    |  amount |    INT | TRUE |     |        |           |                  |
    +---------+--------+------+-----+--------+-----------+------------------+
    4 rows in set

    Flink SQL> ALTER TABLE Orders ADD (ts TIMESTAMP(3), category STRING AFTER product, PRIMARY KEY(`order`) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' HOUR);
    [INFO] Execute statement succeeded.

    Flink SQL> DESCRIBE Orders;
    +----------+------------------------+-------+------------+--------+--------------------------+------------------+
    |     name |                   type |  null |        key | extras |                watermark |          comment |
    +----------+------------------------+-------+------------+--------+--------------------------+------------------+
    |    order |                    INT | FALSE | PRI(order) |        |                          | order identifier |
    |     user |                 BIGINT |  TRUE |            |        |                          |                  |
    |  product |                 STRING |  TRUE |            |        |                          |                  |
    | category |                 STRING |  TRUE |            |        |                          |                  |
    |   amount |                    INT |  TRUE |            |        |                          |                  |
    |       ts | TIMESTAMP(3) *ROWTIME* |  TRUE |            |        | `ts` - INTERVAL '1' HOUR |                  |
    +----------+------------------------+-------+------------+--------+--------------------------+------------------+
    6 rows in set

    Flink SQL> ALTER TABLE Orders MODIFY (amount DOUBLE NOT NULL, category STRING COMMENT 'category identifier' AFTER `order`, WATERMARK FOR ts AS ts);
    [INFO] Execute statement succeeded.

    Flink SQL> DESCRIBE Orders;
    +----------+------------------------+-------+------------+--------+-----------+---------------------+
    |     name |                   type |  null |        key | extras | watermark |             comment |
    +----------+------------------------+-------+------------+--------+-----------+---------------------+
    |    order |                    INT | FALSE | PRI(order) |        |           |    order identifier |
    | category |                 STRING |  TRUE |            |        |           | category identifier |
    |     user |                 BIGINT |  TRUE |            |        |           |                     |
    |  product |                 STRING |  TRUE |            |        |           |                     |
    |   amount |                 DOUBLE | FALSE |            |        |           |                     |
    |       ts | TIMESTAMP(3) *ROWTIME* |  TRUE |            |        |      `ts` |                     |
    +----------+------------------------+-------+------------+--------+-----------+---------------------+
    6 rows in set

    Flink SQL> ALTER TABLE Orders DROP WATERMARK;
    [INFO] Execute statement succeeded.

    Flink SQL> DESCRIBE Orders;
    +----------+--------------+-------+------------+--------+-----------+---------------------+
    |     name |         type |  null |        key | extras | watermark |             comment |
    +----------+--------------+-------+------------+--------+-----------+---------------------+
    |    order |          INT | FALSE | PRI(order) |        |           |    order identifier |
    | category |       STRING |  TRUE |            |        |           | category identifier |
    |     user |       BIGINT |  TRUE |            |        |           |                     |
    |  product |       STRING |  TRUE |            |        |           |                     |
    |   amount |       DOUBLE | FALSE |            |        |           |                     |
    |       ts | TIMESTAMP(3) |  TRUE |            |        |           |                     |
    +----------+--------------+-------+------------+--------+-----------+---------------------+
    6 rows in set

    Flink SQL> ALTER TABLE Orders DROP (amount, ts, category);
    [INFO] Execute statement succeeded.

    Flink SQL> DESCRIBE Orders;
    +---------+--------+-------+------------+--------+-----------+------------------+
    |    name |   type |  null |        key | extras | watermark |          comment |
    +---------+--------+-------+------------+--------+-----------+------------------+
    |   order |    INT | FALSE | PRI(order) |        |           | order identifier |
    |    user | BIGINT |  TRUE |            |        |           |                  |
    | product | STRING |  TRUE |            |        |           |                  |
    +---------+--------+-------+------------+--------+-----------+------------------+
    3 rows in set

    Flink SQL> ALTER TABLE Orders RENAME `order` to `order_id`;
    [INFO] Execute statement succeeded.

    Flink SQL> DESCRIBE Orders;
    +----------+--------+-------+---------------+--------+-----------+------------------+
    |     name |   type |  null |           key | extras | watermark |          comment |
    +----------+--------+-------+---------------+--------+-----------+------------------+
    | order_id |    INT | FALSE | PRI(order_id) |        |           | order identifier |
    |     user | BIGINT |  TRUE |               |        |           |                  |
    |  product | STRING |  TRUE |               |        |           |                  |
    +----------+--------+-------+---------------+--------+-----------+------------------+
    3 rows in set

    Flink SQL> SHOW TABLES;
    +------------+
    | table name |
    +------------+
    |     Orders |
    +------------+
    1 row in set

    Flink SQL> ALTER TABLE Orders RENAME TO NewOrders;
    [INFO] Execute statement succeeded.

    Flink SQL> SHOW TABLES;
    +------------+
    | table name |
    +------------+
    |  NewOrders |
    +------------+
    1 row in set

    Flink SQL> CREATE CATALOG cat2 WITH ('type'='generic_in_memory');
    [INFO] Execute statement succeeded.

    Flink SQL> ALTER CATALOG cat2 SET ('default-database'='db_new');
    [INFO] Execute statement succeeded.

    Flink SQL> DESC CATALOG EXTENDED cat2;
    +-------------------------+-------------------+
    |               info name |        info value |
    +-------------------------+-------------------+
    |                    name |              cat2 |
    |                    type | generic_in_memory |
    |                 comment |                   |
    | option:default-database |            db_new |
    +-------------------------+-------------------+
    4 rows in set

ALTER TABLE

The following grammar gives an overview of the available syntax:

    ALTER TABLE [IF EXISTS] table_name {
        ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...] | <distribution> }
      | MODIFY { <schema_component> | (<schema_component> [, ...]) | <distribution> }
      | DROP { column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...] | DISTRIBUTION }
      | RENAME old_column_name TO new_column_name
      | RENAME TO new_table_name
      | SET (key1=val1, ...)
      | RESET (key1, ...)
    }

    <schema_component>:
      { <column_component> | <constraint_component> | <watermark_component> }

    <column_component>:
      column_name <column_definition> [FIRST | AFTER column_name]

    <constraint_component>:
      [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

    <watermark_component>:
      WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

    <column_definition>:
      { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]

    <physical_column_definition>:
      column_type

    <metadata_column_definition>:
      column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

    <computed_column_definition>:
      AS computed_column_expression

    <partition_component>:
      PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]

    <distribution>:
    {
        DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
      | DISTRIBUTION INTO n BUCKETS
    }

IF EXISTS

If the table does not exist, nothing happens.

ADD

Use the ADD clause to add columns, constraints, a watermark, partitions, or a distribution to an existing table.

To add a column at a specific position, use FIRST or AFTER col_name. By default, the column is appended at the end.

The following examples illustrate the usage of the ADD statements.

    -- add a new column
    ALTER TABLE MyTable ADD category_id STRING COMMENT 'identifier of the category';

    -- add columns, constraint, and watermark
    ALTER TABLE MyTable ADD (
        log_ts STRING COMMENT 'log timestamp string' FIRST,
        ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
        PRIMARY KEY (id) NOT ENFORCED,
        WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
    );

    -- add a new partition
    ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') WITH ('k1'='v1');

    -- add two new partitions
    ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') WITH ('k1'='v1') PARTITION (p1=1,p2='b') WITH ('k2'='v2');

    -- add new distribution using a hash on uid into 4 buckets
    ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS;

    -- add new distribution on uid into 4 buckets
    ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 4 BUCKETS;

    -- add new distribution on uid
    ALTER TABLE MyTable ADD DISTRIBUTION BY (uid);

    -- add new distribution into 4 buckets
    ALTER TABLE MyTable ADD DISTRIBUTION INTO 4 BUCKETS;

Note: Adding a column as a primary key implicitly changes the column’s nullability to false.
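As a way to reason about FIRST and AFTER, the placement rule can be modeled on a plain list of column names. The sketch below is not Flink's implementation: `add_column` is a hypothetical helper, and its handling of duplicate or unknown columns is an assumption made for illustration. It replays the column additions from the Orders walkthrough earlier on this page.

```python
from typing import List, Optional


def add_column(columns: List[str], name: str, *, first: bool = False,
               after: Optional[str] = None) -> List[str]:
    """Return a new column list with `name` placed the way ADD would place it."""
    if name in columns:
        # Assumption of this sketch: adding an existing column is an error.
        raise ValueError(f"column {name!r} already exists")
    if first and after is not None:
        raise ValueError("use either first or after, not both")
    result = list(columns)
    if first:
        result.insert(0, name)                        # ADD ... FIRST
    elif after is not None:
        result.insert(result.index(after) + 1, name)  # ADD ... AFTER col_name
    else:
        result.append(name)                           # default: append at the end
    return result


# Columns of Orders right after CREATE TABLE.
orders = ["user", "product", "amount"]
orders = add_column(orders, "order", first=True)          # ADD `order` ... FIRST
orders = add_column(orders, "ts")                         # ADD ts TIMESTAMP(3)
orders = add_column(orders, "category", after="product")  # ADD category ... AFTER product
print(orders)
```

The resulting order is the same as the column order in the DESCRIBE Orders output that follows the multi-column ADD in the SQL CLI example.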

MODIFY

Use the MODIFY clause to change a column’s position, type, comment, or nullability, or to change the primary key columns and watermark strategy of an existing table.

To move an existing column to a new position, use FIRST or AFTER col_name. By default, the position remains unchanged.

The following examples illustrate the usage of the MODIFY statements.

    -- modify a column type, comment and position
    ALTER TABLE MyTable MODIFY measurement DOUBLE COMMENT 'unit is bytes per second' AFTER `id`;

    -- modify the definitions of columns log_ts and ts, the primary key, and the watermark;
    -- they must already exist in the table schema
    ALTER TABLE MyTable MODIFY (
        log_ts STRING COMMENT 'log timestamp string' AFTER `id`,  -- reorder columns
        ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
        PRIMARY KEY (id) NOT ENFORCED,
        WATERMARK FOR ts AS ts  -- modify watermark strategy
    );

Note: Modifying a column to be a primary key implicitly changes the column’s nullability to false.
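The repositioning behavior of MODIFY can be pictured as removing the column from the list and re-inserting it at the requested place. The following sketch models only the position change, not type or comment changes; `move_column` is a hypothetical helper and not part of any Flink API.

```python
from typing import List, Optional


def move_column(columns: List[str], name: str, *, first: bool = False,
                after: Optional[str] = None) -> List[str]:
    """Return a new column list with the existing column `name` repositioned."""
    if name not in columns:
        raise ValueError(f"column {name!r} does not exist")
    if not first and after is None:
        return list(columns)  # no position given: the order stays unchanged
    remaining = [column for column in columns if column != name]
    if first:
        remaining.insert(0, name)                           # MODIFY ... FIRST
    else:
        remaining.insert(remaining.index(after) + 1, name)  # MODIFY ... AFTER col_name
    return remaining


# Column order of Orders before the MODIFY statement in the walkthrough.
before = ["order", "user", "product", "category", "amount", "ts"]
after_modify = move_column(before, "category", after="order")
print(after_modify)
```

This mirrors the walkthrough, where `category ... AFTER` the `order` column moves category directly behind order while amount, modified without a position, stays where it was.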

DROP

Use the DROP clause to drop columns, the primary key, partitions, the watermark strategy, or the distribution from an existing table.

The following examples illustrate the usage of the DROP statements.

    -- drop a column
    ALTER TABLE MyTable DROP measurement;

    -- drop columns
    ALTER TABLE MyTable DROP (col1, col2, col3);

    -- drop primary key
    ALTER TABLE MyTable DROP PRIMARY KEY;

    -- drop a partition
    ALTER TABLE MyTable DROP PARTITION (`id` = 1);

    -- drop two partitions
    ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);

    -- drop a watermark
    ALTER TABLE MyTable DROP WATERMARK;

    -- drop distribution
    ALTER TABLE MyTable DROP DISTRIBUTION;

RENAME

Use the RENAME clause to rename a column or an existing table.

The following examples illustrate the usage of the RENAME statements.

    -- rename column
    ALTER TABLE MyTable RENAME request_body TO payload;

    -- rename table
    ALTER TABLE MyTable RENAME TO MyTable2;

SET

Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.

The following examples illustrate the usage of the SET statements.

    -- set 'rows-per-second'
    ALTER TABLE DataGenSource SET ('rows-per-second' = '10');

RESET

Reset one or more properties to their default values.

The following examples illustrate the usage of the RESET statements.

    -- reset 'rows-per-second' to the default value
    ALTER TABLE DataGenSource RESET ('rows-per-second');
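The effect of SET and RESET on a table's properties can be pictured as operations on a key-value map. The sketch below is a simplified model rather than Flink code. It assumes that RESET removes the key from the stored properties so that the default applies again, and the starting value of 'rows-per-second' is made up for illustration.

```python
from typing import Dict, Iterable


def apply_set(options: Dict[str, str], updates: Dict[str, str]) -> Dict[str, str]:
    """Model SET: new keys are added and existing keys are overridden."""
    merged = dict(options)
    merged.update(updates)
    return merged


def apply_reset(options: Dict[str, str], keys: Iterable[str]) -> Dict[str, str]:
    """Model RESET: listed keys are removed (assumed), so defaults apply again."""
    dropped = set(keys)
    return {key: value for key, value in options.items() if key not in dropped}


# Hypothetical starting properties of the DataGenSource table.
options = {"connector": "datagen", "rows-per-second": "5"}

after_set = apply_set(options, {"rows-per-second": "10"})   # SET ('rows-per-second' = '10')
after_reset = apply_reset(after_set, ["rows-per-second"])   # RESET ('rows-per-second')

print(after_set)
print(after_reset)
```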

ALTER VIEW

    ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name

Renames a given view to a new name within the same catalog and database.

    ALTER VIEW [catalog_name.][db_name.]view_name AS new_query_expression

Changes the underlying query defining the given view to a new query.

ALTER DATABASE

    ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

Set one or more properties in the specified database. If a particular property is already set in the database, override the old value with the new one.
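When database properties come from configuration, the ALTER DATABASE statement can be assembled in client code and then passed to execute_sql(). The following is a minimal sketch under stated assumptions: `alter_database_set` and `sql_string` are hypothetical helpers, the names `my_catalog`, `my_db`, and `owner` are made up, keys and values are written as single-quoted literals in the style of the SET examples on this page, and identifiers are assumed not to need backtick escaping.

```python
from typing import Mapping, Optional


def sql_string(text: str) -> str:
    """Quote text as a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def alter_database_set(db_name: str, properties: Mapping[str, str],
                       catalog_name: Optional[str] = None) -> str:
    """Build an ALTER DATABASE ... SET statement (hypothetical helper)."""
    if not properties:
        raise ValueError("at least one property is required")
    target = f"{catalog_name}.{db_name}" if catalog_name else db_name
    assignments = ", ".join(
        f"{sql_string(key)} = {sql_string(value)}" for key, value in properties.items()
    )
    return f"ALTER DATABASE {target} SET ({assignments})"


# `my_catalog`, `my_db`, and the `owner` property are made-up names.
statement = alter_database_set("my_db", {"owner": "data-team"}, catalog_name="my_catalog")
print(statement)
```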

ALTER FUNCTION

    ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
      [IF EXISTS] [catalog_name.][db_name.]function_name
      AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

Alters a catalog function, assigning it a new identifier and an optional language tag. If the function does not exist in the catalog, an exception is thrown unless IF EXISTS is specified.

If the language tag is JAVA/SCALA, the identifier is the fully qualified class name of the UDF. For the implementation of Java/Scala UDFs, please refer to User-defined Functions for more details.

If the language tag is PYTHON, the identifier is the fully qualified name of the UDF, e.g. pyflink.table.tests.test_udf.add. For the implementation of Python UDF, please refer to Python UDFs for more details.

TEMPORARY

Alters a temporary catalog function, which has catalog and database namespaces and overrides catalog functions.

TEMPORARY SYSTEM

Alters a temporary system function, which has no namespace and overrides built-in functions.

IF EXISTS

If the function doesn’t exist, nothing happens.

LANGUAGE JAVA|SCALA|PYTHON

Language tag that instructs the Flink runtime how to execute the function. Currently only JAVA, SCALA, and PYTHON are supported; the default language for a function is JAVA.
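To show how the optional parts of the ALTER FUNCTION grammar combine, the sketch below assembles the statement text from its pieces. `alter_function` is a hypothetical helper, and the function name `my_udf` and class `com.example.MyUdf` are made up. The sketch also assumes the identifier is written as a single-quoted string literal, which the grammar above does not spell out.

```python
from typing import Optional

SCOPE_KEYWORDS = {
    "catalog": "",
    "temporary": "TEMPORARY ",
    "temporary_system": "TEMPORARY SYSTEM ",
}
LANGUAGES = ("JAVA", "SCALA", "PYTHON")


def alter_function(name: str, identifier: str, *, scope: str = "catalog",
                   if_exists: bool = False, language: Optional[str] = None) -> str:
    """Build an ALTER FUNCTION statement (hypothetical helper)."""
    if scope not in SCOPE_KEYWORDS:
        raise ValueError(f"unknown scope: {scope!r}")
    if scope == "temporary_system" and "." in name:
        raise ValueError("temporary system functions have no namespace")
    parts = [f"ALTER {SCOPE_KEYWORDS[scope]}FUNCTION"]
    if if_exists:
        parts.append("IF EXISTS")
    parts.append(name)
    # Assumption: the identifier is passed as a single-quoted string literal.
    parts.append("AS '" + identifier.replace("'", "''") + "'")
    if language is not None:
        tag = language.upper()
        if tag not in LANGUAGES:
            raise ValueError(f"unsupported language: {language!r}")
        parts.append(f"LANGUAGE {tag}")
    return " ".join(parts)


java_stmt = alter_function("my_db.my_udf", "com.example.MyUdf")
python_stmt = alter_function(
    "my_udf", "pyflink.table.tests.test_udf.add",
    scope="temporary", if_exists=True, language="python",
)
print(java_stmt)
print(python_stmt)
```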

ALTER CATALOG

    ALTER CATALOG catalog_name
        SET (key1=val1, ...)
      | RESET (key1, ...)
      | COMMENT 'comment'

SET

Set one or more properties in the specified catalog. If a particular property is already set in the catalog, override the old value with the new one.

The following examples illustrate the usage of the SET statements.

    -- set 'default-database'
    ALTER CATALOG cat2 SET ('default-database'='db');

RESET

Reset one or more properties in the specified catalog to their default values.

The following examples illustrate the usage of the RESET statements.

    -- reset 'default-database'
    ALTER CATALOG cat2 RESET ('default-database');

COMMENT

Set the comment of the specified catalog. If a comment is already set, the new one overrides it.

The following examples illustrate the usage of the COMMENT statements.

    ALTER CATALOG cat2 COMMENT 'comment for catalog ''cat2''';
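The doubled single quotes in the comment above escape a quote inside a SQL string literal. When the comment text comes from a variable, that escaping has to be applied before the statement is built. A minimal sketch, in which `sql_string` and `alter_catalog_comment` are hypothetical helper names:

```python
def sql_string(text: str) -> str:
    """Quote text as a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def alter_catalog_comment(catalog_name: str, comment: str) -> str:
    """Build an ALTER CATALOG ... COMMENT statement (hypothetical helper)."""
    return f"ALTER CATALOG {catalog_name} COMMENT {sql_string(comment)}"


statement = alter_catalog_comment("cat2", "comment for catalog 'cat2'")
print(statement)
```

The printed statement has the same text as the example above, without the trailing semicolon that the SQL CLI requires.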