Indexing

Hudi maintains a scalable metadata that has some auxiliary data about the table. The pluggable indexing subsystem of Hudi depends on the metadata table. Different types of index, from files index for locating records efficiently to column_stats index for data skipping, are part of the metadata table. A fundamental tradeoff in any data system that supports indices is to balance the write throughput with index updates. A brute-force way is to lock out the writes while indexing. Hudi supports index creation using SQL, Datasource as well as async indexing. However, very large tables can take hours to index. This is where Hudi’s novel concurrent indexing comes into play.

Concurrent Indexing

Indexes in Hudi are created in two phases and uses a mix of optimistic concurrency control and multi-version concurrency control techniques. The two phase approach ensures that the other writers are unblocked.

  • Scheduling & Planning : This is the first phase which schedules an indexing plan and is protected by a lock. Indexing plan considers all the completed commits upto indexing instant.
  • Execution : This phase creates the index files as mentioned in the index plan. At the end of the phase Hudi ensures the completed commits after indexing instant used already created index plan to add corresponding index metadata. This check is protected by a metadata table lock and in case of failures indexing is aborted.

We can now create different indexes and metadata, including bloom_filters, column_stats, partition_stats, record_index, secondary_index and expression_index asynchronously in Hudi. Being able to index without blocking writing ensures write performance is unaffected and no additional manual maintenance is necessary to add/remove indexes. It also reduces resource wastage by avoiding contention between writing and indexing.

Please refer section Setup Async Indexing to get more details on how to setup asynchronous indexing. To learn more about the design of asynchronous indexing feature, please check out this blog.

Index Creation Using SQL

Currently indexes like secondary index, expression index and record index can be created using SQL create index command. For more information on these indexes please refer metadata section

Indexing - 图1note

Please note in order to create secondary index:

  1. The table must have a primary key and merge mode should be COMMIT_TIME_ORDERING.
  2. Record index must be enabled. This can be done by setting hoodie.metadata.record.index.enable=true and then creating record_index. Please note the example below.

Examples

  1. -- Create record index on primary key - uuid
  2. CREATE INDEX record_index ON hudi_indexed_table (uuid);
  3. -- Create secondary index on rider column.
  4. CREATE INDEX idx_rider ON hudi_indexed_table (rider);
  5. -- Create expression index by performing transformation on ts and driver column
  6. -- The index is created on the transformed column. Here column stats index is created on ts column
  7. -- and bloom filters index is created on driver column.
  8. CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
  9. CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity');

For more information on index creation using SQL refer SQL DDL

Index Creation Using Datasource

Indexes like bloom_filters, column_stats, partition_stats and record_index can be created using Datasource. Below we list the various configs which are needed to create the indexes mentioned.

  1. -- [Required Configs] Partition stats
  2. hoodie.metadata.index.partition.stats.enable=true
  3. hoodie.metadata.index.column.stats.enable=true
  4. -- [Optional Configs] - list of columns to index on. By default all columns are indexed
  5. hoodie.metadata.index.column.stats.column.list=col1,col2,...
  6. -- [Required Configs] Column stats
  7. hoodie.metadata.index.column.stats.enable=true
  8. -- [Optional Configs] - list of columns to index on. By default all columns are indexed
  9. hoodie.metadata.index.column.stats.column.list=col1,col2,...
  10. -- [Required Configs] Record Level Index
  11. hoodie.metadata.record.index.enable=true
  12. -- [Required Configs] Bloom filter Index
  13. hoodie.metadata.index.bloom.filter.enable=true

Here is an example which shows how to create indexes for a table created using Datasource API.

