Amazon DynamoDB Sink

The DynamoDB sink writes to Amazon DynamoDB using the AWS v2 SDK for Java. Follow the instructions in the Amazon DynamoDB Developer Guide to set up a table.

To use the connector, add the following Maven dependency to your project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-dynamodb</artifactId>
    <version>4.2.0-1.18</version>
</dependency>
```


Java

```java
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new CustomElementConverter();

DynamoDbSink<InputType> dynamoDbSink =
    DynamoDbSink.<InputType>builder()
        .setDynamoDbProperties(sinkProperties)              // Required
        .setTableName("my-dynamodb-table")                  // Required
        .setElementConverter(elementConverter)              // Required
        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
        .setFailOnError(false)                              // Optional
        .setMaxBatchSize(25)                                // Optional
        .setMaxInFlightRequests(50)                         // Optional
        .setMaxBufferedRequests(10_000)                     // Optional
        .setMaxTimeInBufferMS(5000)                         // Optional
        .build();

flinkStream.sinkTo(dynamoDbSink);
```

Scala

```scala
val sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

val elementConverter = new CustomElementConverter()

val dynamoDbSink =
  DynamoDbSink.builder[InputType]()
    .setDynamoDbProperties(sinkProperties)              // Required
    .setTableName("my-dynamodb-table")                  // Required
    .setElementConverter(elementConverter)              // Required
    .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
    .setFailOnError(false)                              // Optional
    .setMaxBatchSize(25)                                // Optional
    .setMaxInFlightRequests(50)                         // Optional
    .setMaxBufferedRequests(10_000)                     // Optional
    .setMaxTimeInBufferMS(5000)                         // Optional
    .build()

flinkStream.sinkTo(dynamoDbSink)
```

Configurations

Flink’s DynamoDB sink is created using the static builder DynamoDbSink.<InputType>builder().

  1. setDynamoDbProperties(Properties sinkProperties)
    • Required.
    • Supplies credentials, region and other parameters to the DynamoDB client.
  2. setTableName(String tableName)
    • Required.
    • Name of the table to sink to.
  3. setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> elementConverter)
    • Required.
    • Converts generic records of type InputType to DynamoDbWriteRequest.
  4. setOverwriteByPartitionKeys(List<String> partitionKeys)
    • Optional. Default: [].
    • Deduplicates write requests within each batch pushed to DynamoDB, keeping only the latest request for each value of the specified key attributes.
  5. setFailOnError(boolean failOnError)
    • Optional. Default: false.
    • Whether failed requests to write records are treated as fatal exceptions in the sink.
  6. setMaxBatchSize(int maxBatchSize)
    • Optional. Default: 25.
    • Maximum size of a batch to write.
  7. setMaxInFlightRequests(int maxInFlightRequests)
    • Optional. Default: 50.
    • The maximum number of in-flight requests allowed before the sink applies backpressure.
  8. setMaxBufferedRequests(int maxBufferedRequests)
    • Optional. Default: 10_000.
    • The maximum number of records that may be buffered in the sink before backpressure is applied.
  9. setMaxBatchSizeInBytes(int maxBatchSizeInBytes)
    • N/A.
    • This configuration is not supported, see FLINK-29854.
  10. setMaxTimeInBufferMS(int maxTimeInBufferMS)
    • Optional. Default: 5000.
    • The maximum time a record may stay in the sink before being flushed.
  11. setMaxRecordSizeInBytes(int maxRecordSizeInBytes)
    • N/A.
    • This configuration is not supported, see FLINK-29854.
  12. build()
    • Constructs and returns the DynamoDB sink.
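To make the effect of setOverwriteByPartitionKeys concrete, the following is a plain-Java sketch (not the connector's internal code) of the kind of last-write-wins deduplication it enables. DynamoDB's BatchWriteItem API rejects a batch that contains multiple operations on the same key, so duplicates within a batch must be collapsed before the batch is sent.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchDeduplication {

    /**
     * Keeps only the last item per key attribute, preserving first-seen order.
     * Illustrates the intent of setOverwriteByPartitionKeys: within one batch,
     * a later write for the same key replaces the earlier one.
     */
    static List<Map<String, String>> dedupeByKey(List<Map<String, String>> batch, String keyAttr) {
        Map<String, Map<String, String>> latest = new LinkedHashMap<>();
        for (Map<String, String> item : batch) {
            latest.put(item.get(keyAttr), item); // later item overwrites earlier
        }
        return List.copyOf(latest.values());
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = List.of(
            Map.of("key", "a", "val", "1"),
            Map.of("key", "b", "val", "2"),
            Map.of("key", "a", "val", "3"));
        List<Map<String, String>> deduped = dedupeByKey(batch, "key");
        System.out.println(deduped.size());            // 2
        System.out.println(deduped.get(0).get("val")); // 3
    }
}
```

With an empty list (the default), no deduplication is performed and a batch containing duplicate keys would fail at write time.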

Element Converter

An element converter converts a record in the DataStream to a DynamoDbWriteRequest, which the sink writes to the destination DynamoDB table. You can supply a custom element converter, or use the provided DynamoDbBeanElementConverter when working with @DynamoDbBean objects. For more information on supported annotations see here.
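As an illustration, a minimal custom element converter might look like the following. The Order record and attribute names are hypothetical, and the sketch assumes the flink-connector-dynamodb and AWS SDK v2 dependencies are on the classpath:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/** Hypothetical input type; replace with your own record class. */
record Order(String orderId, double amount) {}

public class OrderElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {
    @Override
    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
        // Map each field of the record to a DynamoDB AttributeValue.
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("order_id", AttributeValue.builder().s(order.orderId()).build());
        item.put("amount", AttributeValue.builder().n(String.valueOf(order.amount())).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}
```

An instance of this converter would then be passed to the sink builder via setElementConverter.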

A sample application using a custom ElementConverter can be found here. A sample application using the DynamoDbBeanElementConverter can be found here.

Using Custom DynamoDB Endpoints

It is sometimes desirable to have Flink operate as a consumer or producer against a DynamoDB VPC endpoint or a non-AWS DynamoDB endpoint such as LocalStack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the Flink configuration must be overridden via a configuration property.

To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION properties. The region will be used to sign the endpoint URL.

Java

```java
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
```

Scala

```scala
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
```