s3 source

s3 is a source plugin that reads events from Amazon Simple Storage Service (Amazon S3) objects. It requires an Amazon Simple Queue Service (Amazon SQS) queue that receives S3 Event Notifications. After Amazon SQS is configured, the s3 source receives messages from Amazon SQS. When the SQS message indicates that an S3 object was created, the s3 source loads the S3 objects and then parses them using the configured codec. You can also configure the s3 source to use Amazon S3 Select instead of Data Prepper to parse S3 objects.

IAM permissions

In order to use the s3 source, configure your AWS Identity and Access Management (IAM) permissions to grant Data Prepper access to Amazon S3. You can use a configuration similar to the following JSON configuration:

  1. {
  2. "Version": "2012-10-17",
  3. "Statement": [
  4. {
  5. "Sid": "s3-access",
  6. "Effect": "Allow",
  7. "Action": "s3:GetObject",
  8. "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
  9. },
  10. {
  11. "Sid": "sqs-access",
  12. "Effect": "Allow",
  13. "Action": [
  14. "sqs:DeleteMessage",
  15. "sqs:ReceiveMessage"
  16. ],
  17. "Resource": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
  18. },
  19. {
  20. "Sid": "kms-access",
  21. "Effect": "Allow",
  22. "Action": "kms:Decrypt",
  23. "Resource": "arn:aws:kms:<YOUR-REGION>:<123456789012>:key/<YOUR-KMS-KEY>"
  24. }
  25. ]
  26. }

If your S3 objects or Amazon SQS queues do not use AWS Key Management Service (AWS KMS), remove the kms:Decrypt permission.

Cross-account S3 access

When Data Prepper fetches data from an S3 bucket, it verifies the ownership of the bucket using the bucket owner condition. By default, Data Prepper expects an S3 bucket to be owned by the same that owns the correlating SQS queue. When no SQS is provided, Data Prepper uses the Amazon Resource Name (ARN) role in the aws configuration.

If you plan to ingest data from multiple S3 buckets but each bucket is associated with a different S3 account, you need to configure Data Prepper to check for cross-account S3 access, according to the following conditions:

  • If all S3 buckets you want data from belong to an account other than that of the SQS queue, set default_bucket_owner to the account ID of the bucket account holder.
  • If your S3 buckets are in multiple accounts, use a bucket_owners map.

In the following example, the SQS queue is owned by account 000000000000. The SQS queue contains data from two S3 buckets: my-bucket-01 and my-bucket-02. Because my-bucket-01 is owned by 123456789012 and my-bucket-02 is owned by 999999999999, the bucket_owners map calls both bucket owners with their account IDs, as shown in the following configuration:

  1. s3:
  2. sqs:
  3. queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue"
  4. bucket_owners:
  5. my-bucket-01: 123456789012
  6. my-bucket-02: 999999999999

You can use both bucket_owners and default_bucket_owner together.

Configuration

You can use the following options to configure the s3 source.

OptionRequiredTypeDescription
notification_typeYesStringMust be sqs.
compressionNoStringThe compression algorithm to apply: none, gzip, or automatic. Default value is none.
codecYesCodecThe codec to apply.
sqsYessqsThe SQS configuration. See sqs for details.
awsYesawsThe AWS configuration. See aws for details.
on_errorNoStringDetermines how to handle errors in Amazon SQS, either retain_messages or delete_messages. retain_messages leaves the message in the Amazon SQS queue and tries to send the message again later. This is recommended for dead-letter queues. delete_messages deletes any failed messages. Default is retain_messages.
buffer_timeoutNoDurationThe amount of time allowed for for writing events to the Data Prepper buffer before timeout occurs. Any events that the Amazon S3 source cannot write to the buffer in this time will be discarded. Default value is 10 seconds.
records_to_accumulateNoIntegerThe number of messages that accumulate before writing to the buffer. Default value is 100.
metadata_root_keyNoStringThe base key for adding S3 metadata to each event. The metadata includes the key and bucket for each S3 object. Defaults to s3/.
default_bucket_ownerNoStringAn AWS account ID to use as the default account when checking bucket ownership.
bucket_ownersNoMapA map of S3 bucket names and their AWS account IDs. When provided, the s3 source validates that the bucket is owned by the account. This allows for the use of buckets from multiple accounts.
disable_bucket_ownership_validationNoBooleanWhen true, the S3 source does not attempt to validate that the bucket is owned by the expected account. By default, this is the same account that owns the Amazon SQS queue. For more information, see bucket ownership. Defaults to false.
acknowledgmentsNoBooleanWhen true, enables s3 sources to receive end-to-end acknowledgments when events are received by OpenSearch sinks.

