Spark Quick Start

This guide provides a quick peek at Hudi's capabilities using Spark. Using the Spark Datasource APIs (both Scala and Python) and Spark SQL, we will walk through code snippets that allow you to insert, update, delete and query a Hudi table.

Setup

Hudi works with Spark 3.3 and above. You can follow the instructions here for setting up Spark.

Spark 3 Support Matrix

| Hudi | Supported Spark 3 version |
| --- | --- |
| 1.0.x | 3.5.x (default build), 3.4.x, 3.3.x |
| 0.15.x | 3.5.x (default build), 3.4.x, 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| 0.14.x | 3.4.x (default build), 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| 0.13.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.12.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.11.x | 3.2.x (default build, Spark bundle only), 3.1.x |
| 0.10.x | 3.1.x (default build), 3.0.x |
| 0.7.0 - 0.9.0 | 3.0.x |
| 0.6.0 and prior | not supported |

The default build Spark version indicates the Spark version used to build the hudi-spark3-bundle.

Change summary

In 1.0.0, we dropped support for Spark 3.2.x and lower Spark 3 versions. In 0.15.0, we introduced support for Spark 3.5.x. In 0.14.0, we introduced support for Spark 3.4.x and brought back support for Spark 3.0.x. In 0.12.0, we introduced experimental support for Spark 3.3.0. In 0.11.0, there are changes to how Spark bundles are used; please refer to the 0.11.0 release notes for detailed instructions.

Spark Shell/SQL

  • Scala
  • Python
  • Spark SQL

From the extracted directory run spark-shell with Hudi:

  1. # For Spark versions: 3.3 - 3.5
  2. export SPARK_VERSION=3.5 # or 3.4, 3.3
  3. spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.0 \
  4. --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  5. --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  6. --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  7. --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

From the extracted directory run pyspark with Hudi:

  1. # For Spark versions: 3.3 - 3.5
  2. export PYSPARK_PYTHON=$(which python3)
  3. export SPARK_VERSION=3.5 # or 3.4, 3.3
  4. pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

Hudi supports using Spark SQL to write and read data via the HoodieSparkSessionExtension SQL extension. From the extracted directory run Spark SQL with Hudi:

  1. # For Spark versions: 3.3 - 3.5
  2. export SPARK_VERSION=3.5 # or 3.4, 3.3
  3. spark-sql --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.0 \
  4. --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  5. --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  6. --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  7. --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

On Kryo serialization

It is recommended to set the following config to reduce Kryo serialization overhead:

  1. --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

Setup project

Below, we do imports and set up the table name and corresponding base path.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. import scala.collection.JavaConversions._
  3. import org.apache.spark.sql.SaveMode._
  4. import org.apache.hudi.DataSourceReadOptions._
  5. import org.apache.hudi.DataSourceWriteOptions._
  6. import org.apache.hudi.common.table.HoodieTableConfig._
  7. import org.apache.hudi.config.HoodieWriteConfig._
  8. import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
  9. import org.apache.hudi.common.model.HoodieRecord
  10. import spark.implicits._
  11. val tableName = "trips_table"
  12. val basePath = "file:///tmp/trips_table"
  1. # pyspark
  2. from pyspark.sql.functions import lit, col
  3. tableName = "trips_table"
  4. basePath = "file:///tmp/trips_table"
  1. // Next section will go over create table commands

Create Table

First, let’s create a Hudi table. Here, we use a partitioned table for illustration, but Hudi also supports non-partitioned tables.

  • Scala
  • Python
  • Spark SQL
  1. // scala
  2. // The first commit will auto-initialize the table if it does not already exist in the specified base path.
  1. # pyspark
  2. # The first commit will auto-initialize the table if it does not already exist in the specified base path.

NOTE:

For users who have Spark-Hive integration in their environment, this guide assumes that you have the appropriate settings configured to allow Spark to create tables and register them in the Hive Metastore.
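If you are constructing the SparkSession yourself (for example, in a standalone application rather than spark-shell), here is a minimal sketch of a Hive-enabled session, assuming a reachable Hive Metastore is configured via hive-site.xml on the classpath:

  1. // scala
  2. import org.apache.spark.sql.SparkSession
  3. val spark = SparkSession.builder().
  4. appName("hudi-quickstart").
  5. config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  6. config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
  7. config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog").
  8. enableHiveSupport().
  9. getOrCreate()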

Here is an example of creating a Hudi table.

  1. -- create a Hudi table that is partitioned.
  2. CREATE TABLE hudi_table (
  3. ts BIGINT,
  4. uuid STRING,
  5. rider STRING,
  6. driver STRING,
  7. fare DOUBLE,
  8. city STRING
  9. ) USING HUDI
  10. PARTITIONED BY (city);

For more options for creating Hudi tables, or if you run into any issues, please refer to the SQL DDL reference guide.

Insert data

  • Scala
  • Python
  • Spark SQL

Generate some new records as a DataFrame and write the DataFrame into a Hudi table. Since this is the first write, it will also auto-create the table.

  1. // spark-shell
  2. val columns = Seq("ts","uuid","rider","driver","fare","city")
  3. val data =
  4. Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  5. (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
  6. (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
  7. (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
  8. (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
  9. var inserts = spark.createDataFrame(data).toDF(columns:_*)
  10. inserts.write.format("hudi").
  11. option("hoodie.datasource.write.partitionpath.field", "city").
  12. option("hoodie.table.name", tableName).
  13. mode(Overwrite).
  14. save(basePath)

Mapping to Hudi write operations

Hudi provides a wide range of write operations - both batch and incremental - to write data into Hudi tables, with different semantics and performance. When record keys are not configured (see keys below), bulk_insert will be chosen as the write operation, matching the out-of-the-box behavior of Spark's Parquet Datasource.
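If you want to pick the write operation explicitly rather than rely on the default, it can be set through the hoodie.datasource.write.operation option (the same key used in the update and delete examples later in this guide). A minimal spark-shell sketch, reusing the inserts DataFrame from above:

  1. // spark-shell
  2. // Explicitly request bulk_insert (other valid values include "insert" and "upsert")
  3. inserts.write.format("hudi").
  4. option("hoodie.datasource.write.operation", "bulk_insert").
  5. option("hoodie.datasource.write.partitionpath.field", "city").
  6. option("hoodie.table.name", tableName).
  7. mode(Overwrite).
  8. save(basePath)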

Generate some new records as a DataFrame and write the DataFrame into a Hudi table. Since this is the first write, it will also auto-create the table.

  1. # pyspark
  2. columns = ["ts","uuid","rider","driver","fare","city"]
  3. data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  4. (1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
  5. (1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
  6. (1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
  7. (1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai")]
  8. inserts = spark.createDataFrame(data).toDF(*columns)
  9. hudi_options = {
  10. 'hoodie.table.name': tableName,
  11. 'hoodie.datasource.write.partitionpath.field': 'city'
  12. }
  13. inserts.write.format("hudi"). \
  14. options(**hudi_options). \
  15. mode("overwrite"). \
  16. save(basePath)

Mapping to Hudi write operations

Hudi provides a wide range of write operations - both batch and incremental - to write data into Hudi tables, with different semantics and performance. When record keys are not configured (see keys below), bulk_insert will be chosen as the write operation, matching the out-of-the-box behavior of Spark's Parquet Datasource.

Users can use ‘INSERT INTO’ to insert data into a Hudi table. See Insert Into for more advanced options.

  1. INSERT INTO hudi_table
  2. VALUES
  3. (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
  4. (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
  5. (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
  6. (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
  7. (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo' ),
  8. (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo' ),
  9. (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai' ),
  10. (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

If you want to control the Hudi write operation used for the INSERT statement, you can set the following config before issuing the INSERT statement:

  1. -- bulk_insert using INSERT_INTO
  2. SET hoodie.spark.sql.insert.into.operation = 'bulk_insert'

Query data

Hudi tables can be queried back into a DataFrame or with Spark SQL.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. val tripsDF = spark.read.format("hudi").load(basePath)
  3. tripsDF.createOrReplaceTempView("trips_table")
  4. spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
  5. spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()
  1. # pyspark
  2. tripsDF = spark.read.format("hudi").load(basePath)
  3. tripsDF.createOrReplaceTempView("trips_table")
  4. spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
  5. spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()
  1. SELECT ts, fare, rider, driver, city FROM hudi_table WHERE fare > 20.0;

Update data

Hudi tables can be updated by streaming in a DataFrame or using a standard UPDATE statement.

  • Scala
  • Python
  • Spark SQL
  1. // Let's read data from the target Hudi table, modify the fare column for rider-D and update it.
  2. val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
  3. updatesDf.write.format("hudi").
  4. option("hoodie.datasource.write.operation", "upsert").
  5. option("hoodie.datasource.write.partitionpath.field", "city").
  6. option("hoodie.table.name", tableName).
  7. mode(Append).
  8. save(basePath)

Key requirements

Updates with the Spark DataSource are possible only when the source DataFrame contains Hudi's meta fields or a key field is configured. Notice that the save mode is now Append. In general, always use append mode unless you are trying to create the table for the first time.

A Hudi table can be updated using a regular UPDATE statement. See Update for more advanced options.

  1. UPDATE hudi_table SET fare = 25.0 WHERE rider = 'rider-D';
  1. # pyspark
  2. # Let's read data from the target Hudi table, modify the fare column for rider-D and update it.
  3. updatesDf = spark.read.format("hudi").load(basePath).filter("rider == 'rider-D'").withColumn("fare",col("fare")*10)
  4. updatesDf.write.format("hudi"). \
  5. options(**hudi_options). \
  6. mode("append"). \
  7. save(basePath)

Key requirements

Updates with the Spark DataSource are possible only when the source DataFrame contains Hudi's meta fields or a key field is configured. Notice that the save mode is now Append. In general, always use append mode unless you are trying to create the table for the first time.

Querying the data again will now show the updated records. Each write operation generates a new commit. Look for changes in the _hoodie_commit_time and fare fields for the given _hoodie_record_key value compared to the previous commit.
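To see the effect of the update, reload the table and inspect the commit time and fare for the updated rider. A minimal spark-shell sketch, reusing basePath from above:

  1. // spark-shell
  2. spark.read.format("hudi").load(basePath).
  3. select("_hoodie_commit_time", "rider", "fare").
  4. filter($"rider" === "rider-D").
  5. show()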

Merging Data

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. val adjustedFareDF = spark.read.format("hudi").
  3. load(basePath).limit(2).
  4. withColumn("fare", col("fare") * 10)
  5. adjustedFareDF.write.format("hudi").
  6. option("hoodie.datasource.write.payload.class","com.payloads.CustomMergeIntoConnector").
  7. mode(Append).
  8. save(basePath)
  9. // Notice Fare column has been updated but all other columns remain intact.
  10. spark.read.format("hudi").load(basePath).show()

The com.payloads.CustomMergeIntoConnector adds adjusted fare values to the original table and preserves all other fields. Refer here for a sample implementation of com.payloads.CustomMergeIntoConnector.

  1. # pyspark
  2. adjustedFareDF = spark.read.format("hudi").load(basePath). \
  3. limit(2).withColumn("fare", col("fare") * 100)
  4. adjustedFareDF.write.format("hudi"). \
  5. option("hoodie.datasource.write.payload.class","com.payloads.CustomMergeIntoConnector"). \
  6. mode("append"). \
  7. save(basePath)
  8. # Notice Fare column has been updated but all other columns remain intact.
  9. spark.read.format("hudi").load(basePath).show()

The com.payloads.CustomMergeIntoConnector adds adjusted fare values to the original table and preserves all other fields. Refer here for a sample implementation of com.payloads.CustomMergeIntoConnector.

  1. -- source table using Hudi for testing merging into target Hudi table
  2. CREATE TABLE fare_adjustment (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING)
  3. USING HUDI;
  4. INSERT INTO fare_adjustment VALUES
  5. (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),
  6. (1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),
  7. (1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo' ),
  8. (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai' );
  9. MERGE INTO hudi_table AS target
  10. USING fare_adjustment AS source
  11. ON target.uuid = source.uuid
  12. WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare
  13. WHEN NOT MATCHED THEN INSERT *
  14. ;

Key requirements

For a Hudi table with user-defined primary record keys, the join condition is expected to contain the primary keys of the table. For a Hudi table with Hudi-generated primary keys, the join condition can be on arbitrary data columns.

Merging Data (Partial Updates)

Partial updates write only the updated columns instead of the full record. This is useful when you have hundreds of columns and only a few are updated; it reduces both write and storage costs. The MERGE INTO statement above can be modified to use partial updates as shown below.

  1. MERGE INTO hudi_table AS target
  2. USING fare_adjustment AS source
  3. ON target.uuid = source.uuid
  4. WHEN MATCHED THEN UPDATE SET fare = source.fare
  5. WHEN NOT MATCHED THEN INSERT *
  6. ;

Notice that instead of UPDATE SET *, we are updating only the fare column.

Delete data

The delete operation removes the specified records from the table. For example, this code snippet deletes records for the HoodieKeys passed in. Check out the deletion section for more details.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. // Let's delete rider: rider-F
  3. val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")
  4. deletesDF.write.format("hudi").
  5. option("hoodie.datasource.write.operation", "delete").
  6. option("hoodie.datasource.write.partitionpath.field", "city").
  7. option("hoodie.table.name", tableName).
  8. mode(Append).
  9. save(basePath)

Querying the data again will not show the deleted record.

Key requirements

Deletes with the Spark DataSource are supported only when the source DataFrame contains Hudi's meta fields or a key field is configured. Notice that the save mode is again Append.

  1. DELETE FROM hudi_table WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2';
  1. # pyspark
  2. # Let's delete rider: rider-F
  3. deletesDF = spark.read.format("hudi").load(basePath).filter("rider == 'rider-F'")
  4. # issue deletes
  5. hudi_hard_delete_options = {
  6. 'hoodie.table.name': tableName,
  7. 'hoodie.datasource.write.partitionpath.field': 'city',
  8. 'hoodie.datasource.write.operation': 'delete',
  9. }
  10. deletesDF.write.format("hudi"). \
  11. options(**hudi_hard_delete_options). \
  12. mode("append"). \
  13. save(basePath)

Querying the data again will not show the deleted record.

Key requirements

Deletes with the Spark DataSource are supported only when the source DataFrame contains Hudi's meta fields or a key field is configured. Notice that the save mode is again Append.
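To confirm the delete took effect, here is a quick check from spark-shell, reusing basePath from above; the count should be zero for the deleted rider:

  1. // spark-shell
  2. spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F").count()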

Index data

Hudi supports indexing on columns to speed up queries. Indexes can be created on columns using the CREATE INDEX statement.

Note

To create a secondary index, please note:

  1. The table must have a primary key, and the merge mode should be COMMIT_TIME_ORDERING.
  2. The record index must be enabled. This can be done by setting hoodie.metadata.record.index.enable=true and then creating record_index. See the example below.
  • Scala
  • Spark SQL

Here is an example which shows how to create indexes for a table created using the DataSource API.

  1. import scala.collection.JavaConversions._
  2. import org.apache.spark.sql.SaveMode._
  3. import org.apache.hudi.DataSourceReadOptions._
  4. import org.apache.hudi.DataSourceWriteOptions._
  5. import org.apache.hudi.common.table.HoodieTableConfig._
  6. import org.apache.hudi.config.HoodieWriteConfig._
  7. import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
  8. import org.apache.hudi.common.model.HoodieRecord
  9. import spark.implicits._
  10. val tableName = "trips_table_index"
  11. val basePath = "file:///tmp/hudi_indexed_table"
  12. val columns = Seq("ts","uuid","rider","driver","fare","city")
  13. val data =
  14. Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  15. (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
  16. (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
  17. (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
  18. (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
  19. var inserts = spark.createDataFrame(data).toDF(columns:_*)
  20. inserts.write.format("hudi").
  21. option("hoodie.datasource.write.partitionpath.field", "city").
  22. option("hoodie.table.name", tableName).
  23. option("hoodie.write.record.merge.mode", "COMMIT_TIME_ORDERING").
  24. option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  25. mode(Overwrite).
  26. save(basePath)
  27. // Create record index and secondary index for the table
  28. spark.sql(s"CREATE TABLE hudi_indexed_table USING hudi LOCATION '$basePath'")
  29. // Create bloom filter expression index on driver column
  30. spark.sql(s"CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity')");
  31. // It would show bloom filter expression index
  32. spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  33. // Query on driver column would prune the data using the idx_bloom_driver index
  34. spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S'");
  35. // Create column stat expression index on ts column
  36. spark.sql(s"CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd')");
  37. // Shows both expression indexes
  38. spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  39. // Query on ts column would prune the data using the idx_column_ts index
  40. spark.sql(s"SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-09-24'");
  41. // To create secondary index, first create the record index
  42. spark.sql(s"SET hoodie.metadata.record.index.enable=true");
  43. spark.sql(s"CREATE INDEX record_index ON hudi_indexed_table (uuid)");
  44. // Create secondary index on rider column
  45. spark.sql(s"CREATE INDEX idx_rider ON hudi_indexed_table (rider)");
  46. // Expression index and secondary index should show up
  47. spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  48. // Query on rider column would leverage the secondary index idx_rider
  49. spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E'");
  50. // Update a record and query the table based on indexed columns
  51. spark.sql(s"UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts = '1697516137' WHERE rider = 'rider-A'");
  52. // Data skipping would be performed using column stat expression index
  53. spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-10-17'");
  54. // Data skipping would be performed using bloom filter expression index
  55. spark.sql(s"SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N'");
  56. // Data skipping would be performed using secondary index
  57. spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B'");
  58. // Drop all the indexes
  59. spark.sql(s"DROP INDEX secondary_index_idx_rider on hudi_indexed_table");
  60. spark.sql(s"DROP INDEX record_index on hudi_indexed_table");
  61. spark.sql(s"DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table");
  62. spark.sql(s"DROP INDEX expr_index_idx_column_ts on hudi_indexed_table");
  63. // No indexes should show up for the table
  64. spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  65. spark.sql(s"SET hoodie.metadata.record.index.enable=false");
  1. -- Create a table with primary key
  2. CREATE TABLE hudi_indexed_table (
  3. ts BIGINT,
  4. uuid STRING,
  5. rider STRING,
  6. driver STRING,
  7. fare DOUBLE,
  8. city STRING
  9. ) USING HUDI
  10. options(
  11. primaryKey ='uuid',
  12. hoodie.write.record.merge.mode = "COMMIT_TIME_ORDERING"
  13. )
  14. PARTITIONED BY (city);
  15. INSERT INTO hudi_indexed_table
  16. VALUES
  17. (1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
  18. (1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
  19. (1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
  20. (1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
  21. (1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo' ),
  22. (1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo' ),
  23. (1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai' ),
  24. (1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
  25. -- Create bloom filter expression index on driver column
  26. CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity');
  27. -- It would show bloom filter expression index
  28. SHOW INDEXES FROM hudi_indexed_table;
  29. -- Query on driver column would prune the data using the idx_bloom_driver index
  30. SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S';
  31. -- Create column stat expression index on ts column
  32. CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
  33. -- Shows both expression indexes
  34. SHOW INDEXES FROM hudi_indexed_table;
  35. -- Query on ts column would prune the data using the idx_column_ts index
  36. SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-09-24';
  37. -- To create secondary index, first create the record index
  38. SET hoodie.metadata.record.index.enable=true;
  39. CREATE INDEX record_index ON hudi_indexed_table (uuid);
  40. -- Create secondary index on rider column
  41. CREATE INDEX idx_rider ON hudi_indexed_table (rider);
  42. -- Expression index and secondary index should show up
  43. SHOW INDEXES FROM hudi_indexed_table;
  44. -- Query on rider column would leverage the secondary index idx_rider
  45. SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E';
  46. -- Update a record and query the table based on indexed columns
  47. UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts = '1697516137' WHERE rider = 'rider-A';
  48. -- Data skipping would be performed using column stat expression index
  49. SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-10-17';
  50. -- Data skipping would be performed using bloom filter expression index
  51. SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N';
  52. -- Data skipping would be performed using secondary index
  53. SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B';
  54. -- Drop all the indexes
  55. DROP INDEX record_index on hudi_indexed_table;
  56. DROP INDEX secondary_index_idx_rider on hudi_indexed_table;
  57. DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table;
  58. DROP INDEX expr_index_idx_column_ts on hudi_indexed_table;
  59. -- No indexes should show up for the table
  60. SHOW INDEXES FROM hudi_indexed_table;
  61. SET hoodie.metadata.record.index.enable=false;

Time Travel Query

Hudi supports time travel queries to read the table as of a point in time in history. Three timestamp formats are supported, as illustrated below.

  • Scala
  • Python
  • Spark SQL
  1. spark.read.format("hudi").
  2. option("as.of.instant", "20210728141108100").
  3. load(basePath)
  4. spark.read.format("hudi").
  5. option("as.of.instant", "2021-07-28 14:11:08.200").
  6. load(basePath)
  7. // It is equal to "as.of.instant = 2021-07-28 00:00:00"
  8. spark.read.format("hudi").
  9. option("as.of.instant", "2021-07-28").
  10. load(basePath)
  1. # pyspark
  2. spark.read.format("hudi"). \
  3. option("as.of.instant", "20210728141108100"). \
  4. load(basePath)
  5. spark.read.format("hudi"). \
  6. option("as.of.instant", "2021-07-28 14:11:08.000"). \
  7. load(basePath)
  8. # It is equal to "as.of.instant = 2021-07-28 00:00:00"
  9. spark.read.format("hudi"). \
  10. option("as.of.instant", "2021-07-28"). \
  11. load(basePath)
  1. -- time travel based on commit time, for eg: `20220307091628793`
  2. SELECT * FROM hudi_table TIMESTAMP AS OF '20220307091628793' WHERE id = 1;
  3. -- time travel based on different timestamp formats
  4. SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-07 09:16:28.100' WHERE id = 1;
  5. SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-08' WHERE id = 1;

Incremental query

Hudi provides the unique capability to obtain a set of records that changed between a start and end commit time, providing you with the “latest state” for each such record as of the end commit time. By default, Hudi tables are configured to support incremental queries, using record level metadata tracking.

Below, we fetch changes since a given begin time, while the end time defaults to the latest commit on the table. Users can also specify an end time using the END_INSTANTTIME.key() option; see the sketch after the examples below.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. spark.read.format("hudi").load(basePath).createOrReplaceTempView("trips_table")
  3. val commits = spark.sql("SELECT DISTINCT(_hoodie_commit_time) AS commitTime FROM trips_table ORDER BY commitTime").map(k => k.getString(0)).take(50)
  4. val beginTime = commits(commits.length - 2) // commit time we are interested in
  5. // incrementally query data
  6. val tripsIncrementalDF = spark.read.format("hudi").
  7. option("hoodie.datasource.query.type", "incremental").
  8. option("hoodie.datasource.read.begin.instanttime", 0).
  9. load(basePath)
  10. tripsIncrementalDF.createOrReplaceTempView("trips_incremental")
  11. spark.sql("SELECT `_hoodie_commit_time`, fare, rider, driver, uuid, ts FROM trips_incremental WHERE fare > 20.0").show()
  1. # pyspark
  2. # reload data
  3. spark.read.format("hudi").load(basePath).createOrReplaceTempView("trips_table")
  4. commits = list(map(lambda row: row[0], spark.sql("SELECT DISTINCT(_hoodie_commit_time) AS commitTime FROM trips_table ORDER BY commitTime").limit(50).collect()))
  5. beginTime = commits[len(commits) - 2] # commit time we are interested in
  6. # incrementally query data
  7. incremental_read_options = {
  8. 'hoodie.datasource.query.type': 'incremental',
  9. 'hoodie.datasource.read.begin.instanttime': beginTime,
  10. }
  11. tripsIncrementalDF = spark.read.format("hudi"). \
  12. options(**incremental_read_options). \
  13. load(basePath)
  14. tripsIncrementalDF.createOrReplaceTempView("trips_incremental")
  15. spark.sql("SELECT `_hoodie_commit_time`, fare, rider, driver, uuid, ts FROM trips_incremental WHERE fare > 20.0").show()
  1. -- syntax
  2. hudi_table_changes(table or path, queryType, beginTime [, endTime]);
  3. -- table or path: table identifier, example: db.tableName, tableName,
  4. -- or path to your table, example: path/to/hudiTable
  5. -- in this case table does not need to exist in the metastore,
  6. -- queryType: incremental query mode, example: latest_state, cdc
  7. -- (for cdc query, first enable cdc for your table by setting cdc.enabled=true),
  8. -- beginTime: instantTime to begin query from, example: earliest, 202305150000,
  9. -- endTime: optional instantTime to end query at, example: 202305160000,
  10. -- incrementally query data by table name
  11. -- start from earliest available commit, end at latest available commit.
  12. SELECT * FROM hudi_table_changes('db.table', 'latest_state', 'earliest');
  13. -- start from earliest, end at 202305160000.
  14. SELECT * FROM hudi_table_changes('table', 'latest_state', 'earliest', '202305160000');
  15. -- start from 202305150000, end at 202305160000.
  16. SELECT * FROM hudi_table_changes('table', 'latest_state', '202305150000', '202305160000');
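As mentioned above, the DataSource read can also be bounded with an end instant. A minimal spark-shell sketch, assuming the hoodie.datasource.read.end.instanttime option and reusing the commits and beginTime values computed in the Scala example:

  1. // spark-shell
  2. val endTime = commits(commits.length - 1) // latest commit in the list
  3. val boundedIncrementalDF = spark.read.format("hudi").
  4. option("hoodie.datasource.query.type", "incremental").
  5. option("hoodie.datasource.read.begin.instanttime", beginTime).
  6. option("hoodie.datasource.read.end.instanttime", endTime).
  7. load(basePath)
  8. boundedIncrementalDF.createOrReplaceTempView("trips_incremental_bounded")
  9. spark.sql("SELECT `_hoodie_commit_time`, fare, rider, driver, uuid, ts FROM trips_incremental_bounded").show()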

Change Data Capture Query

Hudi also exposes first-class support for Change Data Capture (CDC) queries. CDC queries are useful for applications that need to obtain all the changes, along with before/after images of records, given a commit time range.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. // Let's first insert data into a new table with CDC enabled.
  3. val columns = Seq("ts","uuid","rider","driver","fare","city")
  4. val data =
  5. Seq((1695158649187L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  6. (1695091544288L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"san_paulo"),
  7. (1695046452379L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"san_francisco"),
  8. (1695332056404L,"1dced545-862b-4ceb-8b43-d2a568f6616b","rider-D","driver-N",93.50,"chennai"));
  9. var df = spark.createDataFrame(data).toDF(columns:_*)
  10. // Insert data
  11. df.write.format("hudi").
  12. option("hoodie.datasource.write.partitionpath.field", "city").
  13. option("hoodie.table.cdc.enabled", "true").
  14. option("hoodie.table.name", tableName).
  15. mode(Overwrite).
  16. save(basePath)
  17. // Update fare for riders: rider-A and rider-B
  18. val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-A" || $"rider" === "rider-B").withColumn("fare", col("fare") * 10)
  19. updatesDf.write.format("hudi").
  20. option("hoodie.datasource.write.operation", "upsert").
  21. option("hoodie.datasource.write.partitionpath.field", "city").
  22. option("hoodie.table.cdc.enabled", "true").
  23. option("hoodie.table.name", tableName).
  24. mode(Append).
  25. save(basePath)
  26. // Query CDC data
  27. spark.read.option("hoodie.datasource.read.begin.instanttime", 0).
  28. option("hoodie.datasource.query.type", "incremental").
  29. option("hoodie.datasource.query.incremental.format", "cdc").
  30. format("hudi").load(basePath).show(false)
  1. # pyspark
  2. # Let's first insert data into a new table with CDC enabled.
  3. columns = ["ts","uuid","rider","driver","fare","city"]
  4. data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  5. (1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"san_francisco"),
  6. (1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"san_francisco"),
  7. (1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-C","driver-N",34.15,"sao_paulo")]
  8. inserts = spark.createDataFrame(data).toDF(*columns)
  9. hudi_options = {
  10. 'hoodie.table.name': tableName,
  11. 'hoodie.datasource.write.partitionpath.field': 'city',
  12. 'hoodie.table.cdc.enabled': 'true'
  13. }
  14. # Insert data
  15. inserts.write.format("hudi"). \
  16. options(**hudi_options). \
  17. mode("overwrite"). \
  18. save(basePath)
  19. # Update fare for riders: rider-A and rider-B
  20. updatesDf = spark.read.format("hudi").load(basePath).filter("rider == 'rider-A' or rider == 'rider-B'").withColumn("fare",col("fare")*10)
  21. updatesDf.write.format("hudi"). \
  22. mode("append"). \
  23. save(basePath)
  24. # Query CDC data
  25. cdc_read_options = {
  26. 'hoodie.datasource.query.incremental.format': 'cdc',
  27. 'hoodie.datasource.query.type': 'incremental',
  28. 'hoodie.datasource.read.begin.instanttime': 0
  29. }
  30. spark.read.format("hudi"). \
  31. options(**cdc_read_options). \
  32. load(basePath).show(10, False)
  1. -- incrementally query data by path
  2. -- start from earliest available commit, end at latest available commit.
  3. SELECT * FROM hudi_table_changes('path/to/table', 'cdc', 'earliest');
  4. -- start from earliest, end at 202305160000.
  5. SELECT * FROM hudi_table_changes('path/to/table', 'cdc', 'earliest', '202305160000');
  6. -- start from 202305150000, end at 202305160000.
  7. SELECT * FROM hudi_table_changes('path/to/table', 'cdc', '202305150000', '202305160000');

Key requirements

Note that CDC queries are currently only supported on Copy-on-Write tables.

Table Types

The examples thus far have showcased one of the two table types that Hudi supports: Copy-on-Write (COW) tables. Hudi also supports a more advanced, write-optimized table type called Merge-on-Read (MOR), which can balance read and write performance more flexibly. See table types for more details.

Any of these examples can be run on a Merge-on-Read table by simply changing the table type to MOR while creating the table, as shown below.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. inserts.write.format("hudi").
  3. ...
  4. option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  5. ...
  1. # pyspark
  2. hudi_options = {
  3. ...
  4. 'hoodie.datasource.write.table.type': 'MERGE_ON_READ'
  5. }
  6. inserts.write.format("hudi"). \
  7. options(**hudi_options). \
  8. mode("overwrite"). \
  9. save(basePath)
  1. CREATE TABLE hudi_table (
  2. uuid STRING,
  3. rider STRING,
  4. driver STRING,
  5. fare DOUBLE,
  6. city STRING
  7. ) USING HUDI TBLPROPERTIES (type = 'mor')
  8. PARTITIONED BY (city);

Keys

Hudi also allows users to specify a record key, which will be used to uniquely identify a record within a Hudi table. This is useful and critical to support features like indexing and clustering, which speed up upserts and queries respectively, in a consistent manner. Some of the other benefits of keys are explained in detail here. To this end, Hudi supports a wide range of built-in key generators that make it easy to generate record keys for a given table. In the absence of a user-configured key, Hudi will auto-generate record keys, which are highly compressible.

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. inserts.write.format("hudi").
  3. ...
  4. option("hoodie.datasource.write.recordkey.field", "uuid").
  5. ...
  1. # pyspark
  2. hudi_options = {
  3. ...
  4. 'hoodie.datasource.write.recordkey.field': 'uuid'
  5. }
  6. inserts.write.format("hudi"). \
  7. options(**hudi_options). \
  8. mode("overwrite"). \
  9. save(basePath)
  1. CREATE TABLE hudi_table (
  2. ts BIGINT,
  3. uuid STRING,
  4. rider STRING,
  5. driver STRING,
  6. fare DOUBLE,
  7. city STRING
  8. ) USING HUDI TBLPROPERTIES (primaryKey = 'uuid')
  9. PARTITIONED BY (city);

Implications of defining record keys

Configuring keys for a Hudi table has implications for how writes behave. If a record key is set by the user, upsert is chosen as the write operation. If a record key is configured, it is also advisable to specify a precombine or ordering field, to correctly handle cases where the source data has multiple records with the same key. See the section below.

Merge Modes

Hudi also allows users to specify a precombine field, which will be used to order and resolve conflicts between multiple versions of the same record. This is very important for use cases like applying database CDC logs to a Hudi table, where a given record may appear multiple times in the source data due to repeated upstream updates. Hudi also uses this mechanism to support out-of-order data arrival into a table, where records may need to be resolved in a different order than their commit time. For example, using a created_at timestamp field as the precombine field will prevent older versions of a record from overwriting newer ones or being exposed to queries, even if they are written at a later commit time to the table. This is one of the key features that makes Hudi best suited for dealing with streaming data.

To enable different merge semantics, Hudi supports merge modes. Commit time and event time based merge modes are supported out of the box. Users can also define their own custom merge strategies; see here.
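For example, to request event time based merging explicitly when a table is first created through the DataSource, the merge mode can be set alongside a record key and ordering field. A minimal spark-shell sketch, reusing the hoodie.write.record.merge.mode key from the indexing example above and assuming EVENT_TIME_ORDERING as the event time mode name (the table name and path below are hypothetical):

  1. // spark-shell
  2. inserts.write.format("hudi").
  3. option("hoodie.write.record.merge.mode", "EVENT_TIME_ORDERING").
  4. option("hoodie.datasource.write.recordkey.field", "uuid").
  5. option("hoodie.datasource.write.precombine.field", "ts").
  6. option("hoodie.datasource.write.partitionpath.field", "city").
  7. option("hoodie.table.name", "trips_table_event_time").
  8. mode(Overwrite).
  9. save("file:///tmp/trips_table_event_time")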

  • Scala
  • Python
  • Spark SQL
  1. // spark-shell
  2. updatesDf.write.format("hudi").
  3. ...
  4. option("hoodie.datasource.write.precombine.field", "ts").
  5. ...
  1. # pyspark
  2. hudi_options = {
  3. ...
  4. 'hoodie.datasource.write.precombine.field': 'ts'
  5. }
  6. upsert.write.format("hudi").
  7. options(**hudi_options).
  8. mode("append").
  9. save(basePath)
  1. CREATE TABLE hudi_table (
  2. ts BIGINT,
  3. uuid STRING,
  4. rider STRING,
  5. driver STRING,
  6. fare DOUBLE,
  7. city STRING
  8. ) USING HUDI TBLPROPERTIES (preCombineField = 'ts')
  9. PARTITIONED BY (city);

Where to go from here?

You can also build Hudi yourself and try this quickstart using --jars <path to spark bundle jar> (see also build with scala 2.12). If you are looking for ways to migrate your existing data to Hudi, refer to the migration guide.
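If you are building your own Spark application against Hudi rather than launching a shell with --packages, the same bundle coordinates from the setup section can be declared as a build dependency. A minimal sbt sketch, assuming Scala 2.12 and Spark 3.5:

  1. // build.sbt
  2. libraryDependencies += "org.apache.hudi" % "hudi-spark3.5-bundle_2.12" % "1.0.0"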

Spark SQL Reference

For advanced usage of Spark SQL, please refer to the Spark SQL DDL and Spark SQL DML reference guides. For ALTER TABLE commands, check out this. Stored procedures provide powerful capabilities for monitoring, managing and operating Hudi tables using Spark SQL; please check this out.
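For instance, stored procedures are invoked with the CALL syntax. A small sketch from spark-shell, assuming the show_commits procedure that ships with recent Hudi releases and the hudi_table created earlier:

  1. // spark-shell
  2. spark.sql("CALL show_commits(table => 'hudi_table', limit => 5)").show()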

Streaming workloads

Hudi provides industry-leading performance and functionality for streaming data.

Hudi Streamer - Hudi provides an incremental ingestion/ETL tool, HoodieStreamer, to assist with ingesting data into Hudi from various sources in a streaming manner, with powerful built-in capabilities like automatic checkpointing, schema enforcement via a schema provider, transformation support, automatic table services and so on.

Structured Streaming - Hudi supports Spark Structured Streaming reads and writes as well. Please see here for more.
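For a flavor of what a streaming write looks like, here is a minimal spark-shell sketch using Spark's built-in rate source to continuously generate rows and write them to a new Hudi table; the table name, path and checkpoint location are hypothetical:

  1. // spark-shell
  2. import org.apache.spark.sql.functions._
  3. val streamingDF = spark.readStream.format("rate").option("rowsPerSecond", 1).load().
  4. withColumn("uuid", expr("uuid()")).
  5. withColumn("rider", lit("rider-X")).
  6. withColumn("driver", lit("driver-Y")).
  7. withColumn("fare", rand() * 100).
  8. withColumn("city", lit("san_francisco")).
  9. withColumn("ts", unix_timestamp())
  10. streamingDF.writeStream.format("hudi").
  11. option("hoodie.table.name", "trips_table_streaming").
  12. option("hoodie.datasource.write.recordkey.field", "uuid").
  13. option("hoodie.datasource.write.precombine.field", "ts").
  14. option("hoodie.datasource.write.partitionpath.field", "city").
  15. option("checkpointLocation", "file:///tmp/hudi_streaming_checkpoint").
  16. outputMode("append").
  17. start("file:///tmp/trips_table_streaming")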

Check out more information on modeling data in Hudi and different ways to perform batch writes and streaming writes.

Dockerized Demo

Even as we showcased the core capabilities, Hudi supports a lot more advanced functionality that can make it easy to get your transactional data lakes up and running quickly, across a variety of query engines like Hive, Flink, Spark, Presto, Trino and more. We have put together a demo video that showcases all of this on a Docker-based setup with all dependent systems running locally. We recommend you replicate the same setup and run the demo yourself, by following the steps here to get a taste of it.