Pipeline processor

Executes another pipeline.

Table 34. Pipeline Options

| Name | Required | Default | Description |
| --- | --- | --- | --- |
| name | yes | - | The name of the pipeline to execute. Supports template snippets. |
| ignore_missing_pipeline | no | false | Whether to ignore missing pipelines instead of failing. |
| description | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. |
| if | no | - | Conditionally execute the processor. See Conditionally run a processor. |
| ignore_failure | no | false | Ignore failures for the processor. See Handling pipeline failures. |
| on_failure | no | - | Handle failures for the processor. See Handling pipeline failures. |
| tag | no | - | Identifier for the processor. Useful for debugging and metrics. |

    {
      "pipeline": {
        "name": "inner-pipeline"
      }
    }
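
The options in the table can be combined. The following is a minimal sketch, not an official helper: `pipeline_processor` is a hypothetical function that builds the processor definition as a plain Python dict. It assumes documents carry a `service` field, so that the templated `name` resolves to a per-service pipeline, and it sets `ignore_missing_pipeline` so that documents whose target pipeline does not exist are not failed.

```python
import json


def pipeline_processor(name, ignore_missing=False, tag=None):
    """Build a pipeline processor definition as a plain dict (hypothetical helper)."""
    options = {"name": name}
    if ignore_missing:
        # Skip the processor instead of failing when the named pipeline is missing.
        options["ignore_missing_pipeline"] = True
    if tag is not None:
        options["tag"] = tag
    return {"pipeline": options}


# Assumption: each document has a "service" field; the template snippet
# resolves the pipeline name per document at ingest time.
routing = pipeline_processor(
    "{{{service}}}-pipeline",
    ignore_missing=True,
    tag="route-by-service",
)
print(json.dumps(routing, indent=2))
```

The resulting dict can be placed in the `processors` list of a pipeline definition like any other processor.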

The name of the current pipeline can be accessed from the _ingest.pipeline ingest metadata key.
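
As an illustration of reading that key, the sketch below builds a pipeline body whose `set` processor copies the pipeline name into the document through a template snippet. The field name `processed_by` and the description text are assumptions made for this example, not part of the reference.

```python
import json

# "processed_by" is a hypothetical field name chosen for this sketch.
record_pipeline_name = {
    "set": {
        "field": "processed_by",
        # Template snippet that reads the ingest metadata key.
        "value": "{{{_ingest.pipeline}}}",
    }
}

pipeline_body = {
    "description": "records which pipeline handled the document",
    "processors": [record_pipeline_name],
}
print(json.dumps(pipeline_body, indent=2))
```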

The following example uses this processor to nest one pipeline inside another.

Define an inner pipeline:

Python:

    resp = client.ingest.put_pipeline(
        id="pipelineA",
        description="inner pipeline",
        processors=[
            {
                "set": {
                    "field": "inner_pipeline_set",
                    "value": "inner"
                }
            }
        ],
    )
    print(resp)

Ruby:

    response = client.ingest.put_pipeline(
      id: 'pipelineA',
      body: {
        description: 'inner pipeline',
        processors: [
          {
            set: {
              field: 'inner_pipeline_set',
              value: 'inner'
            }
          }
        ]
      }
    )
    puts response

JavaScript:

    const response = await client.ingest.putPipeline({
      id: "pipelineA",
      description: "inner pipeline",
      processors: [
        {
          set: {
            field: "inner_pipeline_set",
            value: "inner",
          },
        },
      ],
    });
    console.log(response);

Console:

    PUT _ingest/pipeline/pipelineA
    {
      "description" : "inner pipeline",
      "processors" : [
        {
          "set" : {
            "field": "inner_pipeline_set",
            "value": "inner"
          }
        }
      ]
    }

Define another pipeline that uses the previously defined inner pipeline:

Python:

    resp = client.ingest.put_pipeline(
        id="pipelineB",
        description="outer pipeline",
        processors=[
            {
                "pipeline": {
                    "name": "pipelineA"
                }
            },
            {
                "set": {
                    "field": "outer_pipeline_set",
                    "value": "outer"
                }
            }
        ],
    )
    print(resp)

Ruby:

    response = client.ingest.put_pipeline(
      id: 'pipelineB',
      body: {
        description: 'outer pipeline',
        processors: [
          {
            pipeline: {
              name: 'pipelineA'
            }
          },
          {
            set: {
              field: 'outer_pipeline_set',
              value: 'outer'
            }
          }
        ]
      }
    )
    puts response

JavaScript:

    const response = await client.ingest.putPipeline({
      id: "pipelineB",
      description: "outer pipeline",
      processors: [
        {
          pipeline: {
            name: "pipelineA",
          },
        },
        {
          set: {
            field: "outer_pipeline_set",
            value: "outer",
          },
        },
      ],
    });
    console.log(response);

Console:

    PUT _ingest/pipeline/pipelineB
    {
      "description" : "outer pipeline",
      "processors" : [
        {
          "pipeline" : {
            "name": "pipelineA"
          }
        },
        {
          "set" : {
            "field": "outer_pipeline_set",
            "value": "outer"
          }
        }
      ]
    }

Indexing a document with the outer pipeline now also runs the inner pipeline, because the outer pipeline invokes it:

Python:

    resp = client.index(
        index="my-index-000001",
        id="1",
        pipeline="pipelineB",
        document={
            "field": "value"
        },
    )
    print(resp)

Ruby:

    response = client.index(
      index: 'my-index-000001',
      id: 1,
      pipeline: 'pipelineB',
      body: {
        field: 'value'
      }
    )
    puts response

JavaScript:

    const response = await client.index({
      index: "my-index-000001",
      id: 1,
      pipeline: "pipelineB",
      document: {
        field: "value",
      },
    });
    console.log(response);

Console:

    PUT /my-index-000001/_doc/1?pipeline=pipelineB
    {
      "field": "value"
    }

Response from the index request:

    {
      "_index": "my-index-000001",
      "_id": "1",
      "_version": 1,
      "result": "created",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "_seq_no": 66,
      "_primary_term": 1
    }
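
The index response does not include the document body, so the effect of the pipelines has to be checked by reading the document back. The sketch below assumes `client` is an Elasticsearch Python client like the one used in the Python examples above; `fetch_source` is a hypothetical helper name.

```python
def fetch_source(client, index, doc_id):
    """Read a document back and return only its stored source (hypothetical helper)."""
    resp = client.get(index=index, id=doc_id)
    # The stored document body is returned under the "_source" key.
    return resp["_source"]
```

Against a live cluster, `fetch_source(client, "my-index-000001", "1")` would be expected to return the stored document.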

Indexed document:

    {
      "field": "value",
      "inner_pipeline_set": "inner",
      "outer_pipeline_set": "outer"
    }
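
To make the order of execution concrete, here is a toy model in plain Python that needs no cluster. It is only a sketch of the nesting behaviour described above, not how Elasticsearch implements ingest: `run_pipeline` is a hypothetical function that understands just the `set` and `pipeline` processors, ignoring templates, conditions, and failure handling.

```python
# Pipeline definitions copied from the pipelineA / pipelineB example.
PIPELINES = {
    "pipelineA": [
        {"set": {"field": "inner_pipeline_set", "value": "inner"}},
    ],
    "pipelineB": [
        {"pipeline": {"name": "pipelineA"}},
        {"set": {"field": "outer_pipeline_set", "value": "outer"}},
    ],
}


def run_pipeline(name, doc, pipelines=PIPELINES):
    """Apply the named pipeline to doc in place and return it (toy model)."""
    for processor in pipelines[name]:
        # Each processor is a single-key dict: {type: options}.
        (kind, options), = processor.items()
        if kind == "set":
            doc[options["field"]] = options["value"]
        elif kind == "pipeline":
            # Nested call: the inner pipeline runs to completion before
            # the outer pipeline moves on to its next processor.
            run_pipeline(options["name"], doc, pipelines)
        else:
            raise ValueError(f"unsupported processor: {kind}")
    return doc


result = run_pipeline("pipelineB", {"field": "value"})
print(result)
```

In this model the inner field is set before the outer one, which matches the field order of the indexed document shown above.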