数据管道 & ETL

Apache Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。在这个教程中,我们将介绍如何使用 Flink 的 DataStream API 实现这类应用。

这里注意,Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论你最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。

无状态的转换

本节涵盖了 map()flatmap(),这两种算子可以用来实现无状态转换的基本操作。本节中的示例建立在你已经熟悉 flink-training repo 中的出租车行程数据的基础上。

map()

在第一个练习中,你将过滤出租车行程数据中的事件。在同一代码仓库中,有一个 GeoUtils 类,提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),它可以将位置坐标(经度,维度)映射到 100x100 米的对应不同区域的网格单元。

现在让我们为每个出租车行程时间的数据对象增加 startCellendCell 字段。你可以创建一个继承 TaxiRideEnrichedRide 类,添加这些字段:

  1. public static class EnrichedRide extends TaxiRide {
  2. public int startCell;
  3. public int endCell;
  4. public EnrichedRide() {}
  5. public EnrichedRide(TaxiRide ride) {
  6. this.rideId = ride.rideId;
  7. this.isStart = ride.isStart;
  8. ...
  9. this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
  10. this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
  11. }
  12. public String toString() {
  13. return super.toString() + "," +
  14. Integer.toString(this.startCell) + "," +
  15. Integer.toString(this.endCell);
  16. }
  17. }

然后你可以创建一个应用来转换这个流

  1. DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
  2. DataStream<EnrichedRide> enrichedNYCRides = rides
  3. .filter(new RideCleansingSolution.NYCFilter())
  4. .map(new Enrichment());
  5. enrichedNYCRides.print();

使用这个 MapFunction:

  1. public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
  2. @Override
  3. public EnrichedRide map(TaxiRide taxiRide) throws Exception {
  4. return new EnrichedRide(taxiRide);
  5. }
  6. }

flatmap()

MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。对于除此以外的场景,你将要使用 flatmap()

  1. DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
  2. DataStream<EnrichedRide> enrichedNYCRides = rides
  3. .flatMap(new NYCEnrichment());
  4. enrichedNYCRides.print();

其中用到的 FlatMapFunction :

  1. public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
  2. @Override
  3. public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
  4. FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
  5. if (valid.filter(taxiRide)) {
  6. out.collect(new EnrichedRide(taxiRide));
  7. }
  8. }
  9. }

使用接口中提供的 Collectorflatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。

Keyed Streams

keyBy()

将一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用 keyBy(KeySelector) 实现。

  1. rides
  2. .flatMap(new NYCEnrichment())
  3. .keyBy("startCell")

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

keyBy and network shuffle

在上面的例子中,将 “startCell” 这个字段定义为键。这种选择键的方式有个缺点,就是编译器无法推断用作键的字段的类型,所以 Flink 会将键值作为元组传递,这有时候会比较难处理。所以最好还是使用一个合适的 KeySelector,

  1. rides
  2. .flatMap(new NYCEnrichment())
  3. .keyBy(
  4. new KeySelector<EnrichedRide, int>() {
  5. @Override
  6. public int getKey(EnrichedRide enrichedRide) throws Exception {
  7. return enrichedRide.startCell;
  8. }
  9. })

也可以使用更简洁的 lambda 表达式:

  1. rides
  2. .flatMap(new NYCEnrichment())
  3. .keyBy(enrichedRide -> enrichedRide.startCell)

通过计算得到键

KeySelector 不仅限于从事件中抽取键。你也可以按想要的方式计算得到键值,只要最终结果是确定的,并且实现了 hashCode()equals()。这些限制条件不包括产生随机数或者返回 Arrays 或 Enums 的 KeySelector,但你可以用元组和 POJO 来组成键,只要他们的元素遵循上述条件。

键必须按确定的方式产生,因为它们会在需要的时候被重新计算,而不是一直被带在流记录中。

例如,比起创建一个新的带有 startCell 字段的 EnrichedRide 类,用这个字段作为 key:

  1. keyBy(enrichedRide -> enrichedRide.startCell)

我们更倾向于这样做:

  1. keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed Stream 的聚合

以下代码为每个行程结束事件创建了一个新的包含 startCell 和时长(分钟)的元组流:

  1. import org.joda.time.Interval;
  2. DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
  3. .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
  4. @Override
  5. public void flatMap(EnrichedRide ride,
  6. Collector<Tuple2<Integer, Minutes>> out) throws Exception {
  7. if (!ride.isStart) {
  8. Interval rideInterval = new Interval(ride.startTime, ride.endTime);
  9. Minutes duration = rideInterval.toDuration().toStandardMinutes();
  10. out.collect(new Tuple2<>(ride.startCell, duration));
  11. }
  12. }
  13. });

现在就可以产生一个流,对每个 startCell 仅包含那些最长行程的数据。

有很多种方法表示使用哪个字段作为键。前面使用 EnrichedRide POJO 的例子,用字段名来指定键。而这个使用 Tuple2 对象的例子中,用字段在元组中的序号(从0开始)来指定键。

  1. minutesByStartCell
  2. .keyBy(0) // startCell
  3. .maxBy(1) // duration
  4. .print();

