How to Migrate from DataSet to DataStream

The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their data processing requirements.

Note that the APIs in DataStream do not always match those in DataSet exactly. The purpose of this document is to help users understand how to achieve the same data processing behavior with the DataStream API as with the DataSet API.

According to how much the development and execution efficiency changes during migration, we divide the DataSet APIs into four categories:

  • Category 1: APIs that have exactly the same counterpart in DataStream and can be migrated with almost no changes;

  • Category 2: APIs whose behavior can be achieved with other DataStream APIs that have different semantics; migrating them may require some code changes but keeps the same execution efficiency;

  • Category 3: APIs whose behavior can be achieved with other DataStream APIs that have different semantics, at the cost of additional execution efficiency;

  • Category 4: APIs whose behavior is not supported by the DataStream API.

The subsequent sections first describe how to set up the execution environment and the sources/sinks, then explain in detail how each category of DataSet APIs can be migrated to the DataStream API, highlighting the considerations and challenges associated with each category.

Setting the execution environment

The first step of migrating an application from the DataSet API to the DataStream API is to replace ExecutionEnvironment with StreamExecutionEnvironment.

DataSet:
    // Create the execution environment
    ExecutionEnvironment.getExecutionEnvironment();
    // Create the local execution environment
    ExecutionEnvironment.createLocalEnvironment();
    // Create the collection environment
    new CollectionEnvironment();
    // Create the remote execution environment
    ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);

DataStream:
    // Create the execution environment
    StreamExecutionEnvironment.getExecutionEnvironment();
    // Create the local execution environment
    StreamExecutionEnvironment.createLocalEnvironment();
    // The collection environment is not supported
    // Create the remote execution environment
    StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);

Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams.

If needed, users can explicitly set the execution mode to RuntimeExecutionMode.BATCH:

    StreamExecutionEnvironment executionEnvironment = // [...];
    executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
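
For reference, a minimal, self-contained job that runs a bounded DataStream program in batch mode could look like the following sketch. The input elements and the doubling logic are illustrative assumptions only, not part of the migration guide.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BatchModeExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // process the bounded input with batch semantics
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            env.fromElements(1, 2, 3)
                    .map(new MapFunction<Integer, Integer>() {
                        @Override
                        public Integer map(Integer value) {
                            return value * 2;
                        }
                    })
                    .print();

            env.execute("batch-mode-example");
        }
    }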

Setting up the streaming sources and sinks

Sources

The DataStream API uses DataStreamSource to read records from external systems, whereas the DataSet API uses DataSource.

DataSet:
    // Read data from file
    DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath);
    // Read data from collection
    DataSource<> source = ExecutionEnvironment.fromCollection(data);
    // Read data from inputformat
    DataSource<> source = ExecutionEnvironment.createInput(inputFormat);

DataStream:
    // Read data from file
    DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath);
    // Read data from collection
    DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data);
    // Read data from inputformat
    DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat);
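
Besides the InputFormat-based methods shown above, bounded files can also be read with the unified FileSource connector. This is only a sketch: it assumes the flink-connector-files dependency is on the classpath, env is an existing StreamExecutionEnvironment, and the input path is a placeholder.

    // read each line of the (placeholder) input file as a String record
    FileSource<String> fileSource =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
                    .build();
    DataStreamSource<String> source =
            env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");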

Sinks

The DataStream API uses DataStreamSink to write records to external systems, whereas the DataSet API uses DataSink.

DataSet:
    // Write to outputformat
    DataSink<> sink = dataSet.output(outputFormat);
    // Write to csv file
    DataSink<> sink = dataSet.writeAsCsv(filePath);
    // Write to text file
    DataSink<> sink = dataSet.writeAsText(filePath);

DataStream:
    // Write to sink
    DataStreamSink<> sink = dataStream.sinkTo(sink);
    // Write to csv file
    DataStreamSink<> sink = dataStream.writeAsCsv(path);
    // Write to text file
    DataStreamSink<> sink = dataStream.writeAsText(path);

If you are looking for pre-defined DataStream connectors, please check the Connectors documentation.
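
As one concrete example of such a connector, records can be written as text rows with FileSink from the flink-connector-files module. This is a sketch under the assumption that dataStream is a DataStream<String>; the output path is a placeholder.

    // write each String record as one line under the (placeholder) output directory
    DataStreamSink<String> sink = dataStream.sinkTo(
            FileSink.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>())
                    .build());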

Migrating DataSet APIs

Category 1

For Category 1, these DataSet APIs have exactly the same counterpart in DataStream and require almost no changes to migrate; a complete end-to-end example is sketched after the table below.

Map

DataSet:
    dataSet.map(new MapFunction<>(){
        // implement user-defined map logic
    });

DataStream:
    dataStream.map(new MapFunction<>(){
        // implement user-defined map logic
    });

FlatMap

DataSet:
    dataSet.flatMap(new FlatMapFunction<>(){
        // implement user-defined flatmap logic
    });

DataStream:
    dataStream.flatMap(new FlatMapFunction<>(){
        // implement user-defined flatmap logic
    });

Filter

DataSet:
    dataSet.filter(new FilterFunction<>(){
        // implement user-defined filter logic
    });

DataStream:
    dataStream.filter(new FilterFunction<>(){
        // implement user-defined filter logic
    });

Union

DataSet:
    dataSet1.union(dataSet2);

DataStream:
    dataStream1.union(dataStream2);

Rebalance

DataSet:
    dataSet.rebalance();

DataStream:
    dataStream.rebalance();

Project

DataSet:
    DataSet<Tuple3<>> dataSet = // [...]
    dataSet.project(2,0);

DataStream:
    DataStream<Tuple3<>> dataStream = // [...]
    dataStream.project(2,0);

Reduce on Grouped DataSet

DataSet:
    DataSet<Tuple2<>> dataSet = // [...]
    dataSet.groupBy(value -> value.f0)
        .reduce(new ReduceFunction<>(){
            // implement user-defined reduce logic
        });

DataStream:
    DataStream<Tuple2<>> dataStream = // [...]
    dataStream.keyBy(value -> value.f0)
        .reduce(new ReduceFunction<>(){
            // implement user-defined reduce logic
        });

Aggregate on Grouped DataSet

DataSet:
    DataSet<Tuple2<>> dataSet = // [...]
    // compute sum of the second field
    dataSet.groupBy(value -> value.f0)
        .aggregate(SUM, 1);
    // compute min of the second field
    dataSet.groupBy(value -> value.f0)
        .aggregate(MIN, 1);
    // compute max of the second field
    dataSet.groupBy(value -> value.f0)
        .aggregate(MAX, 1);

DataStream:
    DataStream<Tuple2<>> dataStream = // [...]
    // compute sum of the second field
    dataStream.keyBy(value -> value.f0)
        .sum(1);
    // compute min of the second field
    dataStream.keyBy(value -> value.f0)
        .min(1);
    // compute max of the second field
    dataStream.keyBy(value -> value.f0)
        .max(1);
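
To make Category 1 concrete, the sketch below migrates a grouped aggregation end to end. The sample elements and the job name are illustrative assumptions; the migration itself only replaces groupBy/aggregate with keyBy/sum.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GroupedAggregationExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // the input is bounded, so run with batch semantics
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            DataStream<Tuple2<String, Integer>> dataStream =
                    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

            // DataSet:    dataSet.groupBy(value -> value.f0).aggregate(SUM, 1)
            // DataStream: keyBy + sum computes the per-key sum of the second field
            dataStream
                    .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> value) {
                            return value.f0;
                        }
                    })
                    .sum(1)
                    .print();

            env.execute("grouped-aggregation-example");
        }
    }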

Category 2

For Category 2, the behavior of these DataSet APIs can be achieved with other DataStream APIs that have different semantics. Migrating them may require some code changes, but the execution efficiency stays the same.

Some DataSet APIs operate on the full DataSet. Their behavior can be implemented in DataStream with a global window that only triggers the computation of the windowed data at the end of the input. The EndOfStreamWindows class in the Appendix shows how such a window can be implemented; we will reuse it throughout the rest of this document.

Distinct

DataSet:
    DataSet<Integer> dataSet = // [...]
    dataSet.distinct();

DataStream:
    DataStream<Integer> dataStream = // [...]
    dataStream.keyBy(value -> value)
        .reduce((value1, value2) -> value1);

Hash-Partition

DataSet:
    DataSet<Tuple2<>> dataSet = // [...]
    dataSet.partitionByHash(value -> value.f0);

DataStream:
    DataStream<Tuple2<>> dataStream = // [...]
    // partition by the hashcode of the key
    dataStream.partitionCustom(
        (key, numSubpartition) -> key.hashCode() % numSubpartition,
        value -> value.f0);

Reduce on Full DataSet

DataSet:
    DataSet<String> dataSet = // [...]
    dataSet.reduce(new ReduceFunction<>(){
        // implement user-defined reduce logic
    });

DataStream:
    DataStream<String> dataStream = // [...]
    dataStream.windowAll(EndOfStreamWindows.get())
        .reduce(new ReduceFunction<>(){
            // implement user-defined reduce logic
        });

Aggregate on Full DataSet

DataSet:
    DataSet<Tuple2<>> dataSet = // [...]
    // compute sum of the second field
    dataSet.aggregate(SUM, 1);
    // compute min of the second field
    dataSet.aggregate(MIN, 1);
    // compute max of the second field
    dataSet.aggregate(MAX, 1);

DataStream:
    DataStream<Tuple2<>> dataStream = // [...]
    // compute sum of the second field
    dataStream.windowAll(EndOfStreamWindows.get())
        .sum(1);
    // compute min of the second field
    dataStream.windowAll(EndOfStreamWindows.get())
        .min(1);
    // compute max of the second field
    dataStream.windowAll(EndOfStreamWindows.get())
        .max(1);

GroupReduce on Full DataSet

DataSet:
    DataSet<Integer> dataSet = // [...]
    dataSet.reduceGroup(new GroupReduceFunction<>(){
        // implement user-defined group reduce logic
    });

DataStream:
    DataStream<Integer> dataStream = // [...]
    dataStream.windowAll(EndOfStreamWindows.get())
        .apply(new AllWindowFunction<>(){
            // implement user-defined group reduce logic
        });

GroupReduce on Grouped DataSet

DataSet:
    DataSet<Tuple2<>> dataSet = // [...]
    dataSet.groupBy(value -> value.f0)
        .reduceGroup(new GroupReduceFunction<>(){
            // implement user-defined group reduce logic
        });

DataStream:
    DataStream<Tuple2<>> dataStream = // [...]
    dataStream.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new WindowFunction<>(){
            // implement user-defined group reduce logic
        });

First-n

DataSet:
    dataSet.first(n);

DataStream:
    dataStream.windowAll(EndOfStreamWindows.get())
        .apply(new AllWindowFunction<>(){
            // implement first-n logic (see the sketch after this table)
        });

Join

DataSet:
    DataSet<Tuple2<>> dataSet1 = // [...]
    DataSet<Tuple2<>> dataSet2 = // [...]
    dataSet1.join(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new JoinFunction<>(){
            // implement user-defined join logic
        });

DataStream:
    DataStream<Tuple2<>> dataStream1 = // [...]
    DataStream<Tuple2<>> dataStream2 = // [...]
    dataStream1.join(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new JoinFunction<>(){
            // implement user-defined join logic
        });

CoGroup

DataSet:
    DataSet<Tuple2<>> dataSet1 = // [...]
    DataSet<Tuple2<>> dataSet2 = // [...]
    dataSet1.coGroup(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new CoGroupFunction<>(){
            // implement user-defined co group logic
        });

DataStream:
    DataStream<Tuple2<>> dataStream1 = // [...]
    DataStream<Tuple2<>> dataStream2 = // [...]
    dataStream1.coGroup(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new CoGroupFunction<>(){
            // implement user-defined co group logic
        });

OuterJoin

DataSet:
    DataSet<Tuple2<>> dataSet1 = // [...]
    DataSet<Tuple2<>> dataSet2 = // [...]
    // left outer join
    dataSet1.leftOuterJoin(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new JoinFunction<>(){
            // implement user-defined left outer join logic
        });
    // right outer join
    dataSet1.rightOuterJoin(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new JoinFunction<>(){
            // implement user-defined right outer join logic
        });

DataStream:
    DataStream<Tuple2<>> dataStream1 = // [...]
    DataStream<Tuple2<>> dataStream2 = // [...]
    // left outer join
    dataStream1.coGroup(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply((leftIterable, rightIterable, collector) -> {
            if (!rightIterable.iterator().hasNext()) {
                // implement user-defined left outer join logic
            }
        });
    // right outer join
    dataStream1.coGroup(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply((leftIterable, rightIterable, collector) -> {
            if (!leftIterable.iterator().hasNext()) {
                // implement user-defined right outer join logic
            }
        });
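
For example, the first-n logic left open in the table above could be filled in as in the following sketch, assuming Integer records and n = 3 (both are illustrative assumptions). Note that, like first(n) on an unsorted DataSet, the selection depends on the encounter order of the records.

    dataStream.windowAll(EndOfStreamWindows.get())
        .apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out)
                    throws Exception {
                int emitted = 0;
                for (Integer value : values) {
                    if (emitted++ >= 3) {
                        break; // stop after the first n (= 3) records of the window
                    }
                    out.collect(value);
                }
            }
        });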

Category 3

For Category 3, the behavior of these DataSet APIs can be achieved with other DataStream APIs that have different semantics, but at an additional cost in execution efficiency.

Currently, the DataStream API does not directly support aggregations on non-keyed streams (i.e., aggregating over the data within a subtask). To do so, we first need to assign the subtask ID to the records and then turn the stream into a keyed stream. The AddSubtaskIDMapFunction class in the Appendix shows how to do that; we will reuse it throughout the rest of this document. A concrete sketch of this pattern is shown after the table below.

MapPartition/SortPartition

DataSet:
    DataSet<Integer> dataSet = // [...]
    // MapPartition
    dataSet.mapPartition(new MapPartitionFunction<>(){
        // implement user-defined map partition logic
    });
    // SortPartition
    dataSet.sortPartition(0, Order.ASCENDING);
    dataSet.sortPartition(0, Order.DESCENDING);

DataStream:
    DataStream<Integer> dataStream = // [...]
    // assign the subtask ID to all records
    DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
    dataStream1.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new WindowFunction<>(){
            // implement user-defined map partition or sort partition logic
        });

Cross

DataSet:
    DataSet<Integer> dataSet1 = // [...]
    DataSet<Integer> dataSet2 = // [...]
    // Cross
    dataSet1.cross(dataSet2)
        .with(new CrossFunction<>(){
            // implement user-defined cross logic
        });

DataStream:
    // the parallelism of dataStream1 and dataStream2 should be the same
    DataStream<Integer> dataStream1 = // [...]
    DataStream<Integer> dataStream2 = // [...]
    DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
    DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction());
    // join the two streams according to the subtask ID
    dataStream3.join(dataStream4)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new JoinFunction<>(){
            // implement user-defined cross logic
        });
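
As a concrete illustration of the subtask-ID pattern above, the following sketch emulates mapPartition by counting the records of each partition; the counting logic is only an illustrative assumption.

    DataStream<Integer> dataStream = // [...]
    // tag every record with the ID of the subtask that processed it
    DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
    dataStream1.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new WindowFunction<Tuple2<String, Integer>, Long, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Tuple2<String, Integer>> input, Collector<Long> out) throws Exception {
                // emit the number of records seen by this subtask, mimicking mapPartition
                long count = 0;
                for (Tuple2<String, Integer> value : input) {
                    count++;
                }
                out.collect(count);
            }
        });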

Category 4

The behavior of the following DataSet APIs is not supported by the DataStream API.

  • RangePartition
  • GroupCombine

Appendix

EndOfStreamWindows

The following code shows an example implementation of EndOfStreamWindows.

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.flink.annotation.Internal;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    /**
     * A WindowAssigner that assigns all records to a single window which is only
     * triggered at the end of the (bounded) input.
     */
    public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

        private static final TimeWindow TIME_WINDOW_INSTANCE =
                new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

        private EndOfStreamWindows() {}

        public static EndOfStreamWindows get() {
            return INSTANCE;
        }

        @Override
        public Collection<TimeWindow> assignWindows(
                Object element, long timestamp, WindowAssignerContext context) {
            return Collections.singletonList(TIME_WINDOW_INSTANCE);
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new EndOfStreamTrigger();
        }

        @Override
        public String toString() {
            return "EndOfStreamWindows()";
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }

        @Internal
        public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
            @Override
            public TriggerResult onElement(
                    Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                    throws Exception {
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
                // fire only when the watermark reaches the end of the single global window,
                // i.e. at the end of the bounded input
                return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
            }

            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }
        }
    }

AddSubtaskIDMapFunction

The following code shows an example implementation of AddSubtaskIDMapFunction.

    /**
     * Tags every record with the ID of the subtask that processes it, so that a later
     * keyBy can group records by their original subtask.
     */
    public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
        @Override
        public Tuple2<String, T> map(T value) {
            return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
        }
    }
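
A typical way to plug this helper into a pipeline, matching the Category 3 examples above:

    DataStream<Integer> dataStream = // [...]
    // key the stream by the subtask that produced each record, then apply a global window
    dataStream.map(new AddSubtaskIDMapFunction())
        .keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get());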