Experimental Features

This section describes experimental features in the DataStream API. Experimental features are still evolving and can be either unstable, incomplete, or subject to heavy change in future versions.

Reinterpreting a pre-partitioned data stream as keyed stream

We can re-interpret a pre-partitioned data stream as a keyed stream to avoid shuffling.

WARNING: The re-interpreted data stream MUST already be pre-partitioned in EXACTLY the same way Flink’s keyBy would partition the data in a shuffle w.r.t. key-group assignment.

One use-case for this could be a materialized shuffle between two jobs: the first job performs a keyBy shuffle and materializes each output into a partition. A second job has sources that, for each parallel instance, reads from the corresponding partitions created by the first job. Those sources can now be re-interpreted as keyed streams, e.g. to apply windowing. Notice that this trick makes the second job embarrassingly parallel, which can be helpful for a fine-grained recovery scheme.

This re-interpretation functionality is exposed through DataStreamUtils:

  1. static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
  2. DataStream<T> stream,
  3. KeySelector<T, K> keySelector,
  4. TypeInformation<K> typeInfo)

Given a base stream, a key selector, and type information, the method creates a keyed stream from the base stream.

Code example:

Java

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStreamSource<Integer> source = ...
  3. DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
  4. .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  5. .reduce((a, b) -> a + b)
  6. .addSink(new DiscardingSink<>());
  7. env.execute();

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setParallelism(1)
  3. val source = ...
  4. new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
  5. .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  6. .reduce((a, b) => a + b)
  7. .addSink(new DiscardingSink[Int])
  8. env.execute()