sqs

The following parameters allow you to configure usage for Amazon SQS in the s3 source plugin.

OptionRequiredTypeDescription
queue_urlYesStringThe URL of the Amazon SQS queue from which messages are received.
maximum_messagesNoIntegerThe maximum number of messages to receive from the Amazon SQS queue in any single request. Default value is 10.
visibility_timeoutNoDurationThe visibility timeout to apply to messages read from the Amazon SQS queue. This should be set to the amount of time that Data Prepper may take to read all the S3 objects in a batch. Default value is 30s.
wait_timeNoDurationThe amount of time to wait for long polling on the Amazon SQS API. Default value is 20s.
poll_delayNoDurationA delay to place between reading/processing a batch of Amazon SQS messages and making a subsequent request. Default value is 0s.

aws

OptionRequiredTypeDescription
regionNoStringThe AWS Region to use for credentials. Defaults to standard SDK behavior to determine the Region.
sts_role_arnNoStringThe AWS Security Token Service (AWS STS) role to assume for requests to Amazon SQS and Amazon S3. Defaults to null, which will use the standard SDK behavior for credentials.
aws_sts_header_overridesNoMapA map of header overrides that the IAM role assumes for the sink plugin.

codec

The codec determines how the s3 source parses each S3 object.

newline codec

The newline codec parses each single line as a single log event. This is ideal for most application logs because each event parses per single line. It can also be suitable for S3 objects that have individual JSON objects on each line, which matches well when used with the parse_json processor to parse each line.

Use the following options to configure the newline codec.

OptionRequiredTypeDescription
skip_linesNoIntegerThe number of lines to skip before creating events. You can use this configuration to skip common header rows. Default is 0.
header_destinationNoStringA key value to assign to the header line of the S3 object. If this option is specified, then each event will contain a header_destination field.

json codec

The json codec parses each S3 object as a single JSON object from a JSON array and then creates a Data Prepper log event for each object in the array.

csv codec

The csv codec parses objects in comma-separated value (CSV) format, with each row producing a Data Prepper log event. Use the following options to configure the csv codec.

OptionRequiredTypeDescription
delimiterYesIntegerThe delimiter separating columns. Default is ,.
quote_characterYesStringThe character used as a text qualifier for CSV data. Default is .
headerNoString listThe header containing the column names used to parse CSV data.
detect_headerNoBooleanWhether the first line of the S3 object should be interpreted as a header. Default is true.

Using s3_select with the s3 source

When configuring s3_select to parse S3 objects, use the following options.

OptionRequiredTypeDescription
expressionYes, when using s3_selectStringThe expression used to query the object. Maps directly to the expression property.
expression_typeNoStringThe type of the provided expression. Default value is SQL. Maps directly to the ExpressionType.
input_serializationYes, when using s3_selectStringProvides the S3 Select file format. Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. May be csv, json, or parquet.
compression_typeNoStringSpecifies an object’s compression format. Maps directly to the CompressionType.
csvNocsvProvides the CSV configuration for processing CSV data.
jsonNojsonProvides the JSON configuration for processing JSON data.

csv

Use the following options in conjunction with the csv configuration for s3_select to determine how your parsed CSV file should be formatted.

These options map directly to options available in the S3 Select CSVInput data type.

OptionRequiredTypeDescription
file_header_infoNoStringDescribes the first line of input. Maps directly to the FileHeaderInfo property.
quote_escapeNoStringA single character used for escaping the quotation mark character inside an already escaped value. Maps directly to the QuoteEscapeCharacter property.
commentsNoStringA single character used to indicate that a row should be ignored when the character is present at the start of that row. Maps directly to the Comments property.

json

Use the following option in conjunction with json for s3_select to determine how S3 Select processes the JSON file.

OptionRequiredTypeDescription
typeNoStringThe type of JSON array. May be either DOCUMENT or LINES. Maps directly to the Type property.

Using scan with the s3 source

The following parameters allow you to scan S3 objects. All options can be configured at the bucket level.

