Automatic compaction

In Apache Druid, compaction is a special type of ingestion task that reads data from a Druid datasource and writes it back into the same datasource. A common use case for this is to optimally size segments after ingestion to improve query performance. Automatic compaction, or auto-compaction, refers to the system for automatic execution of compaction tasks managed by the Druid Coordinator. This topic guides you through setting up automatic compaction for your Druid cluster. See the examples for common use cases for automatic compaction.

How Druid manages automatic compaction

The Coordinator indexing period, druid.coordinator.period.indexingPeriod, controls the frequency of compaction tasks. The default indexing period is 30 minutes, meaning that the Coordinator first checks for segments to compact at most 30 minutes from when auto-compaction is enabled. This time period affects other Coordinator duties including merge and conversion tasks. To configure the auto-compaction time period without interfering with indexingPeriod, see Set frequency of compaction runs.

At every invocation of auto-compaction, the Coordinator initiates a segment search to determine eligible segments to compact. When there are eligible segments to compact, the Coordinator issues compaction tasks based on available worker capacity. If a compaction task takes longer than the indexing period, the Coordinator waits for it to finish before resuming the period for segment search.

Auto-compaction skips datasources that have a segment granularity of ALL.

As a best practice, you should set up auto-compaction for all Druid datasources. You can run compaction tasks manually for cases where you want to allocate more system resources. For example, you may choose to run multiple compaction tasks in parallel to compact an existing datasource for the first time. See Compaction for additional details and use cases.

Enable automatic compaction

You can enable automatic compaction for a datasource using the web console or programmatically via an API. This process differs from the one for manual compaction tasks, which you submit from the Tasks view of the web console or through the Tasks API.

Web console

Use the web console to enable automatic compaction for a datasource as follows.

  1. Click Datasources in the top-level navigation.
  2. In the Compaction column, click the edit icon for the datasource to compact.
  3. In the Compaction config dialog, configure the auto-compaction settings. The dialog offers a form view as well as a JSON view. Editing the form updates the JSON specification, and editing the JSON updates the form field, if present. Form fields not present in the JSON indicate default values. You may add additional properties to the JSON for auto-compaction settings not displayed in the form. See Configure automatic compaction for supported settings for auto-compaction.
  4. Click Submit.
  5. Refresh the Datasources view. The Compaction column for the datasource changes from “Not enabled” to “Awaiting first run.”

The following screenshot shows the compaction config dialog for a datasource with auto-compaction enabled.

[Screenshot: Compaction config in web console]

To disable auto-compaction for a datasource, click Delete from the Compaction config dialog. Druid does not retain your auto-compaction configuration.

Compaction configuration API

Use the Automatic compaction API to configure automatic compaction. To enable auto-compaction for a datasource, create a JSON object with the desired auto-compaction settings. See Configure automatic compaction for the syntax of an auto-compaction spec. Send the JSON object as a payload in a POST request to /druid/coordinator/v1/config/compaction. The following example configures auto-compaction for the wikipedia datasource:

    curl --location --request POST 'http://localhost:8081/druid/coordinator/v1/config/compaction' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "dataSource": "wikipedia",
        "granularitySpec": {
            "segmentGranularity": "DAY"
        }
    }'

To disable auto-compaction for a datasource, send a DELETE request to /druid/coordinator/v1/config/compaction/{dataSource}. Replace {dataSource} with the name of the datasource for which to disable auto-compaction. For example:

    curl --location --request DELETE 'http://localhost:8081/druid/coordinator/v1/config/compaction/wikipedia'
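The same two requests can also be built from a script. The following is a minimal Python sketch that uses only the standard library. It assumes the Coordinator address http://localhost:8081 from the curl examples, and the helper names build_enable_request and build_disable_request are hypothetical. The sketch constructs the requests without sending them, so you can inspect them first.

```python
import json
import urllib.request
from urllib.parse import quote

# Assumption: Coordinator address taken from the curl examples in this topic.
COORDINATOR = "http://localhost:8081"
CONFIG_PATH = "/druid/coordinator/v1/config/compaction"


def build_enable_request(config: dict) -> urllib.request.Request:
    """Build a POST request that submits an auto-compaction config."""
    return urllib.request.Request(
        COORDINATOR + CONFIG_PATH,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_disable_request(datasource: str) -> urllib.request.Request:
    """Build a DELETE request that removes a datasource's auto-compaction config."""
    url = f"{COORDINATOR}{CONFIG_PATH}/{quote(datasource, safe='')}"
    return urllib.request.Request(url, method="DELETE")


enable = build_enable_request(
    {"dataSource": "wikipedia", "granularitySpec": {"segmentGranularity": "DAY"}}
)
disable = build_disable_request("wikipedia")

# Nothing is sent here; the requests are only printed for inspection.
print(enable.get_method(), enable.full_url)
print(disable.get_method(), disable.full_url)
```

To submit a request against a running Coordinator, pass it to urllib.request.urlopen, for example urllib.request.urlopen(enable).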

Configure automatic compaction

You can configure automatic compaction dynamically without restarting Druid. The automatic compaction system uses the following syntax:

    {
        "dataSource": <task_datasource>,
        "ioConfig": <IO config>,
        "dimensionsSpec": <custom dimensionsSpec>,
        "transformSpec": <custom transformSpec>,
        "metricsSpec": <custom metricsSpec>,
        "tuningConfig": <parallel indexing task tuningConfig>,
        "granularitySpec": <compaction task granularitySpec>,
        "skipOffsetFromLatest": <time period to avoid compaction>,
        "taskPriority": <compaction task priority>,
        "taskContext": <task context>
    }

Most fields in the auto-compaction configuration correlate to a typical Druid ingestion spec. The following properties only apply to auto-compaction:

  • skipOffsetFromLatest
  • taskPriority
  • taskContext

Since the automatic compaction system provides a management layer on top of manual compaction tasks, the auto-compaction configuration does not include task-specific properties found in a typical Druid ingestion spec. The following properties are automatically set by the Coordinator:

  • type: Set to compact.
  • id: Generated using the task type, datasource name, interval, and timestamp. The task ID is prefixed with coordinator-issued.
  • context: Set according to the user-provided taskContext.

Compaction tasks typically fetch all relevant segments prior to launching any subtasks, unless the following properties are all set to non-null values. It is strongly recommended to set them to non-null values to maximize performance and minimize disk usage of the compact tasks launched by auto-compaction:

  • granularitySpec, with non-null values for each of segmentGranularity, queryGranularity, and rollup
  • dimensionsSpec
  • metricsSpec

For more details on each of the specs in an auto-compaction configuration, see Automatic compaction dynamic configuration.
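One way to check a configuration before you submit it is to list which of the recommended settings are still unset. The sketch below assumes the properties in question are granularitySpec (with segmentGranularity, queryGranularity, and rollup), dimensionsSpec, and metricsSpec, as described in Druid's compaction documentation. The function name missing_prefetch_settings is hypothetical, and the check is a simple lint, not part of Druid.

```python
from typing import List

# Assumption: these are the properties that must all be non-null.
REQUIRED_GRANULARITY_FIELDS = ("segmentGranularity", "queryGranularity", "rollup")


def missing_prefetch_settings(config: dict) -> List[str]:
    """Return the recommended settings that are null or absent in the config."""
    missing = []
    granularity = config.get("granularitySpec") or {}
    for field in REQUIRED_GRANULARITY_FIELDS:
        if granularity.get(field) is None:
            missing.append(f"granularitySpec.{field}")
    for key in ("dimensionsSpec", "metricsSpec"):
        if config.get(key) is None:
            missing.append(key)
    return missing


partial_config = {
    "dataSource": "wikipedia",
    "granularitySpec": {"segmentGranularity": "DAY"},
}
print(missing_prefetch_settings(partial_config))
```

An empty result means every setting checked by the sketch has a value.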

Avoid conflicts with ingestion

Compaction tasks may be interrupted when they interfere with ingestion. For example, this occurs when an ingestion task needs to write data to a segment for a time interval locked for compaction. If there are continuous failures that prevent compaction from making progress, consider one of the following strategies:

  • Set skipOffsetFromLatest to reduce the chance of conflicts between ingestion and compaction. The paragraphs after this list explain how to choose a value.
  • Increase the priority value of compaction tasks relative to ingestion tasks. Only recommended for advanced users. This approach can cause ingestion jobs to fail or lag. To change the priority of compaction tasks, set taskPriority to the desired priority value in the auto-compaction configuration. For details on the priority values of different task types, see Lock priority.

The Coordinator compacts segments from newest to oldest. In the auto-compaction configuration, you can set a time period, relative to the end time of the most recent segment, for segments that should not be compacted. Assign this value to skipOffsetFromLatest. Note that this offset is not relative to the current time but to the latest segment time. For example, if you want to skip over segments from five days prior to the end time of the most recent segment, assign "skipOffsetFromLatest": "P5D".

To set skipOffsetFromLatest, consider how frequently you expect the stream to receive late arriving data. If your stream only occasionally receives late arriving data, the auto-compaction system robustly compacts your data even though data is ingested outside the skipOffsetFromLatest window. For most realtime streaming ingestion use cases, it is reasonable to set skipOffsetFromLatest to a few hours or a day.
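The key point is that the offset is measured from the end of the most recent segment, not from the current time. The following Python sketch works through that arithmetic. It is an illustration only: the function names are hypothetical, the period parser handles just simple week, day, and time-of-day periods such as P1W, P5D, or PT6H, and the Coordinator's own boundary handling is not modeled.

```python
import re
from datetime import datetime, timedelta, timezone

# Handles only simple ISO 8601 periods made of weeks, days, hours, minutes, seconds.
_PERIOD = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_period(period: str) -> timedelta:
    """Convert a simple ISO 8601 period string into a timedelta."""
    match = _PERIOD.match(period)
    if not match or period in ("P", "PT"):
        raise ValueError(f"unsupported period: {period!r}")
    parts = {name: int(value) for name, value in match.groupdict().items() if value}
    return timedelta(**parts)


def compaction_cutoff(latest_segment_end: datetime, skip_offset: str) -> datetime:
    """Return the instant after which segments fall inside the skipped window."""
    return latest_segment_end - parse_period(skip_offset)


# Example: the most recent segment ends on 2024-01-10 and the offset is five days.
latest_end = datetime(2024, 1, 10, tzinfo=timezone.utc)
cutoff = compaction_cutoff(latest_end, "P5D")
print(cutoff.isoformat())  # 2024-01-05T00:00:00+00:00
```

In this example, data after 2024-01-05 is left alone regardless of today's date, because the window moves only when a newer segment appears.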

Set frequency of compaction runs

If you want the Coordinator to check for compaction more frequently than its indexing period, create a separate group to handle compaction duties. Set the time period of the duty group in the coordinator/runtime.properties file. The following example shows how to create a duty group named compaction and set the auto-compaction period to 1 minute:

    druid.coordinator.dutyGroups=["compaction"]
    druid.coordinator.compaction.duties=["compactSegments"]
    druid.coordinator.compaction.period=PT60S

View automatic compaction statistics

After the Coordinator has initiated auto-compaction, you can view compaction statistics for the datasource, including the number of bytes, segments, and intervals already compacted and those awaiting compaction. The Coordinator also reports the total bytes, segments, and intervals not eligible for compaction in accordance with its segment search policy.

In the web console, the Datasources view displays auto-compaction statistics. The Tasks view shows the task information for compaction tasks that were triggered by the automatic compaction system.

To get statistics by API, send a GET request to /druid/coordinator/v1/compaction/status. To filter the results to a particular datasource, pass the datasource name as a query parameter to the request—for example, /druid/coordinator/v1/compaction/status?dataSource=wikipedia.
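As a rough illustration, the status request can be assembled in Python with the standard library. The Coordinator address is an assumption carried over from the curl examples in this topic, and the helper names are hypothetical. The sketch makes no assumption about the shape of the JSON that the Coordinator returns.

```python
import json
import urllib.request
from typing import Any, Optional
from urllib.parse import urlencode

# Assumption: Coordinator address taken from the curl examples in this topic.
COORDINATOR = "http://localhost:8081"
STATUS_PATH = "/druid/coordinator/v1/compaction/status"


def status_url(datasource: Optional[str] = None) -> str:
    """Build the status URL, optionally filtered to a single datasource."""
    url = COORDINATOR + STATUS_PATH
    if datasource is not None:
        url += "?" + urlencode({"dataSource": datasource})
    return url


def fetch_status(datasource: Optional[str] = None) -> Any:
    """Send the GET request and decode the JSON body (needs a running Coordinator)."""
    with urllib.request.urlopen(status_url(datasource)) as response:
        return json.load(response)


# Only the URL is printed here; fetch_status is not called.
print(status_url("wikipedia"))
```

Using urlencode keeps datasource names with spaces or other special characters safe to pass as the query parameter.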

Examples

The following examples demonstrate potential use cases in which auto-compaction may improve your Druid performance. See more details in Compaction strategies. The examples in this section do not change the underlying data.

Change segment granularity

You have a stream set up to ingest data with HOUR segment granularity into the wikistream datasource. You notice that your Druid segments are smaller than the recommended segment size of 5 million rows per segment. You wish to automatically compact segments to DAY granularity while leaving the latest week of data not compacted because your stream consistently receives data within that time period.

The following auto-compaction configuration compacts existing HOUR segments into DAY segments while leaving the latest week of data not compacted:

    {
        "dataSource": "wikistream",
        "granularitySpec": {
            "segmentGranularity": "DAY"
        },
        "skipOffsetFromLatest": "P1W"
    }

Update partitioning scheme

For your wikipedia datasource, you want to optimize segment access when regularly ingesting data without compromising compute time when querying the data. Your ingestion spec for batch append uses dynamic partitioning to optimize for write-time operations, while your stream ingestion partitioning is configured by the stream service. You want to implement auto-compaction to reorganize the data with a suitable read-time partitioning using multi-dimension range partitioning. Based on the dimensions frequently accessed in queries, you wish to partition on the following dimensions: channel, countryName, namespace.

The following auto-compaction configuration updates the wikipedia segments to use multi-dimension range partitioning:

    {
        "dataSource": "wikipedia",
        "tuningConfig": {
            "partitionsSpec": {
                "type": "range",
                "partitionDimensions": [
                    "channel",
                    "countryName",
                    "namespace"
                ],
                "targetRowsPerSegment": 5000000
            }
        }
    }

Concurrent append and replace

Concurrent append and replace is an experimental feature and is not currently available for SQL-based ingestion.

This feature allows you to safely replace the existing data in an interval of a datasource while new data is being appended to that interval. One of the most common applications is appending new data, for example through streaming ingestion, to an interval while compaction of that interval is already in progress.

To set up concurrent append and replace, ensure that your ingestion jobs have the appropriate lock types and granularities:

  • The append task (with appendToExisting set to true) has taskLockType set to APPEND in the task context.
  • The replace task (with appendToExisting set to false) has taskLockType set to REPLACE in the task context.
  • The segment granularity of the append task is equal to or finer than the segment granularity of the replace task.

When using concurrent append and replace, keep the following in mind:

  • Concurrent append and replace fails if the task with APPEND lock uses a coarser segment granularity than the task with the REPLACE lock. For example, if the APPEND task uses a segment granularity of YEAR and the REPLACE task uses a segment granularity of MONTH, you should not use concurrent append and replace.

  • Only a single task can hold a REPLACE lock on a given interval of a datasource.

  • Multiple tasks can hold APPEND locks on a given interval of a datasource and append data to that interval simultaneously.
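The granularity rule can be expressed as a small pre-flight check. The sketch below is an illustration only: the function name is hypothetical, and the ordering covers just a small subset of segment granularities (HOUR, DAY, MONTH, YEAR), which is an assumption made to keep the example short.

```python
# Assumption: a small subset of segment granularities, ordered finest to coarsest.
GRANULARITY_ORDER = ["HOUR", "DAY", "MONTH", "YEAR"]


def granularities_compatible(append_granularity: str, replace_granularity: str) -> bool:
    """Return True if the APPEND granularity is equal to or finer than the REPLACE one."""
    append_rank = GRANULARITY_ORDER.index(append_granularity.upper())
    replace_rank = GRANULARITY_ORDER.index(replace_granularity.upper())
    return append_rank <= replace_rank


# HOUR appends with DAY compaction satisfy the rule.
print(granularities_compatible("HOUR", "DAY"))    # True
# YEAR appends with MONTH replacement violate it, as in the example above.
print(granularities_compatible("YEAR", "MONTH"))  # False
```

A granularity outside the listed subset raises ValueError from list.index, which is a deliberate signal that the sketch does not cover it.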

Configure concurrent append and replace

Update the compaction settings with the API

Prepare your datasource for concurrent append and replace by setting its task lock type to REPLACE. Add the taskContext like you would any other automatic compaction setting through the API:

    curl --location --request POST 'http://localhost:8081/druid/coordinator/v1/config/compaction' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "dataSource": "YOUR_DATASOURCE",
        "taskContext": {
            "taskLockType": "REPLACE"
        }
    }'

Update the compaction settings with the UI

In the Compaction config for a datasource, set Allow concurrent compactions (experimental) to True.

Add a task lock type to your ingestion job

Next, you need to configure the task lock type for your ingestion job:

  • For streaming jobs, the context parameter goes in your supervisor spec, and the lock type is always APPEND.
  • For legacy JSON-based batch ingestion, the context parameter goes in your ingestion spec, and the lock type can be either APPEND or REPLACE.

You can provide the context parameter through the API, like any other parameter for an ingestion job, or through the UI.

Add the task lock type through the API

Add the following JSON snippet to your supervisor or ingestion spec if you’re using the API:

    "context": {
        "taskLockType": LOCK_TYPE
    }

The LOCK_TYPE depends on what you’re trying to accomplish.

Set taskLockType to APPEND if either of the following is true:

  • The ingestion job uses dynamic partitioning with append to existing set to true
  • The ingestion job is a streaming ingestion job

If you have multiple ingestion jobs that append all targeting the same datasource and want them to run simultaneously, you need to also include the following context parameter:

    "useSharedLock": "true"

Keep in mind that taskLockType takes precedence over useSharedLock. Do not use useSharedLock with REPLACE task locks.

Set taskLockType to REPLACE if you’re replacing data. For example, if you use any of the following partitioning types, use REPLACE:

  • hash partitioning
  • range partitioning
  • dynamic partitioning with append to existing set to false
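The selection rules above can be summarized as a small decision function. This is a sketch of the guidance in this section, not Druid code: the function name and its parameters are hypothetical, and it only covers the three partitioning types named here.

```python
def choose_task_lock_type(
    *, streaming: bool, partitioning: str = "dynamic", append_to_existing: bool = False
) -> str:
    """Pick APPEND or REPLACE following the rules described in this section."""
    if streaming:
        # Streaming ingestion jobs always use APPEND.
        return "APPEND"
    if partitioning not in ("dynamic", "hash", "range"):
        raise ValueError(f"unknown partitioning type: {partitioning!r}")
    if partitioning == "dynamic" and append_to_existing:
        return "APPEND"
    # Hash, range, and dynamic partitioning without append replace existing data.
    return "REPLACE"


# Context snippet for a streaming supervisor spec.
context = {"taskLockType": choose_task_lock_type(streaming=True)}
print(context)  # {'taskLockType': 'APPEND'}
```

The resulting dictionary corresponds to the context object that you add to the supervisor or ingestion spec.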

Add a task lock using the Druid console

As part of the Load data wizard for classic batch (JSON-based ingestion) and streaming ingestion, you can configure the task lock type for the ingestion during the Publish step:

  • If you set Append to existing to True, you can then set Allow concurrent append tasks (experimental) to True.
  • If you set Append to existing to False, you can then set Allow concurrent replace tasks (experimental) to True.

Learn more

See the following topics for more information:

  • Compaction for an overview of compaction and how to set up manual compaction in Druid.
  • Segment optimization for guidance on evaluating and optimizing Druid segment size.
  • Coordinator process for details on how the Coordinator plans compaction tasks.