亚马逊AWS Kinesis Streams连接器
Kinesis连接器提供对Amazon AWS Kinesis Streams的访问。
要使用连接器,请将以下Maven依赖项添加到项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
它flink-connector-kinesis_2.11
依赖于根据亚马逊软件许可(ASL)许可的代码。链接到flink-connector-kinesis将在您的应用程序中包含ASL许可代码。
flink-connector-kinesis_2.11
由于许可问题,该工件未作为Flink版本的一部分部署到Maven中心。因此,您需要自己从源构建连接器。
下载Flink源代码或从git存储库中检出。然后,使用以下Maven命令构建模块:
mvn clean install -Pinclude-kinesis -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests
注意对于Flink版本1.4.2及更低版本,AWS Kinesis Streams不再支持Kinesis连接器中默认使用的KPL客户端版本KPL 0.12.5(请参阅此处)。这意味着在构建Kinesis连接器时,您需要指定更高版本的KPL客户端(大于0.12.6)才能使Flink Kinesis Producer工作。您可以通过aws.kinesis-kpl.version
属性指定首选版本来执行此 算子操作,如下所示:
mvn clean install -Pinclude-kinesis -Daws.kinesis-kpl.version=0.12.6 -DskipTests
流连接器不是二进制分发的一部分。在此处了解如何与群集执行链接。
使用Amazon Kinesis Streams服务
按照Amazon Kinesis Streams开发人员指南中的说明设置Kinesis流。确保创建适当的IAM策略和用户以读取/写入Kinesis流。
Kinesis消费者
这FlinkKinesisConsumer
是一个完全一次并行的流数据源,可以在同一个AWS服务区域内订阅多个AWS Kinesis流,并且可以在作业运行时透明地处理流的重新分片。消费者的每个子任务负责从多个Kinesis分片中获取数据记录。每个子任务获取的分片数将随着Kinesis关闭和创建分片而改变。
在使用Kinesis流中的数据之前,请确保在AWS仪表板中创建状态为“ACTIVE”的所有流。
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
"kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val env = StreamExecutionEnvironment.getEnvironment
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, consumerConfig))
以上是使用消费者的简单示例。消费者的配置提供了一个java.util.Properties
实例,其配置键可以在其中找到ConsumerConfigConstants
。该示例演示了在AWS区域“us-east-1”中使用单个Kinesis流。的AWS凭据使用其中AWS访问KeysID和密钥访问Keys在配置直接供给的基本方法提供(其他选项设定ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER
到ENV_VAR
,SYS_PROP
,PROFILE
,ASSUME_ROLE
,和AUTO
)。此外,数据正在从Kinesis流中的最新位置消耗(另一个选项将设置ConsumerConfigConstants.STREAM_INITIAL_POSITION
为TRIM_HORIZON
,这使得消费者可以从最早的记录开始读取Kinesis流)。
可以在中找到消费者的其他可选配置键ConsumerConfigConstants
。
请注意,Flink Kinesis Consumer源的配置并行度可以完全独立于Kinesis流中的分片总数。当分片数量大于消费者的并行度时,则每个消费者子任务可以订阅多个分片; 否则,如果分片的数量小于消费者的并行性,那么一些消费者子任务将只是空闲并等待它被分配新的分片(即,当重新分配流以增加更高配置的Kinesis服务的分片数量时吞吐量)。
另请注意,当分片ID不连续时(由于Kinesis中的动态重新分片),分片到子任务的分配可能不是最佳的。对于分配中的偏差导致显着的不平衡消耗的情况,KinesisShardAssigner
可以在消费者上设置自定义实现。
配置起始位置
Flink Kinesis Consumer目前提供以下选项来配置从哪里开始读取Kinesis流,只需ConsumerConfigConstants.STREAM_INITIAL_POSITION
在提供的配置属性中设置为以下值之一(选项的命名与AWS Kinesis Streams服务使用的命名相同) :
LATEST
:从最新记录开始读取所有流的所有分片。TRIM_HORIZON
:从最早的记录开始读取所有流的所有分片(Kinesis可根据保存设置修剪数据)。AT_TIMESTAMP
:从指定的时间戳开始读取所有流的所有分片。还必须在配置属性中指定时间戳ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP
,方法是在以下日期模式之一中提供值:- 一个非负的double值,表示自Unix纪元以来经过的秒数(例如
1459799926.480
)。 - 用户定义的模式,它是
SimpleDateFormat
由提供的有效模式ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT
。如果ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT
未定义,则默认模式将是yyyy-MM-dd'T'HH:mm:ss.SSSXXX
(例如,时间戳值为2016-04-04
,模式yyyy-MM-dd
由用户给出,或者时间戳值2016-04-04T19:58:46.480-00:00
没有给定模式)。
- 一个非负的double值,表示自Unix纪元以来经过的秒数(例如
完全一次用户定义的状态更新语义的容错
启用Flink的检查点后,Flink Kinesis Consumer将消耗Kinesis流中的分片记录,并定期检查每个分片的进度。如果作业失败,Flink会将流式程序恢复到最新完整检查点的状态,并从检查点中存储的进度开始重新使用Kinesis分片中的记录。
因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。
要使用容错Kinesis使用者,需要在运行环境中启用拓扑检查点:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
另请注意,如果有足够的处理插槽可用于重新启动拓扑,则Flink只能重新启动拓扑。因此,如果拓扑由于丢失了TaskManager而失败,则之后仍然必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。
消费记录的事件时间
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
如果流拓扑选择将事件时间概念用于记录时间戳,则默认情况下将使用近似到达时间戳。一旦Kinesis成功接收并通过流存储,该时间戳附加到记录上。请注意,此时间戳通常称为Kinesis服务器端时间戳,并且不保证准确性或顺序正确性(即,时间戳可能并不总是在升序)。
用户可以选择覆盖此默认与自定义的时间戳,如所描述这里,或者使用一个从预定义的人。执行此 算子操作后,可以通过以下方式将其传递给使用者:
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
线程模型
Flink Kinesis Consumer使用多个线程进行分片发现和数据消费。
对于分片发现,每个并行使用者子任务将具有单个线程,该线程不断向Kinesis查询分片信息,即使子任务最初没有从消费者启动时读取的分片。换句话说,如果消费者以10的并行度运行,则总共有10个线程不断查询Kinesis,无论订阅流中的分片总数如何。
对于数据消耗,将创建单个线程以使用每个发现的分片。当由于流重新分片而关闭它负责消息的分片时,线程将终止。换句话说,每个打开的分片总会有一个线程。
内部使用的Kinesis API
Flink Kinesis Consumer在内部使用AWS Java SDK来调用Kinesis API以进行分片发现和数据消费。由于亚马逊对API上的Kinesis Streams的服务限制,消费者将与用户可能正在运行的其他非Flink消费应用程序竞争。下面是消费者调用的API列表,其中描述了消费者如何使用API,以及有关如何处理Flink Kinesis Consumer可能因这些服务限制而可能出现的任何错误或警告的信息。
DescribeStream:每个并行使用者子任务中的单个线程不断调用它,以便在流重新分片时发现任何新的分片。默认情况下,使用者以10秒的间隔执行分片发现,并将无限期地重试,直到从Kinesis获得结果。如果这会干扰其他非Flink消费应用程序,则用户可以通过
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS
在提供的配置属性中设置值来减慢调用此API的使用者的速度。这会将发现间隔设置为不同的值。请注意,此设置直接影响发现新分片并开始使用它的最大延迟,因为在间隔期间不会发现分片。GetShardIterator:每个启动分片消耗线程时只调用一次,如果Kinesis抱怨API的事务限制已超过,则会重试,最多默认为3次尝试。请注意,由于此API的速率限制是每个分片(不是每个流),因此消费者本身不应超过限制。通常,如果发生这种情况,用户可以尝试减慢调用此API的任何其他非Flink消费应用程序的速度,或者通过
ConsumerConfigConstants.SHARDGETITERATOR*
在提供的配置属性中设置前缀的键来修改此消费者中此API调用的重试行为。GetRecords:每个分片消费线程不断调用这个来从Kinesis获取记录。当分片具有多个并发使用者时(当运行任何其他非Flink消费应用程序时),可能会超出每个分片速率限制。默认情况下,在每次调用此API时,如果Kinesis抱怨API的数据大小/事务限制已超过,则消费者将重试,最多默认值为3次。用户可以尝试减慢其他非Flink消费应用程序的速度,也可以通过设置
ConsumerConfigConstants.SHARDGETRECORDS_MAX
和来调整消费者的吞吐量ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
提供的配置属性中的键。设置前者会调整每个消费线程在每次调用时尝试从分片中获取的最大记录数(默认值为10,000),而后者会修改每次获取之间的休眠间隔(默认值为200)。调用此API时,消费者的重试行为也可以使用前缀为的其他键进行修改ConsumerConfigConstants.SHARD_GETRECORDS
*
。
Kinesis制片人
该FlinkKinesisProducer
用途的Kinesis生产者库(KPL)把数据从Flink流进入室壁运动流。
请注意,生产者不参与Flink的检查点,并且不提供一次性处理保证。此外,Kinesis生产商不保证记录是为了分片而写的(有关详细信息,请参见此处和此处)。
如果发生故障或重新分片,数据将再次写入Kinesis,导致重复。此行为通常称为“至少一次”语义。
要将数据放入Kinesis流,请确保在AWS仪表板中将流标记为“ACTIVE”。
要使监视工作,访问流的用户需要访问CloudWatch服务。
Properties producerConfig = new Properties();
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");
producerConfig.put("ThreadPoolSize", "15");
// Disable Aggregation if it's not supported by a consumer
// producerConfig.put("AggregationEnabled", "false");
// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST");
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis_stream_name");
kinesis.setDefaultPartition("0");
DataStream<String> simpleStringStream = ...;
simpleStringStream.addSink(kinesis);
val producerConfig = new Properties()
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
// Optional KPL configs
producerConfig.put("AggregationMaxCount", "4294967295")
producerConfig.put("CollectionMaxCount", "1000")
producerConfig.put("RecordTtl", "30000")
producerConfig.put("RequestTimeout", "6000")
producerConfig.put("ThreadPoolSize", "15")
// Disable Aggregation if it's not supported by a consumer
// producerConfig.put("AggregationEnabled", "false")
// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST")
val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
kinesis.setFailOnError(true)
kinesis.setDefaultStream("kinesis_stream_name")
kinesis.setDefaultPartition("0")
val simpleStringStream = ...
simpleStringStream.addSink(kinesis)
以上是使用生产者的简单示例。要初始化FlinkKinesisProducer
,用户需要传递AWS_REGION
,AWS_ACCESS_KEY_ID
以及AWS_SECRET_ACCESS_KEY
通过一个java.util.Properties
实例。用户还可以将KPL的配置作为可选参数传递,以自定义KPL底层FlinkKinesisProducer
。可在此处找到KPL配置和说明的完整列表。该示例演示了在AWS区域“us-east-1”中生成单个Kinesis流。
如果用户未指定任何KPL配置和值,FlinkKinesisProducer
则将使用KPL的默认配置值,但RateLimit
。RateLimit
限制分片的最大允许放置率,作为后台限制的百分比。KPL的默认值是150,但它会使KPL RateLimitExceededException
过于频繁而导致Flink下沉。因此将FlinkKinesisProducer
KPL的默认值覆盖为100。
而不是a SerializationSchema
,它也支持a KinesisSerializationSchema
。在KinesisSerializationSchema
允许将数据发送到多个流。这是使用该KinesisSerializationSchema.getTargetStream(T element)
方法完成的。返回null
将指示生产者将数据元写入默认流。否则,使用返回的流名称。
线程模型
从Flink 1.4.0开始,FlinkKinesisProducer
将其默认底层KPL从单线程每请求模式切换到线程池模式。线程池模式下的KPL使用队列和线程池来执行对Kinesis的请求。这限制了KPL本机进程可能创建的线程数,因此大大降低了CPU利用率并提高了效率。因此,我们强烈建议Flink用户使用线程池模型。默认的线程池大小是10
。用户可以java.util.Properties
使用key 设置实例中的池大小,ThreadPoolSize
如上例所示。
用户仍然可以通过设置一个键-值对切换回一个线程每个请求模式ThreadingModel
和PER_REQUEST
在java.util.Properties
,如图中上面的例子中注释的代码。
背压
默认情况下,FlinkKinesisProducer
不反压。相反,由于每个分片每秒1 MB的速率限制而无法发送的记录在无界队列中缓冲,并在RecordTtl
到期时丢弃。
为避免数据丢失,您可以通过限制内部队列的大小来启用反压:
// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);
值queueLimit
取决于预期的记录大小。要选择一个好的值,请考虑Kinesis的速率限制为每个碎片每秒1MB。如果缓冲的记录少于一秒,则队列可能无法满负荷运行。默认RecordMaxBufferedTime
值为100ms,每个分片的队列大小为100kB就足够了。所述queueLimit
然后可以通过计算
queue limit = (number of shards * queue size per shard) / record size
例如,对于每条记录200Bytes和8个分片,队列限制为4000是一个很好的起点。如果队列大小限制吞吐量(每个分片每秒低于1MB),请尝试稍微增加队列限制。
使用非AWS Kinesis端点进行测试
有时希望Flink作为消费者或生产者对抗非 AWS Kinesis端点(如Kinesalite) ; 这在执行Flink应用程序的函数测试时特别有用。通常由Flink配置中设置的AWS区域推断的AWS端点必须通过配置属性覆盖。
要覆盖AWS端点,以生产者为例,AWSConfigConstants.AWS_ENDPOINT
除了AWSConfigConstants.AWS_REGION
Flink要求外,还要在Flink配置中设置该属性。虽然该区域是必需的,但它不会用于确定AWS端点URL。
以下示例显示了如何提供AWSConfigConstants.AWS_ENDPOINT
配置属性:
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")