Examples

  1. import scala.collection.JavaConversions._
  2. import org.apache.spark.sql.SaveMode._
  3. import org.apache.hudi.DataSourceReadOptions._
  4. import org.apache.hudi.DataSourceWriteOptions._
  5. import org.apache.hudi.common.table.HoodieTableConfig._
  6. import org.apache.hudi.config.HoodieWriteConfig._
  7. import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
  8. import org.apache.hudi.common.model.HoodieRecord
  9. import spark.implicits._
  10. val tableName = "trips_table_index"
  11. val basePath = "file:///tmp/trips_table_index"
  12. val columns = Seq("ts","uuid","rider","driver","fare","city")
  13. val data =
  14. Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  15. (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
  16. (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
  17. (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
  18. (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
  19. var inserts = spark.createDataFrame(data).toDF(columns:_*)
  20. inserts.write.format("hudi").
  21. option("hoodie.datasource.write.partitionpath.field", "city").
  22. option("hoodie.table.name", tableName).
  23. option("hoodie.write.record.merge.mode", "COMMIT_TIME_ORDERING").
  24. option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  25. mode(Overwrite).
  26. save(basePath)
  27. // Create record index and secondary index for the table
  28. spark.sql(s"CREATE TABLE test_table_external USING hudi LOCATION '$basePath'")
  29. spark.sql(s"SET hoodie.metadata.record.index.enable=true")
  30. spark.sql(s"CREATE INDEX record_index ON test_table_external (uuid)")
  31. spark.sql(s"CREATE INDEX idx_rider ON test_table_external (rider)")
  32. spark.sql(s"SHOW INDEXES FROM hudi_indexed_table").show(false)
  33. spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E'").show(false)

Setup Async Indexing

In the example we will have continuous writing using Hudi Streamer and also create index in parallel. The index creation in example is done using HoodieIndexer so that schedule and execute phases are clearly visible for indexing. The asynchronous configurations can be used with Datasource and SQL based configs to create index as well.

First, we will generate a continuous workload. In the below example, we are going to start a Hudi Streamer which will continuously write data from raw parquet to Hudi table. We used the widely available NY Taxi dataset, whose setup details are as below:

Ingestion write config

  1. hoodie.datasource.write.recordkey.field=VendorID
  2. hoodie.datasource.write.partitionpath.field=tpep_dropoff_datetime
  3. hoodie.datasource.write.precombine.field=tpep_dropoff_datetime
  4. hoodie.streamer.source.dfs.root=/Users/home/path/to/data/parquet_files/
  5. hoodie.streamer.schemaprovider.target.schema.file=/Users/home/path/to/schema/schema.avsc
  6. hoodie.streamer.schemaprovider.source.schema.file=/Users/home/path/to/schema/schema.avsc
  7. // set lock provider configs
  8. hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
  9. hoodie.write.lock.zookeeper.url=<zk_url>
  10. hoodie.write.lock.zookeeper.port=<zk_port>
  11. hoodie.write.lock.zookeeper.lock_key=<zk_key>
  12. hoodie.write.lock.zookeeper.base_path=<zk_base_path>

Run Hudi Streamer

  1. spark-submit \
  2. --jars "packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar,packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0.jar" \
  3. --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls /Users/home/path/to/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar` \
  4. --props `ls /Users/home/path/to/write/config.properties` \
  5. --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  6. --source-ordering-field tpep_dropoff_datetime \
  7. --table-type COPY_ON_WRITE \
  8. --target-base-path file:///tmp/hudi-ny-taxi/ \
  9. --target-table ny_hudi_tbl \
  10. --op UPSERT \
  11. --continuous \
  12. --source-limit 5000000 \
  13. --min-sync-interval-seconds 60

Hudi metadata table is enabled by default and the files index will be automatically created. While the Hudi Streamer is running in continuous mode, let us schedule the indexing for COLUMN_STATS index. First we need to define a properties file for the indexer.

Configurations

As mentioned before, metadata indices are pluggable. One can add any index at any point in time depending on changing business requirements. Some configurations to enable particular indices are listed below. Currently, available indices under metadata table can be explored here along with configs to enable them. The full set of metadata configurations can be explored here.

Indexing - 图2note

Enabling the metadata table and configuring a lock provider are the prerequisites for using async indexer. Checkout a sample configuration below.

  1. # ensure that async indexing is enabled
  2. hoodie.metadata.index.async=true
  3. # enable column_stats index config
  4. hoodie.metadata.index.column.stats.enable=true
  5. # set concurrency mode and lock configs as this is a multi-writer scenario
  6. # check https://hudi.apache.org/docs/concurrency_control/ for differnt lock provider configs
  7. hoodie.write.concurrency.mode=optimistic_concurrency_control
  8. hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
  9. hoodie.write.lock.zookeeper.url=<zk_url>
  10. hoodie.write.lock.zookeeper.port=<zk_port>
  11. hoodie.write.lock.zookeeper.lock_key=<zk_key>
  12. hoodie.write.lock.zookeeper.base_path=<zk_base_path>

Schedule indexing

Now, we can schedule indexing using HoodieIndexer in schedule mode as follows:

  1. spark-submit \
  2. --jars "packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar,packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0.jar" \
  3. --class org.apache.hudi.utilities.HoodieIndexer \
  4. /Users/home/path/to/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar \
  5. --props /Users/home/path/to/indexer.properties \
  6. --mode schedule \
  7. --base-path /tmp/hudi-ny-taxi \
  8. --table-name ny_hudi_tbl \
  9. --index-types COLUMN_STATS \
  10. --parallelism 1 \
  11. --spark-memory 1g

This will write an indexing.requested instant to the timeline.

Execute Indexing

To execute indexing, run the indexer in execute mode as below.

  1. spark-submit \
  2. --jars "packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar,packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0.jar" \
  3. --class org.apache.hudi.utilities.HoodieIndexer \
  4. /Users/home/path/to/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar \
  5. --props /Users/home/path/to/indexer.properties \
  6. --mode execute \
  7. --base-path /tmp/hudi-ny-taxi \
  8. --table-name ny_hudi_tbl \
  9. --index-types COLUMN_STATS \
  10. --parallelism 1 \
  11. --spark-memory 1g

We can also run the indexer in scheduleAndExecute mode to do the above two steps in one shot. Doing it separately gives us better control over when we want to execute.

Let’s look at the data timeline.

  1. ls -lrt /tmp/hudi-ny-taxi/.hoodie
  2. total 1816
  3. -rw-r--r-- 1 sagars wheel 0 Apr 14 19:53 20220414195327683.commit.requested
  4. -rw-r--r-- 1 sagars wheel 153423 Apr 14 19:54 20220414195327683.inflight
  5. -rw-r--r-- 1 sagars wheel 207061 Apr 14 19:54 20220414195327683.commit
  6. -rw-r--r-- 1 sagars wheel 0 Apr 14 19:54 20220414195423420.commit.requested
  7. -rw-r--r-- 1 sagars wheel 659 Apr 14 19:54 20220414195437837.indexing.requested
  8. -rw-r--r-- 1 sagars wheel 323950 Apr 14 19:54 20220414195423420.inflight
  9. -rw-r--r-- 1 sagars wheel 0 Apr 14 19:55 20220414195437837.indexing.inflight
  10. -rw-r--r-- 1 sagars wheel 222920 Apr 14 19:55 20220414195423420.commit
  11. -rw-r--r-- 1 sagars wheel 734 Apr 14 19:55 hoodie.properties
  12. -rw-r--r-- 1 sagars wheel 979 Apr 14 19:55 20220414195437837.indexing

In the data timeline, we can see that indexing was scheduled after one commit completed (20220414195327683.commit) and another was requested (20220414195423420.commit.requested). This would have picked 20220414195327683 as the base instant. Indexing was inflight with an inflight writer as well. If we parse the indexer logs, we would find that it indeed caught up with instant 20220414195423420 after indexing upto the base instant.

  1. 22/04/14 19:55:22 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from /tmp/hudi-ny-taxi/.hoodie/metadata
  2. 22/04/14 19:55:22 INFO RunIndexActionExecutor: Starting Index Building with base instant: 20220414195327683
  3. 22/04/14 19:55:22 INFO HoodieBackedTableMetadataWriter: Creating a new metadata index for partition 'column_stats' under path /tmp/hudi-ny-taxi/.hoodie/metadata upto instant 20220414195327683
  4. ...
  5. ...
  6. 22/04/14 19:55:38 INFO RunIndexActionExecutor: Total remaining instants to index: 1
  7. 22/04/14 19:55:38 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /tmp/hudi-ny-taxi/.hoodie/metadata
  8. 22/04/14 19:55:38 INFO HoodieTableConfig: Loading table properties from /tmp/hudi-ny-taxi/.hoodie/metadata/.hoodie/hoodie.properties
  9. 22/04/14 19:55:38 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from /tmp/hudi-ny-taxi/.hoodie/metadata
  10. 22/04/14 19:55:38 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220414195423420__deltacommit__COMPLETED]}
  11. 22/04/14 19:55:38 INFO RunIndexActionExecutor: Starting index catchup task
  12. ...

Drop Index

To drop an index, just run the index in dropindex mode.

  1. spark-submit \
  2. --jars "packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar,packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0.jar" \
  3. --class org.apache.hudi.utilities.HoodieIndexer \
  4. /Users/home/path/to/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.0.jar \
  5. --props /Users/home/path/to/indexer.properties \
  6. --mode dropindex \
  7. --base-path /tmp/hudi-ny-taxi \
  8. --table-name ny_hudi_tbl \
  9. --index-types COLUMN_STATS \
  10. --parallelism 1 \
  11. --spark-memory 2g

Caveats

Asynchronous indexing feature is still evolving. Few points to note from deployment perspective while running the indexer:

  • Files index is created by default as long as the metadata table is enabled.
  • Trigger indexing for one metadata partition (or index type) at a time.
  • If an index is enabled via async indexing, then ensure that index is also enabled in configs corresponding to regular ingestion writers. Otherwise, metadata writer will think that particular index was disabled and cleanup the metadata partition.

Some of these limitations will be removed in the upcoming releases. Please follow HUDI-2488 for developments on this feature.

Videos