Amazon Kinesis Data Firehose Sink

The Firehose sink writes to Amazon Kinesis Data Firehose.

Follow the instructions from the Amazon Kinesis Data Firehose Developer Guide to set up a Kinesis Data Firehose delivery stream.

To use the connector, add the following Maven dependency to your project:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
        <version>1.19.0</version>
    </dependency>



To use the connector in a PyFlink job, the following dependency is required:

    Version                                  PyFlink JAR
    flink-connector-aws-kinesis-firehose     There is no SQL jar (yet) available for Flink version 1.19.

See Python dependency management for how to add JAR dependencies in PyFlink.
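As an illustration only, a connector JAR downloaded for your Flink version can be made available to a PyFlink job via StreamExecutionEnvironment.add_jars(); the JAR path and file name below are hypothetical placeholders.

    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Hypothetical local path; point this at the connector JAR for your Flink version.
    env.add_jars("file:///path/to/flink-sql-connector-aws-kinesis-firehose.jar")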

The KinesisFirehoseSink uses the AWS v2 SDK for Java to write data from a Flink stream into a Firehose delivery stream.

Java

    Properties sinkProperties = new Properties();
    // Required
    sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
    // Optional, provide via alternative routes e.g. environment variables
    sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
    sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

    KinesisFirehoseSink<String> kdfSink =
        KinesisFirehoseSink.<String>builder()
            .setFirehoseClientProperties(sinkProperties)      // Required
            .setSerializationSchema(new SimpleStringSchema()) // Required
            .setDeliveryStreamName("your-stream-name")        // Required
            .setFailOnError(false)                            // Optional
            .setMaxBatchSize(500)                             // Optional
            .setMaxInFlightRequests(50)                       // Optional
            .setMaxBufferedRequests(10_000)                   // Optional
            .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
            .setMaxTimeInBufferMS(5000)                       // Optional
            .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
            .build();

    flinkStream.sinkTo(kdfSink);

Scala

    val sinkProperties = new Properties()
    // Required
    sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
    // Optional, provide via alternative routes e.g. environment variables
    sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
    sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

    val kdfSink =
        KinesisFirehoseSink.builder[String]()
            .setFirehoseClientProperties(sinkProperties)      // Required
            .setSerializationSchema(new SimpleStringSchema()) // Required
            .setDeliveryStreamName("your-stream-name")        // Required
            .setFailOnError(false)                            // Optional
            .setMaxBatchSize(500)                             // Optional
            .setMaxInFlightRequests(50)                       // Optional
            .setMaxBufferedRequests(10000)                    // Optional
            .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
            .setMaxTimeInBufferMS(5000)                       // Optional
            .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
            .build()

    flinkStream.sinkTo(kdfSink)

Python

    sink_properties = {
        # Required
        'aws.region': 'eu-west-1',
        # Optional, provide via alternative routes e.g. environment variables
        'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
        'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
    }

    kdf_sink = (KinesisFirehoseSink.builder()
        .set_firehose_client_properties(sink_properties)   # Required
        .set_serialization_schema(SimpleStringSchema())    # Required
        .set_delivery_stream_name('your-stream-name')      # Required
        .set_fail_on_error(False)                          # Optional
        .set_max_batch_size(500)                           # Optional
        .set_max_in_flight_requests(50)                    # Optional
        .set_max_buffered_requests(10000)                  # Optional
        .set_max_batch_size_in_bytes(4 * 1024 * 1024)      # Optional
        .set_max_time_in_buffer_ms(5000)                   # Optional
        .set_max_record_size_in_bytes(1000 * 1024)         # Optional
        .build())
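The Python snippet above only builds the sink. As a minimal sketch of how it could be attached to a stream and executed (the in-memory source and the job name are illustrative assumptions, and kdf_sink is the sink built in the snippet above):

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    # Hypothetical in-memory source, used only to have something to write.
    ds = env.from_collection(['record-1', 'record-2'], type_info=Types.STRING())

    # Attach the sink built above and run the job.
    ds.sink_to(kdf_sink)
    env.execute('firehose-sink-example')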

Configurations

Flink’s Firehose sink is created using the static builder KinesisFirehoseSink.<InputType>builder(). The available builder methods are listed below; a short tuning sketch follows the list.

  1. setFirehoseClientProperties(Properties sinkProperties)
    • Required.
    • Supplies credentials, region and other parameters to the Firehose client.
  2. setSerializationSchema(SerializationSchema serializationSchema)
    • Required.
    • Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
  3. setDeliveryStreamName(String deliveryStreamName)
    • Required.
    • Name of the delivery stream to sink to.
  4. setFailOnError(boolean failOnError)
    • Optional. Default: false.
    • Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
  5. setMaxBatchSize(int maxBatchSize)
    • Optional. Default: 500.
    • Maximum size of a batch to write to Firehose.
  6. setMaxInFlightRequests(int maxInFlightRequests)
    • Optional. Default: 50.
    • The maximum number of in-flight requests allowed before the sink applies backpressure.
  7. setMaxBufferedRequests(int maxBufferedRequests)
    • Optional. Default: 10_000.
    • The maximum number of records that may be buffered in the sink before backpressure is applied.
  8. setMaxBatchSizeInBytes(int maxBatchSizeInBytes)
    • Optional. Default: 4 * 1024 * 1024.
    • The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
  9. setMaxTimeInBufferMS(int maxTimeInBufferMS)
    • Optional. Default: 5000.
    • The maximum time a record may stay in the sink before being flushed.
  10. setMaxRecordSizeInBytes(int maxRecordSizeInBytes)
    • Optional. Default: 1000 * 1024.
    • The maximum record size that the sink accepts; larger records are rejected automatically.
  11. build()
    • Constructs and returns the Firehose sink.
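To illustrate how the optional settings interact, here is a hedged Python sketch of a configuration tuned towards lower latency, reusing the sink_properties from the earlier example; the specific values and stream name are assumptions for illustration, not recommendations.

    # Illustration only: smaller batches and a shorter buffer time flush records
    # to Firehose sooner, at the cost of more, smaller batch requests.
    low_latency_sink = (KinesisFirehoseSink.builder()
        .set_firehose_client_properties(sink_properties)
        .set_serialization_schema(SimpleStringSchema())
        .set_delivery_stream_name('your-stream-name')
        .set_max_batch_size(100)          # assumed value: flush after at most 100 records
        .set_max_time_in_buffer_ms(1000)  # assumed value: flush at least once per second
        .build())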

Using Custom Firehose Endpoints

It is sometimes desirable to have Flink operate as a consumer or producer against a Firehose VPC endpoint or a non-AWS Firehose endpoint such as LocalStack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the Flink configuration must be overridden via a configuration property.

To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION properties. The region will be used to sign the endpoint URL.

Java

    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
    producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
    producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
    producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");

Scala

    val producerConfig = new Properties()
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
    producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
    producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")

Python

    producer_config = {
        'aws.region': 'us-east-1',
        'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
        'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
        'aws.endpoint': 'http://localhost:4566'
    }
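Whichever language is used, the overridden configuration is simply passed to the sink builder via setFirehoseClientProperties / set_firehose_client_properties, as in the earlier examples. A minimal Python sketch, where the delivery stream name is a placeholder:

    local_sink = (KinesisFirehoseSink.builder()
        .set_firehose_client_properties(producer_config)  # contains the endpoint override
        .set_serialization_schema(SimpleStringSchema())
        .set_delivery_stream_name('your-stream-name')     # placeholder name
        .build())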