Full Window Partition Processing on DataStream

This page explains the use of full window partition processing API on DataStream. Flink enables both keyed and non-keyed DataStream to directly transform into PartitionWindowedStream now. The PartitionWindowedStream represents collecting all records of each subtask separately into a full window. The PartitionWindowedStream support four APIs: mapPartition, sortPartition, aggregate and reduce.

Note: Details about the design and implementation of the full window partition processing can be found in the proposal and design document FLIP-380: Support Full Partition Processing On Non-keyed DataStream.

MapPartition

MapPartition represents collecting all records of each subtask separately into a full window and process them using the given MapPartitionFunction within each subtask. The MapPartitionFunction is called at the end of inputs.

An example of calculating the sum of the elements in each subtask is as follows:

  1. DataStream<Integer> dataStream = //...
  2. PartitionWindowedStream<Integer> partitionWindowedDataStream = dataStream.fullWindowPartition();
  3. DataStream<Integer> resultStream = partitionWindowedDataStream.mapPartition(
  4. new MapPartitionFunction<Integer, Integer>() {
  5. @Override
  6. public void mapPartition(
  7. Iterable<Integer> values, Collector<Integer> out) {
  8. int result = 0;
  9. for (Integer value : values) {
  10. result += value;
  11. }
  12. out.collect(result);
  13. }
  14. }
  15. );

SortPartition

SortPartition represents collecting all records of each subtask separately into a full window and sorts them by the given record comparator in each subtask at the end of inputs.

An example of sorting the records by the first element of tuple in each subtask is as follows:

  1. DataStream<Tuple2<Integer, Integer>> dataStream = //...
  2. PartitionWindowedStream<Tuple2<Integer, Integer>> partitionWindowedDataStream = dataStream.fullWindowPartition();
  3. DataStream<Integer> resultStream = partitionWindowedDataStream.sortPartition(0, Order.ASCENDING);

Aggregate

Aggregate represents collecting all records of each subtask separately into a full window and applies the given AggregateFunction to the records of the window. The AggregateFunction is called for each element, aggregating values incrementally within the window.

An example of aggregate the records in each subtask is as follows:

  1. DataStream<Tuple2<Integer, Integer>> dataStream = //...
  2. PartitionWindowedStream<Tuple2<Integer, Integer>> partitionWindowedDataStream = dataStream.fullWindowPartition();
  3. DataStream<Integer> resultStream = partitionWindowedDataStream.aggregate(new AggregateFunction<>{...});

Reduce

Reduce represents applies a reduce transformation on all the records in the partition. The ReduceFunction will be called for every record in the window. An example is as follows:

  1. DataStream<Tuple2<Integer, Integer>> dataStream = //...
  2. PartitionWindowedStream<Tuple2<Integer, Integer>> partitionWindowedDataStream = dataStream.fullWindowPartition();
  3. DataStream<Integer> resultStream = partitionWindowedDataStream.aggregate(new ReduceFunction<>{...});