现在每次行程时长达到新的最大值,都会输出一条新记录,例如下面这个对应 50797 网格单元的数据:

  1. ...
  2. 4> (64549,5M)
  3. 4> (46298,18M)
  4. 1> (51549,14M)
  5. 1> (53043,13M)
  6. 1> (56031,22M)
  7. 1> (50797,6M)
  8. ...
  9. 1> (50797,8M)
  10. ...
  11. 1> (50797,11M)
  12. ...
  13. 1> (50797,12M)

(隐式的)状态

这是培训中第一个涉及到有状态流的例子。尽管状态的处理是透明的,Flink 必须跟踪每个不同的键的最大时长。

只要应用中有状态,你就应该考虑状态的大小。如果键值的数量是无限的,那 Flink 的状态需要的空间也同样是无限的。

在流处理场景中,考虑有限窗口的聚合往往比整个流聚合更有意义。

reduce() 和其他聚合算子

上面用到的 maxBy() 只是 Flink 中 KeyedStream 上众多聚合函数中的一个。还有一个更通用的 reduce() 函数可以用来实现你的自定义聚合。

有状态的转换

Flink 为什么要参与状态管理?

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

在本节中你将学习如何使用 Flink 的 API 来管理 keyed state。

Rich Functions

至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunctionMapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。

对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

open() 仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。

getRuntimeContext() 为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

一个使用 Keyed State 的例子

在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为 DeduplicatorRichFlatMapFunction

  1. private static class Event {
  2. public final String key;
  3. public final long timestamp;
  4. ...
  5. }
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.addSource(new EventSource())
  9. .keyBy(e -> e.key)
  10. .flatMap(new Deduplicator())
  11. .print();
  12. env.execute();
  13. }

为了实现这个功能,Deduplicator 需要记录每个键是否已经有了相应的记录。它将通过使用 Flink 的 keyed state 接口来做这件事。

当你使用像这样的 keyed stream 的时候,Flink 会为每个状态中管理的条目维护一个键值存储。

Flink 支持几种不同方式的 keyed state,这个例子使用的是最简单的一个,叫做 ValueState。意思是对于 每个键 ,Flink 将存储一个单一的对象 —— 在这个例子中,存储的是一个 Boolean 类型的对象。

我们的 Deduplicator 类有两个方法:open()flatMap()open() 方法通过定义 ValueStateDescriptor<Boolean> 建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了信息(在这个例子中的 Types.BOOLEAN)。

  1. public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
  2. ValueState<Boolean> keyHasBeenSeen;
  3. @Override
  4. public void open(Configuration conf) {
  5. ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
  6. keyHasBeenSeen = getRuntimeContext().getState(desc);
  7. }
  8. @Override
  9. public void flatMap(Event event, Collector<Event> out) throws Exception {
  10. if (keyHasBeenSeen.value() == null) {
  11. out.collect(event);
  12. keyHasBeenSeen.update(true);
  13. }
  14. }
  15. }

当 flatMap 方法调用 keyHasBeenSeen.value() 时,Flink 会在 当前键的上下文 中检索状态值,只有当状态为 null 时,才会输出当前事件。这种情况下,它同时也将更新 keyHasBeenSeentrue

这种访问和更新按键分区的状态的机制也许看上去很神奇,因为在 Deduplicator 的实现中,键不是明确可见的。当 Flink 运行时调用 RichFlatMapFunctionopen 方法时, 是没有事件的,所以这个时候上下文中不含有任何键。但当它调用 flatMap 方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个 Flink 状态后端的入口。

部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如

  1. ValueState<Boolean> keyHasBeenSeen;

要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。

清理状态

上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink 会对每个使用过的键都存储一个 Boolean 类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调用 clear() 来实现,如下:

  1. keyHasBeenSeen.clear()

对一个给定的键值,你也许想在它一段时间不使用后来做这件事。当学习 ProcessFunction 的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。

也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。

Non-keyed State

在没有键的上下文中我们也可以使用 Flink 管理的状态。这也被称作 算子的状态。它包含的接口是很不一样的,由于对用户定义的函数来说使用 non-keyed state 是不太常见的,所以这里就不多介绍了。这个特性最常用于 source 和 sink 的实现。

Connected Streams

相比于下面这种预先定义的转换:

simple transformation

有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。

connected streams

connected stream 也可以被用来实现流的关联。

示例

在这个例子中,一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunctionRichCoFlatMapFunction 作用于连接的流来实现这个功能。

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
  4. DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
  5. control
  6. .connect(datastreamOfWords)
  7. .flatMap(new ControlFunction())
  8. .print();
  9. env.execute();
  10. }

这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

  1. public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
  2. private ValueState<Boolean> blocked;
  3. @Override
  4. public void open(Configuration config) {
  5. blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
  6. }
  7. @Override
  8. public void flatMap1(String control_value, Collector<String> out) throws Exception {
  9. blocked.update(Boolean.TRUE);
  10. }
  11. @Override
  12. public void flatMap2(String data_value, Collector<String> out) throws Exception {
  13. if (blocked.value() == null) {
  14. out.collect(data_value);
  15. }
  16. }
  17. }

RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。

布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。

在 Flink 运行时中,flatMap1flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(datastreamOfWords)

认识到你没法控制 flatMap1flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的感到绝望,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)

动手练习

本节的动手练习是 行程和票价练习

延展阅读