Creating a search pipeline

Search pipelines are stored in the cluster state. To create a search pipeline, you configure an ordered list of processors in your OpenSearch cluster. A pipeline can contain more than one processor of the same type. Each processor can take an optional tag identifier that distinguishes it from other processors. Tagging a specific processor can be helpful when debugging error messages, especially if you add multiple processors of the same type.

Example request

The following request creates a search pipeline with a filter_query request processor that uses a term query to return only public messages and a response processor that renames the field message to notification:

```json
PUT /_search/pipeline/my_pipeline
{
  "request_processors": [
    {
      "filter_query": {
        "tag": "tag1",
        "description": "This processor restricts results to publicly visible documents",
        "query": {
          "term": {
            "visibility": "public"
          }
        }
      }
    }
  ],
  "response_processors": [
    {
      "rename_field": {
        "field": "message",
        "target_field": "notification"
      }
    }
  ]
}
```

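Once created, a pipeline can be applied to an individual search by passing its name in the search_pipeline query parameter (my_index below is a placeholder index name):

```json
GET /my_index/_search?search_pipeline=my_pipeline
```

Alternatively, you can make a pipeline the default for an index by setting the index.search.default_pipeline index setting to the pipeline name, so that searches on that index run the pipeline without specifying the parameter.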

Ignoring processor failures

By default, a search pipeline stops if one of its processors fails. If you want the pipeline to continue running when a processor fails, you can set the ignore_failure parameter for that processor to true when creating the pipeline:

```json
"filter_query": {
  "tag": "tag1",
  "description": "This processor restricts results to publicly visible documents",
  "ignore_failure": true,
  "query": {
    "term": {
      "visibility": "public"
    }
  }
}
```

If the processor fails, OpenSearch logs the failure and continues to run all remaining processors in the search pipeline. To check whether there were any failures, you can use search pipeline metrics.
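For example, per-node search pipeline statistics, including per-processor failure counts, are exposed through the Nodes Stats API:

```json
GET /_nodes/stats/search_pipeline
```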

Updating a search pipeline

To update a search pipeline dynamically, replace the search pipeline using the Search Pipeline API.

Example request

The following example request upserts my_pipeline by adding a filter_query request processor and a rename_field response processor:

```json
PUT /_search/pipeline/my_pipeline
{
  "request_processors": [
    {
      "filter_query": {
        "tag": "tag1",
        "description": "This processor returns only publicly visible documents",
        "query": {
          "term": {
            "visibility": "public"
          }
        }
      }
    }
  ],
  "response_processors": [
    {
      "rename_field": {
        "field": "message",
        "target_field": "notification"
      }
    }
  ]
}
```


Search pipeline versions

When creating your pipeline, you can specify a version for it in the version parameter:

```json
PUT _search/pipeline/my_pipeline
{
  "version": 1234,
  "request_processors": [
    {
      "script": {
        "source": """
        if (ctx._source['size'] > 100) {
          ctx._source['explain'] = false;
        }
        """
      }
    }
  ]
}
```


The version is then returned in the response to any subsequent get pipeline request:

```json
GET _search/pipeline/my_pipeline
```

The response contains the pipeline version:

Response

```json
{
  "my_pipeline": {
    "version": 1234,
    "request_processors": [
      {
        "script": {
          "source": """
          if (ctx._source['size'] > 100) {
            ctx._source['explain'] = false;
          }
          """
        }
      }
    ]
  }
}
```