Source coordination

Source coordination is the concept of coordinating and distributing work between Data Prepper data sources in a multi-node environment. Some data sources, such as Amazon Kinesis or Amazon Simple Queue Service (Amazon SQS), handle coordination natively. Other data sources, such as OpenSearch, Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and JDBC/ODBC, do not support source coordination natively.

Data Prepper source coordination decides which partition of work is performed by each node in the Data Prepper cluster and prevents duplicate partitions of work.

Inspired by the Kinesis Client Library, Data Prepper uses leases kept in a distributed store to handle the distribution and deduplication of work.

Formatting partitions

Source coordination separates sources into “partitions of work.” For example, an S3 object is a partition of work for an Amazon S3 source, and an OpenSearch index is a partition of work for an OpenSearch source.

Data Prepper takes each partition of work that is chosen by the source and creates corresponding items in the distributed store that Data Prepper uses for source coordination. Each of these items has the following standard format, which can be extended by the distributed store implementation.

| Value | Type | Description |
| --- | --- | --- |
| sourceIdentifier | String | The identifier of the Data Prepper pipeline that works on this partition. By default, the sourceIdentifier is prefixed by the sub-pipeline name, but an additional prefix can be configured with partition_prefix in your data-prepper-config.yaml file. |
| sourcePartitionKey | String | The identifier for the partition of work associated with this item. For example, for an s3 source with scan capabilities, this identifier is a combination of the S3 bucket name and the object key. |
| partitionOwner | String | An identifier for the node that actively owns and is working on this partition. This ID contains the hostname of the node but is null when this partition is not owned. |
| partitionProgressState | String | A JSON string representing the progress made on a partition of work, along with any additional metadata that the source may need so that another node can resume where the last node stopped after a crash. |
| partitionOwnershipTimeout | Timestamp | The time at which the current ownership of the partition expires. Whenever a Data Prepper node acquires a partition, the owner is given a 10-minute timeout so that the partition can be reassigned if the node crashes. The ownership is renewed for another 10 minutes each time the owner saves the state of the partition. |
| sourcePartitionStatus | Enum | Represents the current state of the partition: ASSIGNED means the partition is currently being processed, UNASSIGNED means the partition is waiting to be processed, CLOSED means the partition is waiting to be processed at a later date, and COMPLETED means the partition has already been processed. |
| reOpenAt | Timestamp | Represents the time at which CLOSED partitions reopen and are considered to be available for processing. Only applies to CLOSED partitions. |
| closedCount | Long | Tracks how many times the partition has been marked as CLOSED. |
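
To make the item layout concrete, the following Python sketch models a single store item with the fields from the table above. It is an illustration only, not Data Prepper's implementation: the class name PartitionItem and the helper methods acquire, save_state, and owner_expired are hypothetical, and the 10-minute value is taken from the partitionOwnershipTimeout description.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

# 10-minute ownership timeout, as described for partitionOwnershipTimeout.
OWNERSHIP_TIMEOUT = timedelta(minutes=10)


class SourcePartitionStatus(Enum):
    ASSIGNED = "ASSIGNED"
    UNASSIGNED = "UNASSIGNED"
    CLOSED = "CLOSED"
    COMPLETED = "COMPLETED"


@dataclass
class PartitionItem:
    """Hypothetical model of one source coordination store item."""
    source_identifier: str
    source_partition_key: str
    partition_owner: Optional[str] = None           # None when not owned
    partition_progress_state: Optional[str] = None  # JSON string
    partition_ownership_timeout: Optional[datetime] = None
    source_partition_status: SourcePartitionStatus = SourcePartitionStatus.UNASSIGNED
    re_open_at: Optional[datetime] = None           # only used when CLOSED
    closed_count: int = 0

    def acquire(self, owner: str, now: datetime) -> None:
        """Mark the item as owned and start the ownership timeout."""
        self.partition_owner = owner
        self.source_partition_status = SourcePartitionStatus.ASSIGNED
        self.partition_ownership_timeout = now + OWNERSHIP_TIMEOUT

    def save_state(self, progress_json: str, now: datetime) -> None:
        """Store progress and renew ownership for another 10 minutes."""
        self.partition_progress_state = progress_json
        self.partition_ownership_timeout = now + OWNERSHIP_TIMEOUT

    def owner_expired(self, now: datetime) -> bool:
        """True when an ASSIGNED item has outlived its ownership timeout."""
        return (
            self.source_partition_status is SourcePartitionStatus.ASSIGNED
            and self.partition_ownership_timeout is not None
            and now >= self.partition_ownership_timeout
        )
```

In this sketch, a node that keeps calling save_state never lets owner_expired become true, which mirrors how saving state renews ownership.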

Acquiring partitions

Partitions are acquired in the order that they are returned in the List<PartitionIdentifier> provided by the source. When a node attempts to acquire a partition, Data Prepper performs the following steps:

  1. Data Prepper queries the ASSIGNED partitions to check whether any of them have expired partition owners. This gives priority to partitions whose nodes crashed in the middle of processing, so that a partition state that may be time sensitive can still be used.
  2. After querying ASSIGNED partitions, Data Prepper queries the CLOSED partitions to determine whether any of the partitions’ reOpenAt timestamps have been reached.
  3. If there are no ASSIGNED or CLOSED partitions available, then Data Prepper queries the UNASSIGNED partitions until one of these partitions is ASSIGNED.

If this flow occurs and no partition is acquired by the node, then the partition supplier function provided in the getNextPartition method of SourceCoordinator will create new partitions. After the supplier function completes, Data Prepper again queries the partitions for ASSIGNED, CLOSED, and UNASSIGNED.
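
The acquisition order and the supplier fallback described above can be sketched in Python as follows. This is a simplified, single-process model under stated assumptions: the function names try_acquire and get_next_partition, the dictionary keys, and the in-memory list standing in for the distributed store are all hypothetical, and a real store would need conditional updates so that two nodes cannot acquire the same item.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional

OWNERSHIP_TIMEOUT = timedelta(minutes=10)


def try_acquire(items: List[Dict], node: str, now: datetime) -> Optional[Dict]:
    """Return the first acquirable item in priority order, or None."""
    def first(predicate):
        return next((item for item in items if predicate(item)), None)

    # 1. ASSIGNED partitions whose owner has timed out.
    candidate = first(lambda i: i["status"] == "ASSIGNED" and i["timeout"] <= now)
    # 2. CLOSED partitions whose reOpenAt time has been reached.
    if candidate is None:
        candidate = first(lambda i: i["status"] == "CLOSED" and i["re_open_at"] <= now)
    # 3. UNASSIGNED partitions.
    if candidate is None:
        candidate = first(lambda i: i["status"] == "UNASSIGNED")

    if candidate is not None:
        candidate.update(status="ASSIGNED", owner=node,
                         timeout=now + OWNERSHIP_TIMEOUT)
    return candidate


def get_next_partition(items: List[Dict], node: str, now: datetime,
                       supplier: Callable[[], List[str]]) -> Optional[Dict]:
    """Try to acquire; if nothing is available, create partitions and retry."""
    acquired = try_acquire(items, node, now)
    if acquired is None:
        existing = {item["key"] for item in items}
        for key in supplier():
            if key not in existing:  # assumption: existing keys are not recreated
                items.append({"key": key, "status": "UNASSIGNED", "owner": None,
                              "timeout": None, "re_open_at": None})
                existing.add(key)
        acquired = try_acquire(items, node, now)
    return acquired
```

Note that the supplier runs only when the first pass finds nothing, so expired ASSIGNED partitions and reopened CLOSED partitions are always picked up before new partitions are created.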

Global state

Any function that is passed to the getNextPartition method creates new partitions and is given a global state of type Map<String, Object>. This state is shared between all of the nodes in the cluster, and the function is run by only a single node at a time, as determined by the source.

Configuration

The following table provides optional configuration values for source_coordination.

| Value | Type | Description |
| --- | --- | --- |
| partition_prefix | String | A prefix to the sourceIdentifier used to differentiate between Data Prepper clusters that share the same distributed store. |
| store | Object | The object that comprises the configuration for the store to be used, where the key is the name of the store, such as in_memory or dynamodb, and the value is any configuration available on that store type. |

Supported stores

As of Data Prepper 2.4, only in_memory and dynamodb stores are supported:

  • The in_memory store is the default when no source_coordination settings are configured in the data-prepper-config.yaml file and should only be used for single-node configurations.
  • The dynamodb store is used for multi-node Data Prepper environments. The dynamodb store can be shared between one or more Data Prepper clusters that need to utilize source coordination.

DynamoDB store

Data Prepper will attempt to create the dynamodb table on startup unless the skip_table_creation flag is configured to true. Optionally, you can configure the time-to-live (ttl) on the table, which results in the store cleaning up items over time. Some sources rely on source coordination for the deduplication of data, so be sure to configure a large enough ttl for the pipeline duration.

If ttl is not configured on the table, any items no longer needed in the table must be cleaned manually.

The following shows the full set of permissions needed for Data Prepper to create the table, enable ttl, and interact with the table:

    {
      "Sid": "ReadWriteSourceCoordinationDynamoStore",
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeTimeToLive",
        "dynamodb:UpdateTimeToLive",
        "dynamodb:DescribeTable",
        "dynamodb:CreateTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Query"
      ],
      "Resource": [
        "arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}",
        "arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}/index/source-status"
      ]
    }
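
The statement above still needs its placeholders filled in and must sit inside a complete policy document before it can be attached to a role. The following Python sketch shows one way to render it. The function name build_policy and the sample region, account ID, and table name are hypothetical; the actions and resource patterns are copied from the statement above, and "2012-10-17" is the standard IAM policy language version.

```python
import json
from typing import Dict

ACTIONS = [
    "dynamodb:DescribeTimeToLive",
    "dynamodb:UpdateTimeToLive",
    "dynamodb:DescribeTable",
    "dynamodb:CreateTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:Query",
]


def build_policy(region: str, account_id: str, table_name: str) -> Dict:
    """Fill in the placeholders and wrap the statement in a policy document."""
    table_arn = f"arn:aws:dynamodb:{region}:{account_id}:table/{table_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadWriteSourceCoordinationDynamoStore",
                "Effect": "Allow",
                "Action": ACTIONS,
                # The table itself plus its source-status index.
                "Resource": [table_arn, f"{table_arn}/index/source-status"],
            }
        ],
    }


if __name__ == "__main__":
    # Sample values only; replace them with your own.
    policy = build_policy("us-east-1", "123456789012",
                          "DataPrepperSourceCoordinationStore")
    print(json.dumps(policy, indent=2))
```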

The following table describes the dynamodb store configuration options.

| Value | Required | Type | Description |
| --- | --- | --- | --- |
| table_name | Yes | String | The name of the table to be used for source coordination. |
| region | Yes | String | The region of the DynamoDB table. |
| sts_role_arn | No | String | The sts role that contains the table permissions. Uses default credentials when not provided. |
| sts_external_id | No | String | The external ID used in the API call to assume the sts_role_arn. |
| skip_table_creation | No | Boolean | If set to true when using an existing store, the attempt to create the store is skipped. Default is false. |
| provisioned_write_capacity_units | No | Integer | The number of write capacity units to configure on the table. Default is 10. |
| provisioned_read_capacity_units | No | Integer | The number of read capacity units to configure on the table. Default is 10. |
| ttl | No | Duration | The duration of the TTL for the items in the table. The TTL is extended by this duration when an update is made to the item. Defaults to no TTL being used on the table. |

The following example shows a dynamodb store:

    source_coordination:
      store:
        dynamodb:
          table_name: "DataPrepperSourceCoordinationStore"
          region: "us-east-1"
          sts_role_arn: "arn:aws:iam::##########:role/SourceCoordinationTableRole"
          ttl: "P7D"
          skip_table_creation: true
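
To show what a ttl value such as "P7D" means for an individual item, the following Python sketch converts a duration into an expiration time that moves forward on every update. It is a sketch under stated assumptions: the function names parse_duration and expires_at are hypothetical, the parser handles only a day/hour/minute/second subset of ISO 8601 durations, and the result is expressed in epoch seconds because that is the unit DynamoDB TTL attributes use.

```python
import re
from datetime import datetime, timedelta, timezone

# Subset of ISO 8601 durations: days, hours, minutes, seconds (e.g. P7D, PT12H).
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration(text: str) -> timedelta:
    """Parse a simple ISO 8601 duration string into a timedelta."""
    match = _DURATION.match(text)
    parts = {}
    if match:
        parts = {name: int(value)
                 for name, value in match.groupdict().items()
                 if value is not None}
    if not parts:
        raise ValueError(f"unsupported duration: {text!r}")
    return timedelta(**parts)


def expires_at(updated_at: datetime, ttl: str) -> int:
    """Expiration in epoch seconds, measured from the item's latest update."""
    return int((updated_at + parse_duration(ttl)).timestamp())


if __name__ == "__main__":
    update_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
    print(expires_at(update_time, "P7D"))
```

Because the expiration is recomputed from the latest update time, an item that is still being updated does not expire, while an item that is no longer touched ages out after the configured duration.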

In-memory store (default)

The following example shows an in_memory store, which is best used with a single-node cluster:

    source_coordination:
      store:
        in_memory:

Metrics

Source coordination metrics are interpreted differently depending on which source is configured. The format of a source coordination metric is <sub-pipeline-name>_source_coordinator_<metric-name>. You can use the sub-pipeline name to identify the source for these metrics because each sub-pipeline is unique to each source.
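
The naming format can be applied mechanically, as in the following small Python sketch. The helper names metric_name and split_metric_name and the sample sub-pipeline name are hypothetical; only the <sub-pipeline-name>_source_coordinator_<metric-name> format comes from the text above.

```python
from typing import Tuple

_INFIX = "_source_coordinator_"


def metric_name(sub_pipeline_name: str, metric: str) -> str:
    """Build a source coordination metric name from its two parts."""
    return f"{sub_pipeline_name}{_INFIX}{metric}"


def split_metric_name(full_name: str) -> Tuple[str, str]:
    """Recover (sub-pipeline name, metric name) from a full metric name."""
    sub_pipeline_name, separator, metric = full_name.partition(_INFIX)
    if not separator:
        raise ValueError(f"not a source coordination metric: {full_name!r}")
    return sub_pipeline_name, metric
```

For example, metric_name("s3-scan-pipeline", "partitionsCompleted") yields s3-scan-pipeline_source_coordinator_partitionsCompleted, and split_metric_name reverses it.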

Progress metrics

The following are metrics related to partition progress:

  • partitionsCreatedCount: The number of partition items that have been created. For an S3 scan, this is the number of objects that have had partitions created for them.
  • partitionsCompleted: The number of partitions that have been fully processed and marked as COMPLETED. For an S3 scan, this is the number of objects that have been processed.
  • noPartitionsAcquired: The number of times that a node has attempted to acquire a partition on which to perform work but has found no available partitions in the store. A steadily increasing value can indicate that no more data is coming into the source.
  • partitionsAcquired: The number of partitions that have been acquired by nodes on which to perform work. In non-error scenarios, this should be equal to the number of partitions created.
  • partitionsClosed: The number of partitions that have been marked as CLOSED. This is only applicable to sources that use the CLOSED functionality.

Error metrics

The following are metrics related to partition errors:

  • partitionNotFoundErrors: Indicates that a partition item that is actively owned by a node does not have a corresponding store item. This should only occur if an item in the table has been manually deleted.
  • partitionNotOwnedErrors: Indicates that a node that owns a partition has lost ownership due to the partition ownership timeout expiring. Unless the source is able to checkpoint the partition with saveState, this error results in duplicate item processing.
  • partitionUpdateErrors: The number of errors that were received when an update to the store for this partition item failed. The metric name is prefixed with saveState, close, or complete to indicate which update action is failing.
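
The metric descriptions above suggest a few simple checks, sketched here in Python. This is only an illustration: the function name summarize and the wording of the findings are hypothetical, the counter values are assumed to have been read from your metrics system already, and the interpretation of an acquired count that exceeds the created count is an inference from the descriptions rather than documented behavior.

```python
from typing import Dict, List


def summarize(counters: Dict[str, int]) -> List[str]:
    """Turn raw source coordination counters into human-readable findings."""
    created = counters.get("partitionsCreatedCount", 0)
    completed = counters.get("partitionsCompleted", 0)
    acquired = counters.get("partitionsAcquired", 0)

    findings = [f"{created - completed} partition(s) not yet completed"]

    # In non-error scenarios, acquired should equal created.
    if acquired > created:
        findings.append("some partitions may have been acquired more than once")
    if counters.get("partitionNotOwnedErrors", 0) > 0:
        findings.append("ownership was lost; duplicate processing is possible")
    if counters.get("partitionNotFoundErrors", 0) > 0:
        findings.append("store items may have been deleted manually")
    return findings
```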