Amazon Kinesis Data Firehose Sink

The Firehose sink writes to Amazon Kinesis Data Firehose.

Follow the instructions from the Amazon Kinesis Data Firehose Developer Guide to set up a Kinesis Data Firehose delivery stream.

To use the connector, add the following Maven dependency to your project:

There is no connector (yet) available for Flink version 1.20.
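
For released connector versions, the dependency takes the following general shape; the groupId is the standard one for Flink connectors and the version below is a placeholder, so check the downloads page for the connector release matching your Flink version:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
        <!-- Placeholder: substitute the connector version for your Flink release -->
        <version>${firehose.connector.version}</version>
    </dependency>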

In order to use this connector in PyFlink jobs, the following dependencies are required:

Version                                  PyFlink JAR
flink-connector-aws-kinesis-firehose     There is no SQL jar (yet) available for Flink version 1.20.

See Python dependency management for more details on how to use JARs in PyFlink.
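
As an illustrative sketch (the JAR path and filename below are placeholders), a downloaded connector JAR can be registered directly on the execution environment:

    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Placeholder path: point this at the connector JAR you downloaded
    env.add_jars("file:///path/to/flink-sql-connector-aws-kinesis-firehose.jar")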

The KinesisFirehoseSink uses the AWS v2 SDK for Java to write data from a Flink stream into a Firehose delivery stream.

Java

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;

Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

KinesisFirehoseSink<String> kdfSink =
    KinesisFirehoseSink.<String>builder()
        .setFirehoseClientProperties(sinkProperties)      // Required
        .setSerializationSchema(new SimpleStringSchema()) // Required
        .setDeliveryStreamName("your-stream-name")        // Required
        .setFailOnError(false)                            // Optional
        .setMaxBatchSize(500)                             // Optional
        .setMaxInFlightRequests(50)                       // Optional
        .setMaxBufferedRequests(10_000)                   // Optional
        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
        .setMaxTimeInBufferMS(5000)                       // Optional
        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
        .build();

flinkStream.sinkTo(kdfSink);

Scala

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.aws.config.AWSConfigConstants
import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink

val sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

val kdfSink =
    KinesisFirehoseSink.builder[String]()
        .setFirehoseClientProperties(sinkProperties)      // Required
        .setSerializationSchema(new SimpleStringSchema()) // Required
        .setDeliveryStreamName("your-stream-name")        // Required
        .setFailOnError(false)                            // Optional
        .setMaxBatchSize(500)                             // Optional
        .setMaxInFlightRequests(50)                       // Optional
        .setMaxBufferedRequests(10000)                    // Optional
        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
        .setMaxTimeInBufferMS(5000)                       // Optional
        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
        .build()

flinkStream.sinkTo(kdfSink)

Python

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import KinesisFirehoseSink

sink_properties = {
    # Required
    'aws.region': 'eu-west-1',
    # Optional, provide via alternative routes e.g. environment variables
    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
}

kdf_sink = (
    KinesisFirehoseSink.builder()
    .set_firehose_client_properties(sink_properties)  # Required
    .set_serialization_schema(SimpleStringSchema())   # Required
    .set_delivery_stream_name('your-stream-name')     # Required
    .set_fail_on_error(False)                         # Optional
    .set_max_batch_size(500)                          # Optional
    .set_max_in_flight_requests(50)                   # Optional
    .set_max_buffered_requests(10000)                 # Optional
    .set_max_batch_size_in_bytes(4 * 1024 * 1024)     # Optional
    .set_max_time_in_buffer_ms(5000)                  # Optional
    .set_max_record_size_in_bytes(1000 * 1024)        # Optional
    .build()
)

Configurations

Flink’s Firehose sink is created using the static builder KinesisFirehoseSink.<InputType>builder(); a minimal example using only the required settings follows the list below.

  1. setFirehoseClientProperties(Properties sinkProperties)
    • Required.
    • Supplies credentials, region and other parameters to the Firehose client.
  2. setSerializationSchema(SerializationSchema serializationSchema)
    • Required.
    • Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
  3. setDeliveryStreamName(String deliveryStreamName)
    • Required.
    • Name of the delivery stream to sink to.
  4. setFailOnError(boolean failOnError)
    • Optional. Default: false.
    • Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
  5. setMaxBatchSize(int maxBatchSize)
    • Optional. Default: 500.
    • Maximum size of a batch to write to Firehose.
  6. setMaxInFlightRequests(int maxInFlightRequests)
    • Optional. Default: 50.
    • The maximum number of in-flight requests allowed before the sink applies backpressure.
  7. setMaxBufferedRequests(int maxBufferedRequests)
    • Optional. Default: 10_000.
    • The maximum number of records that may be buffered in the sink before backpressure is applied.
  8. setMaxBatchSizeInBytes(int maxBatchSizeInBytes)
    • Optional. Default: 4 * 1024 * 1024.
    • The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
  9. setMaxTimeInBufferMS(int maxTimeInBufferMS)
    • Optional. Default: 5000.
    • The maximum time a record may stay in the sink before being flushed.
  10. setMaxRecordSizeInBytes(int maxRecordSizeInBytes)
    • Optional. Default: 1000 * 1024.
    • The maximum record size that the sink will accept; records larger than this are automatically rejected.
  11. build()
    • Constructs and returns the Firehose sink.
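
Only the first three setters are required. As a minimal sketch using just those (reusing the sinkProperties from the examples above):

    KinesisFirehoseSink<String> minimalSink =
        KinesisFirehoseSink.<String>builder()
            .setFirehoseClientProperties(sinkProperties)
            .setSerializationSchema(new SimpleStringSchema())
            .setDeliveryStreamName("your-stream-name")
            .build();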

Using Custom Firehose Endpoints

It is sometimes desirable to have Flink operate as a consumer or producer against a Firehose VPC endpoint or a non-AWS Firehose endpoint such as LocalStack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.

To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION properties. The region will be used to sign the endpoint URL.

Java

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");

Scala

val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")

Python

producer_config = {
    'aws.region': 'us-east-1',
    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
    'aws.endpoint': 'http://localhost:4566'
}
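
These properties plug into the same builder shown earlier. A minimal sketch for testing against LocalStack (the delivery stream name is hypothetical and must already exist in LocalStack):

    KinesisFirehoseSink<String> localstackSink =
        KinesisFirehoseSink.<String>builder()
            .setFirehoseClientProperties(producerConfig) // endpoint override applies here
            .setSerializationSchema(new SimpleStringSchema())
            .setDeliveryStreamName("test-stream") // hypothetical stream created in LocalStack
            .build();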