Amazon DynamoDB Sink
The DynamoDB sink writes to Amazon DynamoDB using the AWS v2 SDK for Java. Follow the instructions from the Amazon DynamoDB Developer Guide to set up a table.
To use the connector, add the following Maven dependency to your project:
There is no connector (yet) available for Flink version 1.19.
Java
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new CustomElementConverter();
DynamoDbSink<InputType> dynamoDbSink =
    DynamoDbSink.<InputType>builder()
        .setDynamoDbProperties(sinkProperties) // Required
        .setTableName("my-dynamodb-table") // Required
        .setElementConverter(elementConverter) // Required
        .setOverwriteByPartitionKeys(singletonList("key")) // Optional
        .setFailOnError(false) // Optional
        .setMaxBatchSize(25) // Optional
        .setMaxInFlightRequests(50) // Optional
        .setMaxBufferedRequests(10_000) // Optional
        .setMaxTimeInBufferMS(5000) // Optional
        .build();
flinkStream.sinkTo(dynamoDbSink);
Scala
val sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
val elementConverter = new CustomElementConverter()
val dynamoDbSink =
    DynamoDbSink.builder[InputType]()
        .setDynamoDbProperties(sinkProperties) // Required
        .setTableName("my-dynamodb-table") // Required
        .setElementConverter(elementConverter) // Required
        .setOverwriteByPartitionKeys(singletonList("key")) // Optional
        .setFailOnError(false) // Optional
        .setMaxBatchSize(25) // Optional
        .setMaxInFlightRequests(50) // Optional
        .setMaxBufferedRequests(10000) // Optional
        .setMaxTimeInBufferMS(5000) // Optional
        .build()
flinkStream.sinkTo(dynamoDbSink)
Configurations
Flink’s DynamoDB sink is created by using the static builder DynamoDbSink.<InputType>builder().
- setDynamoDbProperties(Properties sinkProperties)
- Required.
- Supplies credentials, region and other parameters to the DynamoDB client.
- setTableName(String tableName)
- Required.
- Name of the table to sink to.
- setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> elementConverter)
- Required.
- Converts generic records of type InputType to DynamoDbWriteRequest.
- setOverwriteByPartitionKeys(List<String> partitionKeys)
- Optional. Default: [].
- Used to deduplicate write requests within each batch pushed to DynamoDB.
- setFailOnError(boolean failOnError)
- Optional. Default: false.
- Whether failed requests to write records are treated as fatal exceptions in the sink.
- setMaxBatchSize(int maxBatchSize)
- Optional. Default: 25.
- Maximum size of a batch to write.
- setMaxInFlightRequests(int maxInFlightRequests)
- Optional. Default: 50.
- The maximum number of in-flight requests allowed before the sink applies backpressure.
- setMaxBufferedRequests(int maxBufferedRequests)
- Optional. Default: 10_000.
- The maximum number of records that may be buffered in the sink before backpressure is applied.
- setMaxBatchSizeInBytes(int maxBatchSizeInBytes)
- N/A.
- This configuration is not supported, see FLINK-29854.
- setMaxTimeInBufferMS(int maxTimeInBufferMS)
- Optional. Default: 5000.
- The maximum time a record may stay in the sink before being flushed.
- setMaxRecordSizeInBytes(int maxRecordSizeInBytes)
- N/A.
- This configuration is not supported, see FLINK-29854.
- build()
- Constructs and returns the DynamoDB sink.
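To make the effect of setOverwriteByPartitionKeys more concrete, the following is a minimal sketch of deduplicating a batch by partition key. It is an illustration in plain Java, not the connector's implementation: the class BatchDeduplicator, the use of Map<String, String> to stand in for a DynamoDB item, and the rule that the last write for a key wins are all assumptions made for this example, as is the choice to leave the batch untouched when no keys are configured.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a hypothetical helper, not part of the DynamoDB connector. */
class BatchDeduplicator {

    /**
     * Keeps one item per distinct combination of partition-key values.
     * Assumption: when two items in a batch share a key, the later one wins.
     */
    static List<Map<String, String>> deduplicate(
            List<Map<String, String>> batch, List<String> partitionKeys) {
        // Assumption: with no keys configured (the default []), nothing is deduplicated.
        if (partitionKeys.isEmpty()) {
            return new ArrayList<>(batch);
        }
        // LinkedHashMap keeps the position of the first occurrence of each key,
        // while put() replaces the stored item with the most recent one.
        Map<List<String>, Map<String, String>> byKey = new LinkedHashMap<>();
        for (Map<String, String> item : batch) {
            List<String> key = new ArrayList<>();
            for (String name : partitionKeys) {
                key.add(item.get(name));
            }
            byKey.put(key, item);
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = List.of(
                Map.of("key", "a", "value", "1"),
                Map.of("key", "b", "value", "2"),
                Map.of("key", "a", "value", "3"));
        // Two distinct values of "key" remain, so this prints 2.
        System.out.println(deduplicate(batch, List.of("key")).size());
    }
}
```

Deduplication of this kind matters because a single DynamoDB batch write cannot contain two requests for the same primary key, so the configured keys should normally match the key schema of the target table.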
Element Converter
An element converter is used to convert from a record in the DataStream to a DynamoDbWriteRequest which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or use the provided DynamoDbBeanElementConverter when you are working with @DynamoDbBean objects. For more information on supported annotations see here.
A sample application using a custom ElementConverter can be found here. A sample application using the DynamoDbBeanElementConverter can be found here.
Using Custom DynamoDB Endpoints
It is sometimes desirable to have Flink operate as a consumer or producer against a DynamoDB VPC endpoint or a non-AWS DynamoDB endpoint such as LocalStack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the Flink configuration must be overridden via a configuration property.
To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION properties. The region is still required because it is used to sign requests sent to the endpoint.
Java
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
Scala
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
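Since the override is mostly useful for testing, it can help to apply it only when an endpoint is actually supplied, so the same code runs against AWS in production. The sketch below shows one way to do that in plain Java. The class SinkPropertiesFactory and the environment variable DYNAMODB_ENDPOINT are hypothetical, and the string keys "aws.region" and "aws.endpoint" are assumed values of AWSConfigConstants.AWS_REGION and AWSConfigConstants.AWS_ENDPOINT; in a real job, reference the constants directly.

```java
import java.util.Properties;

/** Illustrative only: a hypothetical helper, not part of the connector. */
class SinkPropertiesFactory {

    // Assumed string values of the AWSConfigConstants keys; prefer the constants in real code.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /** Always sets the region; sets the endpoint only when an override is given. */
    static Properties create(String region, String endpointOverride) {
        Properties props = new Properties();
        props.put(AWS_REGION, region);
        if (endpointOverride != null && !endpointOverride.isEmpty()) {
            props.put(AWS_ENDPOINT, endpointOverride);
        }
        return props;
    }

    public static void main(String[] args) {
        // DYNAMODB_ENDPOINT is a hypothetical variable, e.g. http://localhost:4566 in tests.
        Properties props = create("eu-west-1", System.getenv("DYNAMODB_ENDPOINT"));
        System.out.println(props);
    }
}
```

When the variable is unset, the resulting properties contain only the region and the endpoint is inferred as usual.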