How transform checkpoints work

Each time a transform examines the source indices and creates or updates the destination index, it generates a checkpoint.

If your transform runs only once, there is logically only one checkpoint. If your transform runs continuously, however, it creates checkpoints as it ingests and transforms new source data. The sync property of the transform configures checkpointing by specifying a time field.

To create a checkpoint, the continuous transform:

  1. Checks for changes to source indices.

    Using a simple periodic timer, the transform checks for changes to the source indices. This check is done based on the interval defined in the transform’s frequency property.

    If the source indices are unchanged, or if a checkpoint is already in progress, the transform waits for the next timer.

    If changes are found, a checkpoint is created.

  2. Identifies which entities and/or time buckets have changed.

    The transform searches for the entities or time buckets that have changed between the last checkpoint and the new one. It uses the resulting values to synchronize the source and destination indices with fewer operations than a full re-run.

  3. Updates the destination index (the data frame) with the changes.

    The transform applies the changes for new or changed entities or time buckets to the destination index. The set of changes can be paginated. The transform performs a composite aggregation, similar to the batch transform operation, but it also injects query filters based on the previous step to reduce the amount of work. After all changes have been applied, the checkpoint is complete.
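The three steps above can be sketched with a toy in-memory model. This is only an illustration of the flow, not the transform's actual implementation: the `run_checkpoint` function, the `host`, `bytes`, and `ingested` fields, and the integer timestamps are all hypothetical.

```python
from collections import defaultdict

# Toy source "index": each document has an entity (host), a value, and an ingest time.
source = [
    {"host": "A", "bytes": 10, "ingested": 1},
    {"host": "B", "bytes": 20, "ingested": 2},
]
destination = {}  # host -> total bytes (one summary entry per entity)


def run_checkpoint(source, destination, last_checkpoint, now):
    """Apply one checkpoint and return the new checkpoint time."""
    # Step 1: check for changes since the last checkpoint.
    new_docs = [d for d in source if last_checkpoint < d["ingested"] <= now]
    if not new_docs:
        return last_checkpoint  # nothing changed; wait for the next timer

    # Step 2: identify which entities changed.
    changed = {d["host"] for d in new_docs}

    # Step 3: re-aggregate only the changed entities (the "injected filter")
    # and update the destination.
    totals = defaultdict(int)
    for d in source:
        if d["host"] in changed and d["ingested"] <= now:
            totals[d["host"]] += d["bytes"]
    destination.update(totals)
    return now


cp = run_checkpoint(source, destination, last_checkpoint=0, now=2)
source.append({"host": "A", "bytes": 5, "ingested": 3})
cp = run_checkpoint(source, destination, cp, now=3)  # only host A is recomputed
print(destination)  # {'A': 15, 'B': 20}
```

In the second call, host B is never touched: only the entity that received new data is aggregated again.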

This checkpoint process involves both search and indexing activity on the cluster. We have attempted to favor control over performance while developing transforms. We decided it was preferable for the transform to take longer to complete, rather than to finish quickly and take precedence in resource consumption. That being said, the cluster still requires enough resources to support both the composite aggregation search and the indexing of its results.

If the transform causes unacceptable performance degradation on the cluster, stop the transform and refer to Performance considerations.

Using the ingest timestamp for syncing the transform

In most cases, it is strongly recommended to use the ingest timestamp of the source indices for syncing the transform. This is the most efficient way for a transform to identify new changes. If your data source follows the ECS standard, you might already have an event.ingested field. In this case, use event.ingested as the sync.time.field property of your transform.

If you don’t have an event.ingested field or it isn’t populated, you can set it by using an ingest pipeline. Create an ingest pipeline either using the ingest pipeline API (like the example below) or via Kibana under Stack Management > Ingest Pipelines. Use a set processor to set the field to the value of the ingest timestamp.

```python
resp = client.ingest.put_pipeline(
    id="set_ingest_time",
    description="Set ingest timestamp.",
    processors=[
        {
            "set": {
                "field": "event.ingested",
                "value": "{{{_ingest.timestamp}}}"
            }
        }
    ],
)
print(resp)
```

```ruby
response = client.ingest.put_pipeline(
  id: 'set_ingest_time',
  body: {
    description: 'Set ingest timestamp.',
    processors: [
      {
        set: {
          field: 'event.ingested',
          value: '{{{_ingest.timestamp}}}'
        }
      }
    ]
  }
)
puts response
```

```js
const response = await client.ingest.putPipeline({
  id: "set_ingest_time",
  description: "Set ingest timestamp.",
  processors: [
    {
      set: {
        field: "event.ingested",
        value: "{{{_ingest.timestamp}}}",
      },
    },
  ],
});
console.log(response);
```

```console
PUT _ingest/pipeline/set_ingest_time
{
  "description": "Set ingest timestamp.",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}
```

After you create the ingest pipeline, apply it to the source indices of your transform. The pipeline adds the event.ingested field to every document, set to the ingest timestamp. Then configure the sync.time.field property of your transform to use this field, either with the create transform API for new transforms or with the update transform API for existing transforms.

Refer to Add a pipeline to an indexing request and Ingest pipelines to learn more about how to use an ingest pipeline.
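As a rough sketch of those two steps with the Python client, the code below makes the pipeline the default for a source index and then creates a continuous transform that syncs on event.ingested. The index names, the transform ID `hosts_by_bytes`, the `host.name` and `bytes` fields, and the `1m` and `60s` values are placeholders chosen for illustration; `client` is assumed to be an already connected Elasticsearch 8.x Python client.

```python
def build_transform_config(source_index, dest_index, time_field="event.ingested"):
    """Build the keyword arguments for a continuous pivot transform."""
    return {
        "source": {"index": source_index},
        "dest": {"index": dest_index},
        "pivot": {
            # Hypothetical grouping and aggregation, for illustration only.
            "group_by": {"host": {"terms": {"field": "host.name"}}},
            "aggregations": {"total_bytes": {"sum": {"field": "bytes"}}},
        },
        "frequency": "1m",  # how often the transform checks for changes
        "sync": {"time": {"field": time_field, "delay": "60s"}},
    }


def apply_pipeline_and_create_transform(client, source_index, dest_index):
    """Attach the ingest pipeline to the source index, then create the transform."""
    # Every document indexed into source_index from now on runs through the pipeline.
    client.indices.put_settings(
        index=source_index,
        settings={"index.default_pipeline": "set_ingest_time"},
    )
    client.transform.put_transform(
        transform_id="hosts_by_bytes",
        **build_transform_config(source_index, dest_index),
    )


config = build_transform_config("my-source-index", "my-dest-index")
print(config["sync"])  # {'time': {'field': 'event.ingested', 'delay': '60s'}}
```

Setting a default pipeline only affects documents indexed after the change, so documents that already exist in the source index would not get the field this way.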

Change detection heuristics

When the transform runs in continuous mode, it updates the documents in the destination index as new data comes in. The transform uses a set of heuristics called change detection to update the destination index with fewer operations.

For example, suppose the data is grouped by host name. Change detection identifies which host names have changed (say, hosts A, C, and G) and updates only the documents for those hosts. It does not update documents that store information about host B, D, or any other host that has not changed.

Another heuristic applies when a date_histogram is used to group by time buckets. Change detection identifies which time buckets have changed and updates only those.
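The time-bucket heuristic can be illustrated by rounding the timestamps of newly arrived documents down to the start of their bucket. This is a simplified sketch that assumes fixed-width buckets and epoch-second timestamps; `changed_time_buckets` is a hypothetical helper, not part of any Elasticsearch API.

```python
def changed_time_buckets(doc_timestamps, interval_seconds):
    """Return the sorted start times of the buckets that contain changed documents."""
    return sorted({ts - ts % interval_seconds for ts in doc_timestamps})


HOUR = 3600
# Event times (epoch seconds) of documents that arrived since the last checkpoint.
new_event_times = [7205, 7300, 10900]
print(changed_time_buckets(new_event_times, HOUR))  # [7200, 10800]
```

Only the two hourly buckets starting at 7200 and 10800 would need to be recomputed; every other bucket in the destination stays as it is.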

Error handling

Failures in transforms tend to be related to searching or indexing. To increase the resiliency of transforms, the cursor positions of the aggregated search and the changed entities search are tracked in memory and persisted periodically.

Checkpoint failures can be categorized as follows:

  • Temporary failures: The checkpoint is retried. If 10 consecutive failures occur, the transform has a failed status. For example, this situation might occur when there are shard failures and queries return only partial results.
  • Irrecoverable failures: The transform immediately fails. For example, this situation occurs when the source index is not found.
  • Adjustment failures: The transform retries with adjusted settings. For example, if a parent circuit breaker memory error occurs during the composite aggregation, the transform receives partial results. The aggregated search is then retried with a smaller number of buckets. This retry is performed at the interval defined in the frequency property of the transform. If the search is retried until it reaches the minimum number of buckets, an irrecoverable failure occurs.
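The three categories can be sketched as a small retry loop. The limit of 10 consecutive failures comes from the text above; everything else is an assumption made for illustration, including the exception classes, the halving strategy, the minimum page size of 10, and the fact that this sketch retries immediately rather than waiting for the next frequency interval.

```python
class CircuitBreakerError(Exception):
    """Stands in for a memory circuit breaker error during the aggregation."""


class TransientError(Exception):
    """Stands in for a temporary failure such as partial shard results."""


MAX_CONSECUTIVE_FAILURES = 10  # from the text: 10 consecutive failures -> failed
MIN_PAGE_SIZE = 10  # hypothetical lower bound on the number of buckets


def run_with_retries(search, page_size=500):
    """Call search(page_size) until it succeeds or the transform must fail."""
    failures = 0
    while True:
        try:
            return search(page_size)
        except CircuitBreakerError:
            # Adjustment failure: retry with fewer buckets (halving is an assumption).
            if page_size <= MIN_PAGE_SIZE:
                raise RuntimeError("failed: minimum page size reached")
            page_size = max(MIN_PAGE_SIZE, page_size // 2)
        except TransientError:
            # Temporary failure: retry, but give up after too many in a row.
            failures += 1
            if failures >= MAX_CONSECUTIVE_FAILURES:
                raise RuntimeError("failed: too many consecutive failures")
        # Any other exception (for example, a missing index) propagates
        # immediately, which models an irrecoverable failure.


attempted_sizes = []


def memory_limited_search(page_size):
    """Pretend the cluster can only handle 100 buckets per page."""
    attempted_sizes.append(page_size)
    if page_size > 100:
        raise CircuitBreakerError()
    return page_size


print(run_with_retries(memory_limited_search))  # 62
print(attempted_sizes)  # [500, 250, 125, 62]
```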

If the node running the transform fails, the transform restarts from the most recently persisted cursor position. This recovery process might repeat some of the work the transform has already done, but it ensures data consistency.
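To see why repeating work after a restart is harmless, consider this toy model. It assumes that results are written by key, so writing the same page twice leaves the destination unchanged. The `process_pages` function, the `state` dictionary that stands in for persisted state, and the `persist_every` interval are hypothetical and do not describe how the transform actually stores its cursor.

```python
def process_pages(pages, destination, state, persist_every=2, crash_at=None):
    """Index pages starting from the persisted cursor; state["cursor"] survives a crash."""
    cursor = state["cursor"]  # in-memory position, starts at the persisted one
    while cursor < len(pages):
        if cursor == crash_at:
            raise RuntimeError("node failed")
        for key, value in pages[cursor]:
            destination[key] = value  # overwrite by key, so repeats are harmless
        cursor += 1
        if cursor % persist_every == 0:
            state["cursor"] = cursor  # periodic persistence
    state["cursor"] = cursor


pages = [[("A", 1)], [("B", 2)], [("C", 3)], [("D", 4)]]
destination, state = {}, {"cursor": 0}
try:
    process_pages(pages, destination, state, crash_at=3)
except RuntimeError:
    pass  # pages 0-2 were indexed, but only cursor position 2 was persisted
process_pages(pages, destination, state)  # restarts at page 2 and repeats it
print(state["cursor"], destination)  # 4 {'A': 1, 'B': 2, 'C': 3, 'D': 4}
```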