实验功能

This section describes experimental features in the DataStream API. Experimental features are still evolving and can be either unstable, incomplete, or subject to heavy change in future versions.

Reinterpreting a pre-partitioned data stream as keyed stream

We can re-interpret a pre-partitioned data stream as a keyed stream to avoid shuffling.

WARNING: The re-interpreted data stream MUST already be pre-partitioned in EXACTLY the same way Flink’s keyBy would partition the data in a shuffle w.r.t. key-group assignment.

One use-case for this could be a materialized shuffle between two jobs: the first job performs a keyBy shuffle and materializes each output into a partition. A second job has sources that, for each parallel instance, reads from the corresponding partitions created by the first job. Those sources can now be re-interpreted as keyed streams, e.g. to apply windowing. Notice that this trick makes the second job embarrassingly parallel, which can be helpful for a fine-grained recovery scheme.

This re-interpretation functionality is exposed through DataStreamUtils:

  1. static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
  2. DataStream<T> stream,
  3. KeySelector<T, K> keySelector,
  4. TypeInformation<K> typeInfo)

Given a base stream, a key selector, and type information, the method creates a keyed stream from the base stream.

Code example:

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStreamSource<Integer> source = ...
  3. DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
  4. .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  5. .reduce((a, b) -> a + b)
  6. .addSink(new DiscardingSink<>());
  7. env.execute();
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setParallelism(1)
  3. val source = ...
  4. new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
  5. .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  6. .reduce((a, b) => a + b)
  7. .addSink(new DiscardingSink[Int])
  8. env.execute()