Pipelines

Data Prepper Pipeline

To use Data Prepper, you define pipelines in a configuration YAML file. Each pipeline is a combination of a source, a buffer, zero or more preppers, and one or more sinks. For example:

```yml
simple-sample-pipeline:
  workers: 2 # the number of workers
  delay: 5000 # in milliseconds, how long workers wait between read attempts
  source:
    random:
  buffer:
    bounded_blocking:
      buffer_size: 1024 # max number of records the buffer accepts
      batch_size: 256 # max number of records the buffer drains after each read
  processor:
    - string_converter:
        upper_case: true
  sink:
    - stdout:
```
  • Sources define where your data comes from. In this case, the source is a random UUID generator (random).

  • Buffers store data as it passes through the pipeline.

    By default, Data Prepper uses its only buffer, the bounded_blocking buffer, so you can omit this section unless you've developed a custom buffer or need to tune the buffer settings.

  • Preppers perform some action on your data: filter, transform, enrich, etc.

    You can have multiple preppers, which run sequentially from top to bottom, not in parallel. The string_converter prepper transforms strings by converting them to uppercase.

  • Sinks define where your data goes. In this case, the sink is stdout.
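The buffer settings above can be thought of as a bounded queue that blocks writers when full and hands workers at most batch_size records per read. The following Python sketch illustrates that behavior (a conceptual illustration only, not Data Prepper's actual implementation):

```python
import queue

# Illustrative sketch of bounded_blocking buffer behavior:
# writes block when the buffer is full, and each read drains
# at most batch_size records.
class BoundedBlockingBuffer:
    def __init__(self, buffer_size, batch_size):
        self._queue = queue.Queue(maxsize=buffer_size)  # buffer_size: max records held
        self._batch_size = batch_size                   # batch_size: max records per drain

    def write(self, record, timeout=None):
        # Blocks until space is available (raises queue.Full on timeout).
        self._queue.put(record, timeout=timeout)

    def read(self):
        # Drain up to batch_size records without blocking.
        batch = []
        while len(batch) < self._batch_size:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

buf = BoundedBlockingBuffer(buffer_size=1024, batch_size=256)
for i in range(300):
    buf.write(i)
first = buf.read()   # drains 256 records
second = buf.read()  # drains the remaining 44
```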

Examples

This section provides some pipeline examples that you can use to start creating your own pipelines. For more information, see the Data Prepper configuration reference guide.

The Data Prepper repository has several sample applications to help you get started.

Log ingestion pipeline

The following example demonstrates how to use the HTTP source and Grok prepper plugins to process unstructured log data.

```yml
log-pipeline:
  source:
    http:
      ssl: false
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG}" ]
  sink:
    - opensearch:
        hosts: [ "https://opensearch:9200" ]
        insecure: true
        username: admin
        password: admin
        index: apache_logs
```

This example uses weak security. We strongly recommend securing all plugins that open external ports in production environments.
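The grok processor extracts structured fields from each raw log line. As a rough sketch of what matching against %{COMMONAPACHELOG} yields for an Apache access log line, here is an equivalent parse in Python using a simplified hand-written regex (an illustration only, not Data Prepper's Grok implementation, which uses more permissive patterns):

```python
import re

# Simplified stand-in for grok's %{COMMONAPACHELOG} pattern.
COMMON_APACHE_LOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-)'
)

line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'
fields = COMMON_APACHE_LOG.match(line).groupdict()
# fields now maps names like clientip, verb, response, and bytes
# to the substrings extracted from the raw line.
```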

Trace Analytics pipeline

The following example demonstrates how to build a pipeline that supports the Trace Analytics OpenSearch Dashboards plugin. This pipeline takes data from the OpenTelemetry Collector and uses two other pipelines as sinks. These two separate pipelines index the trace documents and the service map documents for the Dashboards plugin.

```yml
entry-pipeline:
  delay: "100"
  source:
    otel_trace_source:
      ssl: false
  sink:
    - pipeline:
        name: "raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"
raw-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  prepper:
    - otel_trace_raw_prepper:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        username: admin
        password: admin
        trace_analytics_raw: true
service-map-pipeline:
  delay: "100"
  source:
    pipeline:
      name: "entry-pipeline"
  prepper:
    - service_map_stateful:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        username: admin
        password: admin
        trace_analytics_service_map: true
```
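Through the pipeline sink/source pairing, every record the entry pipeline receives is delivered to both downstream pipelines. Conceptually, the fan-out works like the following Python sketch (illustrative only; the queue names are hypothetical, not Data Prepper internals):

```python
from collections import deque

# Hypothetical stand-ins for the source queues of the two
# downstream pipelines declared as pipeline sinks.
raw_queue = deque()          # feeds raw-pipeline
service_map_queue = deque()  # feeds service-map-pipeline

def entry_pipeline(records):
    # Each pipeline sink receives its own copy of every record,
    # so both downstream pipelines see the full trace stream.
    for record in records:
        raw_queue.append(record)
        service_map_queue.append(record)

entry_pipeline(["span-1", "span-2"])
```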

Migrating from Logstash

Data Prepper supports Logstash configuration files for a limited set of plugins. Simply mount the Logstash configuration file as the pipeline definition when you run Data Prepper:

```bash
docker run --name data-prepper \
  -v /full/path/to/logstash.conf:/usr/share/data-prepper/pipelines.conf \
  opensearchproject/opensearch-data-prepper:latest
```

This feature is limited by the feature parity of Data Prepper. As of the Data Prepper 1.2 release, the following plugins from the Logstash configuration are supported:

  • HTTP Input plugin
  • Grok Filter plugin
  • Elasticsearch Output plugin
  • Amazon Elasticsearch Output plugin
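For reference, a minimal Logstash configuration that stays within that supported set might look like the following (the port, hosts, and index name here are placeholders, not required values):

```conf
input {
  http {
    port => 8080
  }
}
filter {
  grok {
    match => { "message" => "%{COMMONAPACHELOG}" }
  }
}
output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "apache_logs"
  }
}
```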

Configure the Data Prepper server

Data Prepper itself provides administrative HTTP endpoints, such as /list to list pipelines and /metrics/prometheus to provide Prometheus-compatible metrics data. The port that serves these endpoints has a TLS configuration and is specified by a separate YAML file. By default, these endpoints are secured by Data Prepper Docker images. We strongly recommend providing your own configuration file to secure production environments. Here is an example data-prepper-config.yaml:

```yml
ssl: true
keyStoreFilePath: "/usr/share/data-prepper/keystore.jks"
keyStorePassword: "password"
privateKeyPassword: "other_password"
serverPort: 1234
```

To configure the Data Prepper server, run Data Prepper with the additional YAML file mounted:

```bash
docker run --name data-prepper \
  -v /full/path/to/pipelines.yaml:/usr/share/data-prepper/pipelines.yaml \
  -v /full/path/to/data-prepper-config.yaml:/usr/share/data-prepper/data-prepper-config.yaml \
  opensearchproject/opensearch-data-prepper:latest
```