OptionRequiredTypeDescription
start_timeNoStringThe time from which to start scanning objects modified after the given start_time. This should follow ISO LocalDateTime format, for example, 023-01-23T10:00:00. If end_time is configured along with start_time, all objects after start_time and before end_time will be processed. start_time and range cannot be used together.
end_timeNoStringThe time after which no objects will be scanned after the given end_time. This should follow ISO LocalDateTime format, for example, 023-01-23T10:00:00. If start_time is configured along with end_time, all objects after start_time and before end_time will be processed. end_time and range cannot be used together.
rangeNoStringThe time range from which objects are scanned from all buckets. Supports ISO_8601 notation strings, such as PT20.345S or PT15M, and notation strings for seconds (60s) and milliseconds (1600ms). start_time and end_time cannot be used with range. Range P12H scans all the objects modified in the last 12 hours from the time pipeline started.
bucketsYesListA list of buckets to scan.
schedulingNoListThe configuration for scheduling periodic scans on all buckets. start_time, end_time and range can not be used if scheduling is configured.

bucket

OptionRequiredTypeDescription
bucketYesMapProvides options for each bucket.

You can configure the following options inside the bucket setting.

OptionRequiredTypeDescription
nameYesStringThe string representing the S3 bucket name to scan.
filterNoFilterProvides the filter configuration.
start_timeNoStringThe time from which to start scanning objects modified after the given start_time. This should follow ISO LocalDateTime format, for example, 023-01-23T10:00:00. If end_time is configured along with start_time, all objects after start_time and before end_time will be processed. start_time and range cannot be used together. This will overwrites the start_time at the scan level.
end_timeNoStringThe time after which no objects will be scanned after the given end_time. This should follow ISO LocalDateTime format, for example, 023-01-23T10:00:00. If start_time is configured along with end_time, all objects after start_time and before end_time will be processed. This overwrites the end_time at the scan level.
rangeNoStringThe time range from which objects are scanned from all buckets. Supports ISO_8601 notation strings, such as PT20.345S or PT15M, and notation strings for seconds (60s) and milliseconds (1600ms). start_time and end_time cannot be used with range. Range P12H scans all the objects modified in the last 12 hours from the time pipeline started. This overwrites the range at the scan level.

filter

Use the following options inside the filter configuration.

OptionRequiredTypeDescription
include_prefixNoListA list of S3 key prefix strings included in the scan. By default, all the objects in a bucket are included.
exclude_suffixNoListA list of S3 key suffix strings excluded from the scan. By default, no objects in a bucket are excluded.

scheduling

Option | Required | Type | Description :— | :— | :— | :— interval | Yes | String | Indicates the minimum interval between each scan. The next scan in the interval will start after the interval duration from the last scan ends and when all the objects from the previous scan are processed. Supports ISO_8601 notation strings, such as PT20.345S or PT15M, and notation strings for seconds (60s) and milliseconds (1600ms). count | No | Integer | Specifies how many times a bucket will be scanned. Defaults to Integer.MAX_VALUE.

Metrics

The s3 source includes the following metrics.

Counters

  • s3ObjectsFailed: The number of S3 objects that the s3 source failed to read.
  • s3ObjectsNotFound: The number of S3 objects that the s3 source failed to read due to an S3 “Not Found” error. These are also counted toward s3ObjectsFailed.
  • s3ObjectsAccessDenied: The number of S3 objects that the s3 source failed to read due to an “Access Denied” or “Forbidden” error. These are also counted toward s3ObjectsFailed.
  • s3ObjectsSucceeded: The number of S3 objects that the s3 source successfully read.
  • sqsMessagesReceived: The number of Amazon SQS messages received from the queue by the s3 source.
  • sqsMessagesDeleted: The number of Amazon SQS messages deleted from the queue by the s3 source.
  • sqsMessagesFailed: The number of Amazon SQS messages that the s3 source failed to parse.

Timers

  • s3ObjectReadTimeElapsed: Measures the amount of time the s3 source takes to perform a request to GET an S3 object, parse it, and write events to the buffer.
  • sqsMessageDelay: Measures the time elapsed from when S3 creates an object to when it is fully parsed.

Distribution summaries

  • s3ObjectSizeBytes: Measures the size of S3 objects as reported by the S3 Content-Length. For compressed objects, this is the compressed size.
  • s3ObjectProcessedBytes: Measures the bytes processed by the s3 source for a given object. For compressed objects, this is the uncompressed size.
  • s3ObjectsEvents: Measures the number of events (sometimes called records) produced by an S3 object.

Example: Uncompressed logs

The following pipeline.yaml file shows the minimum configuration for reading uncompressed newline-delimited logs:

  1. source:
  2. s3:
  3. notification_type: sqs
  4. codec:
  5. newline:
  6. compression: none
  7. sqs:
  8. queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
  9. aws:
  10. region: "us-east-1"
  11. sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"