Cleaning

Background

Cleaning is a table service employed by Hudi to reclaim space occupied by older versions of data and keep storage costs in check. Apache Hudi provides snapshot isolation between writers and readers by managing multiple versioned files with MVCC concurrency. These file versions provide history and enable time travel and rollbacks, but it is important to manage how much history you keep to balance your costs. The cleaning service plays a crucial role in managing the tradeoff between retaining a long history of data and the associated storage costs.

Hudi enables automatic cleaning by default. Cleaning is invoked immediately after each commit to delete older file slices. It's recommended to leave this enabled to ensure metadata and data storage growth is bounded. The cleaner can also be scheduled after every few commits, instead of after every commit, by configuring hoodie.clean.max.commits.
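
To make this concrete, below is a minimal PySpark sketch (for illustration only, not a definitive recipe) that passes these configs as writer options. It assumes the Hudi Spark bundle is on the classpath; the table name, field names, and base path are hypothetical placeholders.

from pyspark.sql import SparkSession

# Sketch: a Hudi write that keeps automatic cleaning enabled (the default)
# and only attempts a clean every 4 commits via hoodie.clean.max.commits.
spark = (
    SparkSession.builder
    .appName("hudi-cleaning-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

df = spark.createDataFrame(
    [("id-1", "2024-01-01 00:00:00", "part-1", 1.0)],
    ["uuid", "ts", "partition", "value"],
)

(
    df.write.format("hudi")
    .option("hoodie.table.name", "my_table")                      # hypothetical table name
    .option("hoodie.datasource.write.recordkey.field", "uuid")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.datasource.write.partitionpath.field", "partition")
    .option("hoodie.clean.automatic", "true")                     # default value, shown explicitly
    .option("hoodie.clean.max.commits", "4")                      # attempt a clean every 4 commits
    .mode("append")
    .save("/tmp/hudi/my_table")                                   # hypothetical base path
)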

Cleaning Retention Policies

When cleaning old files, you should be careful not to remove files that are being actively used by long running queries.

For Spark based engines:

Config Name | Default | Description
hoodie.cleaner.policy | KEEP_LATEST_COMMITS (Optional) | org.apache.hudi.common.model.HoodieCleaningPolicy: Cleaning policy to be used. Config Param: CLEANER_POLICY

The corresponding config for Flink based engine is clean.policy.

Hudi cleaner currently supports the below cleaning policies for keeping a certain number of commits or file versions (the corresponding writer options are sketched right after this list):

  • KEEP_LATEST_COMMITS: This is the default policy. It is a temporal cleaning policy that ensures you can look back into all the changes that happened in the last X commits. Suppose a writer is ingesting data into a Hudi dataset every 30 minutes and the longest running query takes 5 hours to finish; then the user should retain at least the last 10 commits. With such a configuration, we ensure that the oldest version of a file is kept on disk for at least 5 hours, thereby preventing the longest running query from failing at any point in time. Incremental cleaning is also possible with this policy. The number of commits to retain can be configured by hoodie.cleaner.commits.retained. The corresponding Flink config is clean.retain_commits.

  • KEEP_LATEST_FILE_VERSIONS: This policy has the effect of keeping N file versions irrespective of time. It is useful when you know the maximum number of versions of a file you want to keep at any given time. To achieve the same behaviour as above (preventing long running queries from failing), you should work out the retention count from your data patterns. Alternatively, this policy is also useful if you simply want to maintain only the single latest version of each file. The number of file versions to retain can be configured by hoodie.cleaner.fileversions.retained. The corresponding Flink config is clean.retain_file_versions.

  • KEEP_LATEST_BY_HOURS: This policy cleans up based on hours. It is simple and useful when you know how many hours of file versions you want to keep at any given time. File versions corresponding to commits with commit times older than the configured number of hours to retain are cleaned. This can be configured with hoodie.cleaner.hours.retained. The corresponding Flink config is clean.retain_hours.
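
As an illustration of the policies above, the snippet below collects the relevant writer options into plain Python dicts; the retention values are example numbers, and a chosen dict would be passed to a Hudi write via .options(**opts) as in the earlier sketch.

# Example option sets for the three cleaning policies (illustrative retention values).
keep_latest_commits = {
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": "10",      # e.g. 30-minute commits, 5-hour queries
}

keep_latest_file_versions = {
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": "3",  # keep at most 3 versions of each file
}

keep_latest_by_hours = {
    "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
    "hoodie.cleaner.hours.retained": "24",        # clean file slices older than 24 hours
}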

Configs

For details about all possible configurations and their default values, see the configuration docs. For Flink related configs, refer to the Flink configuration docs.

Ways to trigger Cleaning

Inline

By default, in Spark based writing, cleaning runs inline after every commit using the default policy of KEEP_LATEST_COMMITS. It's recommended to keep this enabled to ensure metadata and data storage growth is bounded. Users do not need to set any configs to enable it. The relevant basic configs are listed below.

Config Name | Default | Description
hoodie.clean.automatic | true (Optional) | When enabled, the cleaner table service is invoked immediately after each commit, to delete older file slices. It's recommended to enable this, to ensure metadata and data storage growth is bounded. Config Param: AUTO_CLEAN
hoodie.cleaner.commits.retained | 10 (Optional) | Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits (scheduled). This also directly translates into how much data retention the table supports for incremental queries. Config Param: CLEANER_COMMITS_RETAINED
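
As a quick sanity check of the num_of_commits * time_between_commits relationship above, here is a small back-of-the-envelope calculation; the 30-minute commit interval is an assumption matching the earlier example.

# Rough retention estimate: how long the oldest retained file version stays on disk.
commits_retained = 10          # hoodie.cleaner.commits.retained
minutes_between_commits = 30   # assumed ingestion cadence (example value)

retention_hours = commits_retained * minutes_between_commits / 60
print(retention_hours)         # 5.0 -> the longest running query should finish within ~5 hours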

Async

In case you wish to run the cleaner service asynchronously along with writing, please enable hoodie.clean.async as shown below:

hoodie.clean.automatic=true
hoodie.clean.async=true
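
For illustration, these two options could also be supplied as Spark writer options, a sketch of which is shown here; it assumes the same write pattern as the earlier sketch.

# Writer options enabling asynchronous cleaning; pass via .options(**async_clean_opts)
# together with the usual table and key options, as in the earlier sketch.
async_clean_opts = {
    "hoodie.clean.automatic": "true",   # keep scheduling cleans after commits
    "hoodie.clean.async": "true",       # run the clean action without blocking ingestion
}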

For Flink based writing, this is the default mode of cleaning. Please refer to clean.async.enabled for details.

Run independently

Hoodie Cleaner can also be run as a separate process. Following is the command for running the cleaner independently:

spark-submit --master local \
--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
--class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` --help
Usage: <main class> [options]
  Options:
    --help, -h
    --hoodie-conf
      Any configuration that can be set in the properties file (using the CLI
      parameter "--props") can also be passed command line using this
      parameter. This can be repeated
      Default: []
    --props
      path to properties file on localfs or dfs, with configurations for
      hoodie client for cleaning
    --spark-master
      spark master to use.
      Default: local[2]
  * --target-base-path
      base path for the hoodie table to be cleaned.

Below are some examples of running the cleaner.
Keep the latest 10 commits

spark-submit --master local \
--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
--class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
--target-base-path /path/to/hoodie_table \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
--hoodie-conf hoodie.cleaner.commits.retained=10 \
--hoodie-conf hoodie.cleaner.parallelism=200

Keep the latest 3 file versions

spark-submit --master local \
--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
--class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
--target-base-path /path/to/hoodie_table \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \
--hoodie-conf hoodie.cleaner.fileversions.retained=3 \
--hoodie-conf hoodie.cleaner.parallelism=200

Clean commits older than 24 hours

spark-submit --master local \
--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
--class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
--target-base-path /path/to/hoodie_table \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS \
--hoodie-conf hoodie.cleaner.hours.retained=24 \
--hoodie-conf hoodie.cleaner.parallelism=200

Note: The effective parallelism is the minimum of the number of partitions to clean and hoodie.cleaner.parallelism.

CLI

You can also use Hudi CLI to run Hoodie Cleaner.

CLI provides the below commands for cleaner service:

  • cleans show
  • clean showpartitions
  • cleans run

Example of cleaner keeping the latest 10 commits

cleans run --sparkMaster local --hoodieConfigs hoodie.cleaner.policy=KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained=10 hoodie.cleaner.parallelism=200

You can find more details and the relevant code for these commands in org.apache.hudi.cli.commands.CleansCommand class.

Videos