How to Migrate from DataSet to DataStream

The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API, and SQL for their data processing requirements.

Note that the APIs in DataStream do not always match those in DataSet exactly. The purpose of this document is to help users understand how to achieve the same data processing behaviors with the DataStream API as with the DataSet API.

Based on the code changes and the execution efficiency implications of migration, we group the DataSet APIs into four categories:

  • Category 1: APIs that have exact equivalents in DataStream, requiring barely any changes to migrate.

  • Category 2: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, which might require some code changes for migration but will result in the same execution efficiency.

  • Category 3: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, potentially with additional cost in execution efficiency.

  • Category 4: APIs whose behaviors are not supported by DataStream API.

The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each category.

Setting the execution environment

The first step of migrating an application from DataSet API to DataStream API is to replace ExecutionEnvironment with StreamExecutionEnvironment.

DataSet:

    // Create the execution environment
    ExecutionEnvironment.getExecutionEnvironment();

    // Create the local execution environment
    ExecutionEnvironment.createLocalEnvironment();

    // Create the collection environment
    new CollectionEnvironment();

    // Create the remote environment
    ExecutionEnvironment.createRemoteEnvironment(String host, int port, String jarFiles);

DataStream:

    // Create the execution environment
    StreamExecutionEnvironment.getExecutionEnvironment();

    // Create the local execution environment
    StreamExecutionEnvironment.createLocalEnvironment();

    // The collection environment is not supported.

    // Create the remote environment
    StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String jarFiles);

Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams. Thus, users need to explicitly set the execution mode to RuntimeExecutionMode.BATCH if batch execution is expected.

    StreamExecutionEnvironment executionEnvironment = // [...];
    executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
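
The execution mode can also be set through the execution.runtime-mode configuration option when submitting the job, so the same program can run in either mode without code changes, for example:

    bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>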

Using the streaming sources and sinks

Sources

The DataStream API uses DataStreamSource to read records from external systems, while the DataSet API uses DataSource.

DataSet:

    // Read data from file
    DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath);

    // Read data from collection
    DataSource<> source = ExecutionEnvironment.fromCollection(data);

    // Read data from inputformat
    DataSource<> source = ExecutionEnvironment.createInput(inputFormat);

DataStream:

    // Read data from file
    DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath);

    // Read data from collection
    DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data);

    // Read data from inputformat
    DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat);

Sinks

The DataStream API uses DataStreamSink to write records to external systems, while the DataSet API uses DataSink.

DataSet:

    // Write to outputformat
    DataSink<> sink = dataSet.output(outputFormat);

    // Write to csv file
    DataSink<> sink = dataSet.writeAsCsv(filePath);

    // Write to text file
    DataSink<> sink = dataSet.writeAsText(filePath);

DataStream:

    // Write to sink
    DataStreamSink<> sink = dataStream.sinkTo(sink);

    // Write to csv file
    DataStreamSink<> sink = dataStream.writeAsCsv(path);

    // Write to text file
    DataStreamSink<> sink = dataStream.writeAsText(path);

If you are looking for pre-defined source and sink connectors for DataStream, please check the Connector Docs.
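
For example, here is a minimal sketch of reading and writing text files with the unified FileSource and FileSink connectors (assuming the flink-connector-files dependency is on the classpath; the input and output paths are placeholders):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Read text lines from an example input directory.
    FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/input"))
                    .build();
    DataStream<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

    // Write the records back out as plain text to an example output directory.
    FileSink<String> sink =
            FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                    .build();
    stream.sinkTo(sink);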

Migrating DataSet APIs

Category 1

For Category 1, these DataSet APIs have exact equivalents in DataStream, requiring barely any changes to migrate.

Map
  DataSet:
    dataSet.map(new MapFunction<>(){
        // implement user-defined map logic
    });
  DataStream:
    dataStream.map(new MapFunction<>(){
        // implement user-defined map logic
    });

FlatMap
  DataSet:
    dataSet.flatMap(new FlatMapFunction<>(){
        // implement user-defined flatmap logic
    });
  DataStream:
    dataStream.flatMap(new FlatMapFunction<>(){
        // implement user-defined flatmap logic
    });

Filter
  DataSet:
    dataSet.filter(new FilterFunction<>(){
        // implement user-defined filter logic
    });
  DataStream:
    dataStream.filter(new FilterFunction<>(){
        // implement user-defined filter logic
    });

Union
  DataSet:
    dataSet1.union(dataSet2);
  DataStream:
    dataStream1.union(dataStream2);

Rebalance
  DataSet:
    dataSet.rebalance();
  DataStream:
    dataStream.rebalance();

Project
  DataSet:
    DataSet<Tuple3<>> dataSet = // […]
    dataSet.project(2,0);
  DataStream:
    DataStream<Tuple3<>> dataStream = // […]
    dataStream.project(2,0);

Reduce on Grouped DataSet
  DataSet:
    DataSet<Tuple2<>> dataSet = // […]
    dataSet.groupBy(value -> value.f0)
        .reduce(new ReduceFunction<>(){
            // implement user-defined reduce logic
        });
  DataStream:
    DataStream<Tuple2<>> dataStream = // […]
    dataStream.keyBy(value -> value.f0)
        .reduce(new ReduceFunction<>(){
            // implement user-defined reduce logic
        });

Aggregate on Grouped DataSet
  DataSet:
    DataSet<Tuple2<>> dataSet = // […]
    // compute sum of the second field
    dataSet.groupBy(value -> value.f0)
        .aggregate(SUM, 1);
    // compute min of the second field
    dataSet.groupBy(value -> value.f0)
        .aggregate(MIN, 1);
    // compute max of the second field
    dataSet.groupBy(value -> value.f0)
        .aggregate(MAX, 1);
  DataStream:
    DataStream<Tuple2<>> dataStream = // […]
    // compute sum of the second field
    dataStream.keyBy(value -> value.f0)
        .sum(1);
    // compute min of the second field
    dataStream.keyBy(value -> value.f0)
        .min(1);
    // compute max of the second field
    dataStream.keyBy(value -> value.f0)
        .max(1);

Category 2

For Category 2, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, which might require some code changes for migration but will result in the same execution efficiency.

Operations on a full DataSet correspond to the global window aggregation in DataStream with a custom window that is triggered at the end of the input. The EndOfStreamWindows class in the Appendix shows how such a window can be implemented. We will reuse it in the rest of this document.

Distinct
  DataSet:
    DataSet<Integer> dataSet = // […]
    dataSet.distinct();
  DataStream:
    DataStream<Integer> dataStream = // […]
    dataStream.keyBy(value -> value)
        .reduce((value1, value2) -> value1);

Hash-Partition
  DataSet:
    DataSet<Tuple2<>> dataSet = // […]
    dataSet.partitionByHash(value -> value.f0);
  DataStream:
    DataStream<Tuple2<>> dataStream = // […]
    // partition by the hashcode of key
    dataStream.partitionCustom(
        (key, numSubpartition) -> key.hashCode() % numSubpartition,
        value -> value.f0);

Reduce on Full DataSet
  DataSet:
    DataSet<String> dataSet = // […]
    dataSet.reduce(new ReduceFunction<>(){
        // implement user-defined reduce logic
    });
  DataStream:
    DataStream<String> dataStream = // […]
    dataStream.windowAll(EndOfStreamWindows.get())
        .reduce(new ReduceFunction<>(){
            // implement user-defined reduce logic
        });

Aggregate on Full DataSet
  DataSet:
    DataSet<Tuple2<>> dataSet = // […]
    // compute sum of the second field
    dataSet.aggregate(SUM, 1);
    // compute min of the second field
    dataSet.aggregate(MIN, 1);
    // compute max of the second field
    dataSet.aggregate(MAX, 1);
  DataStream:
    DataStream<Tuple2<>> dataStream = // […]
    // compute sum of the second field
    dataStream.windowAll(EndOfStreamWindows.get())
        .sum(1);
    // compute min of the second field
    dataStream.windowAll(EndOfStreamWindows.get())
        .min(1);
    // compute max of the second field
    dataStream.windowAll(EndOfStreamWindows.get())
        .max(1);

GroupReduce on Full DataSet
  DataSet:
    DataSet<Integer> dataSet = // […]
    dataSet.reduceGroup(new GroupReduceFunction<>(){
        // implement user-defined group reduce logic
    });
  DataStream:
    DataStream<Integer> dataStream = // […]
    dataStream.windowAll(EndOfStreamWindows.get())
        .apply(new AllWindowFunction<>(){
            // implement user-defined group reduce logic
        });

GroupReduce on Grouped DataSet
  DataSet:
    DataSet<Tuple2<>> dataSet = // […]
    dataSet.groupBy(value -> value.f0)
        .reduceGroup(new GroupReduceFunction<>(){
            // implement user-defined group reduce logic
        });
  DataStream:
    DataStream<Tuple2<>> dataStream = // […]
    dataStream.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new WindowFunction<>(){
            // implement user-defined group reduce logic
        });

First-n
  DataSet:
    dataSet.first(n);
  DataStream:
    dataStream.windowAll(EndOfStreamWindows.get())
        .apply(new AllWindowFunction<>(){
            // implement first-n logic
        });

Join
  DataSet:
    DataSet<Tuple2<>> dataSet1 = // […]
    DataSet<Tuple2<>> dataSet2 = // […]
    dataSet1.join(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new JoinFunction<>(){
            // implement user-defined join logic
        });
  DataStream:
    DataStream<Tuple2<>> dataStream1 = // […]
    DataStream<Tuple2<>> dataStream2 = // […]
    dataStream1.join(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new JoinFunction<>(){
            // implement user-defined join logic
        });

CoGroup
  DataSet:
    DataSet<Tuple2<>> dataSet1 = // […]
    DataSet<Tuple2<>> dataSet2 = // […]
    dataSet1.coGroup(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new CoGroupFunction<>(){
            // implement user-defined co group logic
        });
  DataStream:
    DataStream<Tuple2<>> dataStream1 = // […]
    DataStream<Tuple2<>> dataStream2 = // […]
    dataStream1.coGroup(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new CoGroupFunction<>(){
            // implement user-defined co group logic
        });

OuterJoin
  DataSet:
    DataSet<Tuple2<>> dataSet1 = // […]
    DataSet<Tuple2<>> dataSet2 = // […]
    // left outer join
    dataSet1.leftOuterJoin(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new JoinFunction<>(){
            // implement user-defined left outer join logic
        });
    // right outer join
    dataSet1.rightOuterJoin(dataSet2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .with(new JoinFunction<>(){
            // implement user-defined right outer join logic
        });
  DataStream:
    DataStream<Tuple2<>> dataStream1 = // […]
    DataStream<Tuple2<>> dataStream2 = // […]
    // left outer join
    dataStream1.coGroup(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply((leftIterable, rightIterable, collector) -> {
            if (!rightIterable.iterator().hasNext()) {
                // implement user-defined left outer join logic
            }
        });
    // right outer join
    dataStream1.coGroup(dataStream2)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply((leftIterable, rightIterable, collector) -> {
            if (!leftIterable.iterator().hasNext()) {
                // implement user-defined right outer join logic
            }
        });

Category 3

For Category 3, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, potentially with additional cost in execution efficiency.

Currently, the DataStream API does not directly support aggregations on non-keyed streams (subtask-scope aggregations). To achieve that, we need to first assign the subtask ID to the records, then turn the stream into a keyed stream. The AddSubtaskIDMapFunction in the Appendix shows how to do that, and we will reuse it in the rest of this document.

MapPartition/SortPartition
  DataSet:
    DataSet<Integer> dataSet = // […]
    // MapPartition
    dataSet.mapPartition(new MapPartitionFunction<>(){
        // implement user-defined map partition logic
    });
    // SortPartition
    dataSet.sortPartition(0, Order.ASCENDING);
    dataSet.sortPartition(0, Order.DESCENDING);
  DataStream:
    DataStream<Integer> dataStream = // […]
    // assign subtask ID to all records
    DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
    dataStream1.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new WindowFunction<>(){
            // implement user-defined map partition or sort partition logic
        });

Cross
  DataSet:
    DataSet<Integer> dataSet1 = // […]
    DataSet<Integer> dataSet2 = // […]
    // Cross
    dataSet1.cross(dataSet2)
        .with(new CrossFunction<>(){
            // implement user-defined cross logic
        });
  DataStream:
    // the parallelism of dataStream1 and dataStream2 should be the same
    DataStream<Integer> dataStream1 = // […]
    DataStream<Integer> dataStream2 = // […]
    DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
    DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction());
    // join the two streams according to the subtask ID
    dataStream3.join(dataStream4)
        .where(value -> value.f0)
        .equalTo(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .apply(new JoinFunction<>(){
            // implement user-defined cross logic
        });

Category 4

The behaviors of the following DataSet APIs are not supported by DataStream.

  • RangePartition
  • GroupCombine

Appendix

EndOfStreamWindows

The following code shows an example implementation of EndOfStreamWindows.

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.flink.annotation.Internal;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

        private static final long serialVersionUID = 1L;

        private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

        // A single window that covers the whole time range of the (bounded) input.
        private static final TimeWindow TIME_WINDOW_INSTANCE =
                new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

        private EndOfStreamWindows() {}

        public static EndOfStreamWindows get() {
            return INSTANCE;
        }

        @Override
        public Collection<TimeWindow> assignWindows(
                Object element, long timestamp, WindowAssignerContext context) {
            return Collections.singletonList(TIME_WINDOW_INSTANCE);
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new EndOfStreamTrigger();
        }

        @Override
        public String toString() {
            return "EndOfStreamWindows()";
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }

        @Internal
        public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
            @Override
            public TriggerResult onElement(
                    Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                    throws Exception {
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
                // Fire only when the window's max timestamp is reached, i.e. at the end of the input.
                return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
            }

            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }
        }
    }

AddSubtaskIDMapFunction

The following code shows an example implementation of AddSubtaskIDMapFunction.

    public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
        @Override
        public Tuple2<String, T> map(T value) {
            // Prefix each record with the index of the subtask that processes it.
            return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
        }
    }