SQL DDL

This page describes support for creating and altering tables using SQL across various engines.

Spark SQL

Create table

You can create tables using standard CREATE TABLE syntax, which supports partitioning and passing table properties.

  CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name data_type [COMMENT col_comment], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name, ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (property_name=property_value, ...)]
    [AS select_statement];

NOTE:

For users running this tutorial locally with a Spark-Hive (HMS) integration in their environment: if you use the default database or don't provide [LOCATION path] with the DDL statement, Spark will return a java.io.IOException: Mkdirs failed to create file:/user/hive/warehouse/hudi_table/.hoodie error. To get around this, follow either of the two options below:

  1. Create a database, i.e. CREATE DATABASE hudidb;, and use it, i.e. USE hudidb;, before running the DDL statement (see the sketch after this list).
  2. Or provide a path using the LOCATION keyword to persist the data with the DDL statement.
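
For example, a minimal sketch of option 1 on a local Spark-Hive setup, using the database name suggested above:

  CREATE DATABASE hudidb;
  USE hudidb;

  CREATE TABLE IF NOT EXISTS hudi_table (
    id INT,
    name STRING,
    price DOUBLE
  ) USING hudi;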

Create non-partitioned table

Creating a non-partitioned table is as simple as creating a regular table.

  -- create a Hudi table
  CREATE TABLE IF NOT EXISTS hudi_table (
    id INT,
    name STRING,
    price DOUBLE
  ) USING hudi;

Create partitioned table

A partitioned table can be created by adding a PARTITIONED BY clause. Partitioning organizes the data into multiple folders based on the partition columns. It can also speed up queries and index lookups by limiting the amount of metadata, index, and data scanned.

  CREATE TABLE IF NOT EXISTS hudi_table_partitioned (
    id BIGINT,
    name STRING,
    dt STRING,
    hh STRING
  ) USING hudi
  TBLPROPERTIES (
    type = 'cow'
  )
  PARTITIONED BY (dt);

NOTE:

You can also create a table partitioned by multiple fields by supplying comma-separated field names. When doing so, ensure that you specify the columns in the PARTITIONED BY clause in the same order as they appear in the CREATE TABLE schema. For example, for the above table, the partition fields should be specified as PARTITIONED BY (dt, hh), as in the sketch below.
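
A minimal sketch of the multi-field variant of the table above (the table name hudi_table_multi_partitioned is illustrative):

  CREATE TABLE IF NOT EXISTS hudi_table_multi_partitioned (
    id BIGINT,
    name STRING,
    dt STRING,
    hh STRING
  ) USING hudi
  TBLPROPERTIES (
    type = 'cow'
  )
  PARTITIONED BY (dt, hh);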

Create table with record keys and ordering fields

As discussed here, Hudi tracks each record in a table using a record key. In the examples so far, Hudi auto-generated a highly compressed key for each new record. If you want to use an existing field as the key, you can set the primaryKey option. Typically, this is also accompanied by configuring a preCombineField option to deal with out-of-order data and potential duplicate records with the same key in the incoming writes.

NOTE:

You can choose multiple fields as the primary key for a given table on a need basis. For example, primaryKey = 'id,name' materializes a composite key of the two fields, which can be useful for exploring the table. A sketch follows below.
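
A sketch of the composite-key case, reusing the schema of the keyed example below (the table name hudi_table_composite_keyed is illustrative):

  CREATE TABLE IF NOT EXISTS hudi_table_composite_keyed (
    id INT,
    name STRING,
    price DOUBLE,
    ts BIGINT
  ) USING hudi
  TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'id,name',
    preCombineField = 'ts'
  );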

Here is an example of creating a table using both options. Typically, a field that denotes the time of the event or fact, e.g., order creation time, event generation time etc., is used as the preCombineField. Hudi resolves multiple versions of the same record by ordering based on this field when queries are run on the table.

  CREATE TABLE IF NOT EXISTS hudi_table_keyed (
    id INT,
    name STRING,
    price DOUBLE,
    ts BIGINT
  ) USING hudi
  TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'
  );

Create table with merge modes

Hudi supports different record merge modes to handle merging of incoming records with existing records. To create a table with a specific record merge mode, you can set the recordMergeMode option.

  CREATE TABLE IF NOT EXISTS hudi_table_merge_mode (
    id INT,
    name STRING,
    ts LONG,
    price DOUBLE
  ) USING hudi
  TBLPROPERTIES (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'ts',
    recordMergeMode = 'EVENT_TIME_ORDERING'
  )
  LOCATION 'file:///tmp/hudi_table_merge_mode/';

With EVENT_TIME_ORDERING, the record with the larger event time (preCombineField) overwrites the record with the smaller event time on the same key, regardless of the transaction's commit time. Users can set the CUSTOM mode to provide their own merge logic. With the CUSTOM merge mode, you can provide a custom class that implements the merge logic. The interfaces to implement are explained in detail here.

  CREATE TABLE IF NOT EXISTS hudi_table_merge_mode_custom (
    id INT,
    name STRING,
    ts LONG,
    price DOUBLE
  ) USING hudi
  TBLPROPERTIES (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'ts',
    recordMergeMode = 'CUSTOM',
    'hoodie.record.merge.strategy.id' = '<unique-uuid>'
  )
  LOCATION 'file:///tmp/hudi_table_merge_mode_custom/';

Create table from an external location

Often, Hudi tables are created by streaming writers like the streamer tool, and may later need SQL statements run on them. You can create an external table using the LOCATION clause.

  CREATE TABLE hudi_table_external
  USING hudi
  LOCATION 'file:///tmp/hudi_table/';

TIP:

You don't need to specify the schema or any properties, except the partition columns if they exist. Hudi automatically recognizes the schema and configurations.

Create Table As Select (CTAS)

Hudi supports CTAS (Create Table As Select) for initial loads into Hudi tables. To ensure this is done efficiently, even for large loads, CTAS uses bulk insert as the write operation.

  -- create managed parquet table
  CREATE TABLE parquet_table
  USING parquet
  LOCATION 'file:///tmp/parquet_dataset/';

  -- CTAS by loading data into Hudi table
  CREATE TABLE hudi_table_ctas
  USING hudi
  TBLPROPERTIES (
    type = 'cow',
    preCombineField = 'ts'
  )
  PARTITIONED BY (dt)
  AS SELECT * FROM parquet_table;

You can create a non-partitioned table as well.

  -- create managed parquet table
  CREATE TABLE parquet_table
  USING parquet
  LOCATION 'file:///tmp/parquet_dataset/';

  -- CTAS by loading data into Hudi table
  CREATE TABLE hudi_table_ctas
  USING hudi
  TBLPROPERTIES (
    type = 'cow',
    preCombineField = 'ts'
  )
  AS SELECT * FROM parquet_table;

If you prefer explicitly setting the record keys, you can do so by setting the primaryKey config in table properties.

  CREATE TABLE hudi_table_ctas
  USING hudi
  TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'id'
  )
  PARTITIONED BY (dt)
  AS
  SELECT 1 AS id, 'a1' AS name, 10 AS price, 1000 AS dt;

You can also use CTAS to copy data across external locations.

  -- create managed parquet table
  CREATE TABLE parquet_table
  USING parquet
  LOCATION 'file:///tmp/parquet_dataset/*.parquet';

  -- CTAS by loading data into Hudi table
  CREATE TABLE hudi_table_ctas
  USING hudi
  LOCATION 'file:///tmp/hudi/hudi_tbl/'
  TBLPROPERTIES (
    type = 'cow'
  )
  AS SELECT * FROM parquet_table;

Create Index

Hudi supports creating and dropping different types of indexes on a table. For more information on the different types of indexes, please refer to multi-modal indexing. Secondary indexes, expression indexes, and record indexes can be created using the SQL CREATE INDEX command.

  -- Create Index
  CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name
  [USING index_type]
  (column_name1 [OPTIONS(key1=value1, key2=value2, ...)], column_name2 [OPTIONS(key1=value1, key2=value2, ...)], ...)
  [OPTIONS (key1=value1, key2=value2, ...)]

  -- Record index syntax
  CREATE INDEX indexName ON tableIdentifier (primaryKey1 [, primaryKey2 ...]);

  -- Secondary index syntax
  CREATE INDEX indexName ON tableIdentifier (nonPrimaryKey);

  -- Expression index syntax
  CREATE INDEX indexName ON tableIdentifier USING column_stats(col) OPTIONS(expr='expr_val', format='format_val');
  CREATE INDEX indexName ON tableIdentifier USING bloom_filters(col) OPTIONS(expr='expr_val');

  -- Drop Index
  DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
  • index_name is the name of the index to be created or dropped.
  • table_name is the name of the table on which the index is created or dropped.
  • index_type is the type of the index to be created. Currently, only column_stats and bloom_filters are supported. If the USING clause is omitted, a secondary record index is created.
  • column_name is the name of the column on which the index is created.

Both the index and the column on which the index is created can be qualified with options in the form of key-value pairs.

NOTE:

Please note, in order to create a secondary index:

  1. The table must have a primary key, and the merge mode should be COMMIT_TIME_ORDERING.
  2. The record index must be enabled. This can be done by setting hoodie.metadata.record.index.enable=true and then creating record_index. Please note the example below.
  3. Secondary indexes are not supported for complex types.

Examples

  -- Create a table with primary key
  CREATE TABLE hudi_indexed_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
  ) USING hudi
  OPTIONS (
    primaryKey = 'uuid',
    hoodie.write.record.merge.mode = 'COMMIT_TIME_ORDERING'
  )
  PARTITIONED BY (city);

  -- Add some data.
  INSERT INTO hudi_indexed_table
  VALUES
  ...

  -- Create bloom filter expression index on driver column
  CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity');
  -- It would show the bloom filter expression index
  SHOW INDEXES FROM hudi_indexed_table;
  -- A query on the driver column would prune the data using the idx_bloom_driver index
  SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S';

  -- Create column stats expression index on ts column
  CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
  -- Shows both expression indexes
  SHOW INDEXES FROM hudi_indexed_table;
  -- A query on the ts column would prune the data using the idx_column_ts index
  SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-09-24';

  -- Enable and create the record index, a prerequisite for secondary indexes
  SET hoodie.metadata.record.index.enable=true;
  CREATE INDEX record_index ON hudi_indexed_table (uuid);
  -- Create secondary index on rider column
  CREATE INDEX idx_rider ON hudi_indexed_table (rider);
  -- The expression indexes and the secondary index should show up
  SHOW INDEXES FROM hudi_indexed_table;
  -- A query on the rider column would leverage the secondary index idx_rider
  SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E';

Create Expression Index

An expression index is an index on a function of a column. It is a new addition to Hudi's multi-modal indexing subsystem. Expression indexes can be used to implement logical partitioning of a table by creating column_stats indexes on an expression of a column. For example, an expression index extracting a date from a timestamp field can effectively implement date-based partitioning and provide the same benefits to queries, even if the physical layout is different.

  -- Create an expression index on the column `ts` (unix epoch) of the table `hudi_table` using the function `from_unixtime` with the format `yyyy-MM-dd`
  CREATE INDEX IF NOT EXISTS ts_datestr ON hudi_table
  USING column_stats(ts)
  OPTIONS(expr='from_unixtime', format='yyyy-MM-dd');

  -- Create an expression index on the column `ts` (timestamp in yyyy-MM-dd HH:mm:ss) of the table `hudi_table` using the function `hour`
  CREATE INDEX ts_hour ON hudi_table
  USING column_stats(ts)
  OPTIONS(expr='hour');

NOTE:

  1. Expression indexes can only be created with SQL on the Spark engine. They are not yet supported with the Spark DataSource API.
  2. Expression indexes are not yet supported for complex types.
  3. Expression indexes are supported for unary and certain binary expressions. Please check the SQL DDL docs for more details.

The expr option is required for creating an expression index, and it should be a valid Spark SQL function. Please check the syntax for the above functions in the Spark SQL documentation and provide the options accordingly. For example, the format option is required for the from_unixtime function.

Some useful functions that are supported are listed below.

  • identity
  • from_unixtime
  • date_format
  • to_date
  • to_timestamp
  • year
  • month
  • day
  • hour
  • lower
  • upper
  • substring
  • regexp_extract
  • regexp_replace
  • concat
  • length

Note that only functions that take a single column as input are currently supported; UDFs are not supported.

Full example of creating and using expression index

  CREATE TABLE hudi_table_expr_index (
    ts STRING,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
  ) USING hudi
  TBLPROPERTIES (primaryKey = 'uuid')
  PARTITIONED BY (city)
  LOCATION 'file:///tmp/hudi_table_expr_index';

  -- Query with hour function filter but no index yet --
  spark-sql> SELECT city, fare, rider, driver FROM hudi_table_expr_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
  san_francisco 93.5 rider-E driver-O
  san_francisco 33.9 rider-D driver-L
  sao_paulo 43.4 rider-G driver-Q
  Time taken: 0.208 seconds, Fetched 3 row(s)

  spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM hudi_table_expr_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
  == Optimized Logical Plan ==
  Project [city#3465, fare#3464, rider#3462, driver#3463], Statistics(sizeInBytes=899.5 KiB)
  +- Filter ((isnotnull(city#3465) AND isnotnull(ts#3460)) AND (NOT (city#3465 = chennai) AND (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12))), Statistics(sizeInBytes=2.5 MiB)
     +- Relation default.hudi_table_expr_index[_hoodie_commit_time#3455,_hoodie_commit_seqno#3456,_hoodie_record_key#3457,_hoodie_partition_path#3458,_hoodie_file_name#3459,ts#3460,uuid#3461,rider#3462,driver#3463,fare#3464,city#3465] parquet, Statistics(sizeInBytes=2.5 MiB)
  == Physical Plan ==
  *(1) Project [city#3465, fare#3464, rider#3462, driver#3463]
  +- *(1) Filter (isnotnull(ts#3460) AND (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12))
     +- *(1) ColumnarToRow
        +- FileScan parquet default.hudi_table_expr_index[ts#3460,rider#3462,driver#3463,fare#3464,city#3465] Batched: true, DataFilters: [isnotnull(ts#3460), (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/tmp/hudi_table_expr_index], PartitionFilters: [isnotnull(city#3465), NOT (city#3465 = chennai)], PushedFilters: [IsNotNull(ts)], ReadSchema: struct<ts:string,rider:string,driver:string,fare:double>

  -- create the expression index --
  CREATE INDEX ts_hour ON hudi_table_expr_index USING column_stats(ts) OPTIONS(expr='hour');

  -- query after creating the index --
  spark-sql> SELECT city, fare, rider, driver FROM hudi_table_expr_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
  san_francisco 93.5 rider-E driver-O
  san_francisco 33.9 rider-D driver-L
  sao_paulo 43.4 rider-G driver-Q
  Time taken: 0.202 seconds, Fetched 3 row(s)

  spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM hudi_table_expr_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
  == Optimized Logical Plan ==
  Project [city#2970, fare#2969, rider#2967, driver#2968], Statistics(sizeInBytes=449.8 KiB)
  +- Filter ((isnotnull(city#2970) AND isnotnull(ts#2965)) AND (NOT (city#2970 = chennai) AND (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12))), Statistics(sizeInBytes=1278.3 KiB)
     +- Relation default.hudi_table_expr_index[_hoodie_commit_time#2960,_hoodie_commit_seqno#2961,_hoodie_record_key#2962,_hoodie_partition_path#2963,_hoodie_file_name#2964,ts#2965,uuid#2966,rider#2967,driver#2968,fare#2969,city#2970] parquet, Statistics(sizeInBytes=1278.3 KiB)
  == Physical Plan ==
  *(1) Project [city#2970, fare#2969, rider#2967, driver#2968]
  +- *(1) Filter (isnotnull(ts#2965) AND (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12))
     +- *(1) ColumnarToRow
        +- FileScan parquet default.hudi_table_expr_index[ts#2965,rider#2967,driver#2968,fare#2969,city#2970] Batched: true, DataFilters: [isnotnull(ts#2965), (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/tmp/hudi_table_expr_index], PartitionFilters: [isnotnull(city#2970), NOT (city#2970 = chennai)], PushedFilters: [IsNotNull(ts)], ReadSchema: struct<ts:string,rider:string,driver:string,fare:double>

Create Partition Stats Index

The partition stats index is similar to column stats in that it tracks statistics (min, max, null count, etc.) on columns in the table, which are useful in query planning. The key difference is that while column_stats tracks statistics per file, the partition_stats index tracks aggregated statistics at the storage partition path level, to help skip entire folder paths more efficiently during query planning and execution.

To enable the partition stats index, simply set hoodie.metadata.index.partition.stats.enable = 'true' in the create table options (see the sketch after the note below).

NOTE:

  1. The column_stats index is required to be enabled for the partition_stats index; both go hand in hand.
  2. The partition_stats index is not created automatically for all columns. Users must specify the list of columns for which they want partition stats.
  3. The column_stats and partition_stats indexes are not yet supported for complex types.
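
A minimal sketch, reusing the config keys from the full example in the next section (the table name and tracked column list are illustrative):

  CREATE TABLE hudi_table_partition_stats (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    city STRING
  ) USING hudi
  OPTIONS (
    hoodie.metadata.index.column.stats.enable = 'true',       -- prerequisite: column stats
    hoodie.metadata.index.partition.stats.enable = 'true',    -- partition stats on top
    hoodie.metadata.index.column.stats.column.list = 'rider'  -- columns to track
  )
  PARTITIONED BY (city);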

Create Secondary Index

Secondary indexes are record-level indexes built on any column in the table. They efficiently support multiple records having the same secondary column value, and are built on top of the existing record-level index on the table's record key. Secondary indexes are hash-based indexes that offer horizontally scalable write performance, by splitting the key space into shards via hashing, as well as fast lookups, by employing row-based file formats.

Let us now look at an example of creating a table with multiple indexes, and how queries leverage the indexes for both partition pruning and data skipping.

  DROP TABLE IF EXISTS hudi_table;

  -- Let us create a table with multiple partition fields, and enable record index and partition stats index
  CREATE TABLE hudi_table (
    ts BIGINT,
    id STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING,
    state STRING
  ) USING hudi
  OPTIONS (
    primaryKey = 'id',
    hoodie.metadata.record.index.enable = 'true', -- enable record index
    hoodie.metadata.index.partition.stats.enable = 'true', -- enable partition stats index
    hoodie.metadata.index.column.stats.enable = 'true', -- enable column stats
    hoodie.metadata.index.column.stats.column.list = 'rider', -- create column stats index on rider column
    hoodie.write.record.merge.mode = 'COMMIT_TIME_ORDERING' -- enable commit time ordering, required for secondary index
  )
  PARTITIONED BY (city, state)
  LOCATION 'file:///tmp/hudi_test_table';

  INSERT INTO hudi_table VALUES (1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california');
  INSERT INTO hudi_table VALUES (1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california');
  INSERT INTO hudi_table VALUES (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas');
  INSERT INTO hudi_table VALUES (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas');

  -- simple partition predicate --
  SELECT * FROM hudi_table WHERE city = 'sunnyvale';
  20240710215107477 20240710215107477_0_0 trip2 city=sunnyvale/state=california 1dcb14a9-bc4a-4eac-aab5-015f2254b7ec-0_0-40-75_20240710215107477.parquet 1695091554 trip2 rider-C driver-M 27.7 sunnyvale california
  Time taken: 0.58 seconds, Fetched 1 row(s)

  -- simple partition predicate on the other partition field --
  SELECT * FROM hudi_table WHERE state = 'texas';
  20240710215119846 20240710215119846_0_0 trip4 city=houston/state=texas 08c6ed2c-a87b-4798-8f70-6d8b16cb1932-0_0-74-133_20240710215119846.parquet 1695516137 trip4 rider-F driver-P 34.15 houston texas
  20240710215110584 20240710215110584_0_0 trip3 city=austin/state=texas 0ab2243c-cc08-4da3-8302-4ce0b4c47a08-0_0-57-104_20240710215110584.parquet 1695332066 trip3 rider-E driver-O 93.5 austin texas
  Time taken: 0.124 seconds, Fetched 2 row(s)

  -- predicate on a column for which partition stats are present --
  SELECT id, rider, city, state FROM hudi_table WHERE rider > 'rider-D';
  trip4 rider-F houston texas
  trip3 rider-E austin texas
  Time taken: 0.703 seconds, Fetched 2 row(s)

  -- record key predicate --
  SELECT id, rider, driver FROM hudi_table WHERE id = 'trip1';
  trip1 rider-A driver-K
  Time taken: 0.368 seconds, Fetched 1 row(s)

  -- create secondary index on driver --
  CREATE INDEX driver_idx ON hudi_table (driver);

  -- secondary key predicate --
  SELECT id, driver, city, state FROM hudi_table WHERE driver IN ('driver-K', 'driver-M');
  trip1 driver-K san_francisco california
  trip2 driver-M sunnyvale california
  Time taken: 0.83 seconds, Fetched 2 row(s)

Create Bloom Filter Index

Bloom filter indexes store a bloom filter per file, on the column or column expression being indexed. They can be very effective in skipping files that don't contain a given value of a high-cardinality column, e.g., UUIDs.

  -- Create a bloom filter index on the expression `lower(rider)` of the table `hudi_indexed_table`
  CREATE INDEX idx_bloom_rider ON hudi_indexed_table USING bloom_filters(rider) OPTIONS(expr='lower');

Setting Hudi configs

There are different ways you can pass the configs for a given Hudi table.

Using set command

You can use the SET command to set any of Hudi's write configs. This will apply to operations across the whole Spark session.

  SET hoodie.insert.shuffle.parallelism = 100;
  SET hoodie.upsert.shuffle.parallelism = 100;
  SET hoodie.delete.shuffle.parallelism = 100;

Using table properties

You can also configure table options when creating a table. These will apply only to the table and override any SET command values.

  CREATE TABLE IF NOT EXISTS tableName (
    colName1 colType1,
    colName2 colType2,
    ...
  ) USING hudi
  TBLPROPERTIES (
    primaryKey = '${colName1}',
    type = 'cow',
    ${hoodie.config.key1} = '${hoodie.config.value1}',
    ${hoodie.config.key2} = '${hoodie.config.value2}',
    ....
  );

For example:

  CREATE TABLE IF NOT EXISTS hudi_table (
    id BIGINT,
    name STRING,
    price DOUBLE
  ) USING hudi
  TBLPROPERTIES (
    primaryKey = 'id',
    type = 'cow',
    hoodie.cleaner.fileversions.retained = '20',
    hoodie.keep.max.commits = '20'
  );

Table Properties

Users can set table properties while creating a table. The important table properties are discussed below.

| Parameter Name | Default | Description |
| --- | --- | --- |
| type | cow | The table type to create. type = 'cow' creates a COPY-ON-WRITE table, while type = 'mor' creates a MERGE-ON-READ table. Same as hoodie.datasource.write.table.type. More details can be found here. |
| primaryKey | uuid | The primary key field names of the table, separated by commas. Same as hoodie.datasource.write.recordkey.field. If this config is omitted, Hudi auto-generates primary keys. If explicitly set, primary key generation honors the user configuration. |
| preCombineField | | The pre-combine field of the table. It is used for resolving the final version of the record among multiple versions. Generally, event time or another similar column is used for ordering purposes. Hudi can handle out-of-order data using the preCombineField value. |

NOTE:

primaryKey, preCombineField, type, and other properties are case-sensitive.

Passing Lock Providers for Concurrent Writers

Hudi requires a lock provider to support concurrent writers or asynchronous table services when using OCC (Optimistic Concurrency Control) or NBCC (Non-Blocking Concurrency Control) mode. For NBCC, locking is only used to write the commit metadata file in the timeline; writes are serialized by completion time. Users can pass these table properties into TBLPROPERTIES as well. Below is an example of a ZooKeeper-based configuration, followed by a sketch of a full CREATE TABLE.

  -- Properties to use lock configurations to support multiple writers
  TBLPROPERTIES (
    hoodie.write.lock.zookeeper.url = 'zookeeper',
    hoodie.write.lock.zookeeper.port = '2181',
    hoodie.write.lock.zookeeper.lock_key = 'tableName',
    hoodie.write.lock.provider = 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
    hoodie.write.concurrency.mode = 'optimistic_concurrency_control',
    hoodie.write.lock.zookeeper.base_path = '/tableName'
  )
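
For completeness, a hedged sketch attaching these properties at table creation; the table name, key fields, and ZooKeeper endpoint are illustrative, and the lock configs are the same ones shown above:

  CREATE TABLE IF NOT EXISTS hudi_table_multiwriter (
    id INT,
    name STRING,
    ts BIGINT
  ) USING hudi
  TBLPROPERTIES (
    primaryKey = 'id',
    preCombineField = 'ts',
    hoodie.write.concurrency.mode = 'optimistic_concurrency_control',
    hoodie.write.lock.provider = 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
    hoodie.write.lock.zookeeper.url = 'zookeeper',
    hoodie.write.lock.zookeeper.port = '2181',
    hoodie.write.lock.zookeeper.lock_key = 'hudi_table_multiwriter',
    hoodie.write.lock.zookeeper.base_path = '/hudi_table_multiwriter'
  );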

Enabling Column Stats / Record Level Index for the table

Hudi provides the ability to leverage rich metadata and indexes for a table to speed up DMLs and queries. For example, collection of column statistics can be enabled to perform quick data skipping, or a record-level index can be used to perform fast updates or point lookups, using the following table properties.

For more, see Metadata Configurations.

  TBLPROPERTIES (
    'hoodie.metadata.index.column.stats.enable' = 'true',
    'hoodie.metadata.record.index.enable' = 'true'
  )

Spark Alter Table

Syntax

  -- Alter table name
  ALTER TABLE oldTableName RENAME TO newTableName;

  -- Alter table add columns
  ALTER TABLE tableIdentifier ADD COLUMNS(colAndType [, colAndType]);

Examples

  -- rename to:
  ALTER TABLE hudi_table RENAME TO hudi_table_renamed;

  -- add column:
  ALTER TABLE hudi_table ADD COLUMNS(remark STRING);

Modifying Table Properties

Syntax

  -- alter table ... set|unset
  ALTER TABLE tableIdentifier SET|UNSET TBLPROPERTIES (table_property = 'property_value');

Examples

  ALTER TABLE hudi_table SET TBLPROPERTIES (hoodie.keep.max.commits = '10');
  ALTER TABLE hudi_table SET TBLPROPERTIES ("note" = "don't drop this table");
  ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS (hoodie.keep.max.commits);
  ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS ('note');

NOTE:

Currently, trying to change a column's type may throw the error ALTER TABLE CHANGE COLUMN is not supported for changing column colName with oldColType to colName with newColType, due to an open Spark issue.

Alter config options

You can also alter the write config for a table using ALTER TABLE SET SERDEPROPERTIES.

Syntax

  -- alter table ... set serdeproperties
  ALTER TABLE tableName SET SERDEPROPERTIES ('property' = 'property_value');

Example

  ALTER TABLE hudi_table SET SERDEPROPERTIES ('key1' = 'value1');

Show and drop partitions

Syntax

  -- Show partitions
  SHOW PARTITIONS tableIdentifier;

  -- Drop partition
  ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] );

Examples

  -- Show partitions
  SHOW PARTITIONS hudi_table;

  -- Drop partition
  ALTER TABLE hudi_table DROP PARTITION (dt='2021-12-09', hh='10');

Show and drop index

Syntax

  -- Show indexes
  SHOW INDEXES FROM tableIdentifier;

  -- Drop index
  DROP INDEX indexIdentifier ON tableIdentifier;

Examples

  -- Show indexes
  SHOW INDEXES FROM hudi_indexed_table;

  -- Drop index
  DROP INDEX record_index ON hudi_indexed_table;

Show create table

Syntax

  SHOW CREATE TABLE tableIdentifier;

Examples

  SHOW CREATE TABLE hudi_table;

Caveats

Hudi currently has the following limitations when using Spark SQL to create or alter tables.

  • ALTER TABLE ... RENAME TO ... is not supported when using AWS Glue Data Catalog as the Hive metastore, as Glue itself does not support table renames.
  • A new Hudi table created by Spark SQL will by default set hoodie.datasource.write.hive_style_partitioning=true, for ease of use. This can be overridden using table properties, as in the sketch below.
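
For instance, a minimal sketch that overrides the default (the table name hudi_table_non_hive_style is illustrative):

  CREATE TABLE IF NOT EXISTS hudi_table_non_hive_style (
    id INT,
    name STRING,
    dt STRING
  ) USING hudi
  TBLPROPERTIES (
    hoodie.datasource.write.hive_style_partitioning = 'false'
  )
  PARTITIONED BY (dt);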

Flink SQL

Create Catalog

The catalog helps manage SQL tables; a table can be shared among sessions if the catalog persists the table definitions. For hms mode, the catalog also supplements the Hive syncing options.

Example

  CREATE CATALOG hoodie_catalog
  WITH (
    'type' = 'hudi',
    'catalog.path' = '${catalog default root path}',
    'hive.conf.dir' = '${directory where hive-site.xml is located}',
    'mode' = 'hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
  );

Options

| Option Name | Required | Default | Remarks |
| --- | --- | --- | --- |
| catalog.path | true | | Default path for the catalog's table storage. The path is used to infer the table path automatically; the default table path is ${catalog.path}/${db_name}/${table_name}. |
| default-database | false | default | Default database name. |
| hive.conf.dir | false | | The directory where hive-site.xml is located; only valid in hms mode. |
| mode | false | dfs | Supports hms mode, which uses HMS to persist the table options. |
| table.external | false | false | Whether to create an external table; only valid in hms mode. |

Create Table

You can create tables using standard Flink SQL CREATE TABLE syntax, which supports partitioning and passing Flink options using WITH.

  CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <table_constraint> }[ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)

Create non-partitioned table

Creating a non-partitioned table is as simple as creating a regular table.

  -- create a Hudi table
  CREATE TABLE hudi_table (
    id BIGINT,
    name STRING,
    price DOUBLE
  )
  WITH (
    'connector' = 'hudi',
    'path' = 'file:///tmp/hudi_table',
    'table.type' = 'MERGE_ON_READ'
  );

Create partitioned table

The following is an example of creating a Flink partitioned table.

  CREATE TABLE hudi_table (
    id BIGINT,
    name STRING,
    dt STRING,
    hh STRING
  )
  PARTITIONED BY (`dt`)
  WITH (
    'connector' = 'hudi',
    'path' = 'file:///tmp/hudi_table',
    'table.type' = 'MERGE_ON_READ'
  );

Create table with record keys and ordering fields

The following is an example of creating a Flink table with a record key and ordering field, similar to Spark.

  CREATE TABLE hudi_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    price DOUBLE,
    ts BIGINT,
    dt STRING
  )
  PARTITIONED BY (`dt`)
  WITH (
    'connector' = 'hudi',
    'path' = 'file:///tmp/hudi_table',
    'table.type' = 'MERGE_ON_READ',
    'precombine.field' = 'ts'
  );

Create Table in Non-Blocking Concurrency Control Mode

The following is an example of creating a Flink table in Non-Blocking Concurrency Control mode.

  -- This is a datagen source that can generate records continuously
  CREATE TABLE sourceT (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` AS 'par1'
  ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '200'
  );

  -- pipeline1: by default, enable the compaction and cleaning services
  CREATE TABLE t1 (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
  ) WITH (
    'connector' = 'hudi',
    'path' = '/tmp/hudi-demo/t1',
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
    'write.tasks' = '2'
  );

  -- pipeline2: disable the compaction and cleaning services manually
  CREATE TABLE t1_2 (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
  ) WITH (
    'connector' = 'hudi',
    'path' = '/tmp/hudi-demo/t1',
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
    'write.tasks' = '2',
    'compaction.schedule.enabled' = 'false',
    'compaction.async.enabled' = 'false',
    'clean.async.enabled' = 'false'
  );

  -- Submit the pipelines
  INSERT INTO t1
  SELECT * FROM sourceT;

  INSERT INTO t1_2
  SELECT * FROM sourceT;

  SELECT * FROM t1 LIMIT 20;

Alter Table

  ALTER TABLE tableA RENAME TO tableB;

Setting Hudi configs

Using table options

You can configure Hudi configs in table options when creating a table; refer to the Flink-specific Hudi configs here. These configs will be applied to all operations on that table.

  CREATE TABLE IF NOT EXISTS tableName (
    colName1 colType1 PRIMARY KEY NOT ENFORCED,
    colName2 colType2,
    ...
  )
  WITH (
    'connector' = 'hudi',
    'path' = '${path}',
    ${hoodie.config.key1} = '${hoodie.config.value1}',
    ${hoodie.config.key2} = '${hoodie.config.value2}',
    ....
  );

For example:

  CREATE TABLE hudi_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    price DOUBLE,
    ts BIGINT,
    dt STRING
  )
  PARTITIONED BY (`dt`)
  WITH (
    'connector' = 'hudi',
    'path' = 'file:///tmp/hudi_table',
    'table.type' = 'MERGE_ON_READ',
    'precombine.field' = 'ts',
    'hoodie.cleaner.fileversions.retained' = '20',
    'hoodie.keep.max.commits' = '20',
    'hoodie.datasource.write.hive_style_partitioning' = 'true'
  );

Supported Types

| Spark | Hudi | Notes |
| --- | --- | --- |
| boolean | boolean | |
| byte | int | |
| short | int | |
| integer | int | |
| long | long | |
| date | date | |
| timestamp | timestamp | |
| float | float | |
| double | double | |
| string | string | |
| decimal | decimal | |
| binary | bytes | |
| array | array | |
| map | map | |
| struct | struct | |
| char | not supported | |
| varchar | not supported | |
| numeric | not supported | |
| null | not supported | |
| object | not supported | |
objectnot supported