Operators

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink’s operator chaining.

DataStream Transformations

Map

DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

Java

    DataStream<Integer> dataStream = //...
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });

Scala

    dataStream.map { x => x * 2 }

Python

    data_stream = env.from_collection(collection=[1, 2, 3, 4, 5])
    data_stream.map(lambda x: 2 * x, output_type=Types.INT())

FlatMap

DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

Java

    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out)
                throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

Scala

    dataStream.flatMap { str => str.split(" ") }

Python

    data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
    data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())

Filter

DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

Java

    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });

Scala

    dataStream.filter { _ != 0 }

Python

    data_stream = env.from_collection(collection=[0, 1, 2, 3, 4, 5])
    data_stream.filter(lambda x: x != 0)

KeyBy

DataStream → KeyedStream

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

Java

    dataStream.keyBy(value -> value.getSomeKey());
    dataStream.keyBy(value -> value.f0);

Scala

    dataStream.keyBy(_.someKey)
    dataStream.keyBy(_._1)

Python

    data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'b')])
    data_stream.key_by(lambda x: x[1], key_type=Types.STRING())  # Key by the result of KeySelector

A type cannot be a key if:

  1. it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
  2. it is an array of any type.
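
The idea of hash partitioning can be sketched in plain Python, without any Flink dependency. This is a simplified illustration only: `assign_partition` is a hypothetical helper, `zlib.crc32` stands in for whatever hash function Flink uses internally, and Flink's real assignment involves additional steps that are left out here. The sketch shows why records with equal keys always land in the same partition, and why a key needs a stable, content-based hash.

```python
import zlib


def assign_partition(key: str, parallelism: int) -> int:
    """Map a key to a partition index using a deterministic hash.

    crc32 is an assumption of this sketch, not Flink's actual hash function.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


records = [(1, "a"), (2, "a"), (3, "b")]

# Group the records by the partition their key (second field) maps to.
partitions = {}
for record in records:
    index = assign_partition(record[1], parallelism=4)
    partitions.setdefault(index, []).append(record)

# Both records with key "a" end up in the same partition.
print(partitions)
```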

Reduce

KeyedStream → DataStream

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

Java

    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2)
                throws Exception {
            return value1 + value2;
        }
    });

Scala

    keyedStream.reduce { _ + _ }

Python

    data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
    data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
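
To make the "rolling" behaviour concrete, the following plain-Python sketch mimics what a keyed reduce emits in streaming execution: one output per input record, each being the running reduced value for that record's key. `rolling_reduce` is a hypothetical helper written for this illustration, not part of the Flink API.

```python
def rolling_reduce(records, key_selector, reduce_fn):
    """Yield the updated reduced value for every incoming record, per key."""
    state = {}  # last reduced value for each key
    for record in records:
        key = key_selector(record)
        if key in state:
            state[key] = reduce_fn(state[key], record)
        else:
            # The first element of a key has nothing to combine with.
            state[key] = record
        yield state[key]


records = [(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')]
result = list(rolling_reduce(
    records,
    key_selector=lambda x: x[1],
    reduce_fn=lambda a, b: (a[0] + b[0], b[1]),
))
print(result)  # [(1, 'a'), (3, 'a'), (6, 'a'), (4, 'b')]
```

The partial sums 1, 3, 6 for key 'a' appear as separate outputs, which is the difference from a reduce that only returns a final value.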

Window

KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

Java

    dataStream
        .keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)));

Scala

    dataStream
        .keyBy(_._1)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))

Python

    data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Time.seconds(5)))
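
As a rough illustration of how a tumbling window groups the data of each key, the plain-Python sketch below assigns events to 5-second windows by their timestamp. The helpers `window_start` and `assign_to_windows` are hypothetical names, the events are made-up sample data, and the sketch assumes non-negative millisecond timestamps and no window offset.

```python
from collections import defaultdict


def window_start(timestamp_ms: int, size_ms: int) -> int:
    """Start of the tumbling window that contains the timestamp (no offset)."""
    return timestamp_ms - (timestamp_ms % size_ms)


def assign_to_windows(events, size_ms):
    """Group (key, timestamp_ms, value) events by key and window start."""
    windows = defaultdict(list)
    for key, timestamp_ms, value in events:
        windows[(key, window_start(timestamp_ms, size_ms))].append(value)
    return dict(windows)


events = [("a", 1000, 1), ("a", 4999, 2), ("a", 5000, 3), ("b", 1200, 4)]
grouped = assign_to_windows(events, size_ms=5000)
print(grouped)  # {('a', 0): [1, 2], ('a', 5000): [3], ('b', 0): [4]}
```

Each key has its own sequence of windows, so the event for key "b" never shares a window with events for key "a" even though the timestamps overlap.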

WindowAll

DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

Java

    dataStream
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

Scala

    dataStream
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

Python

    data_stream.window_all(TumblingEventTimeWindows.of(Time.seconds(5)))

Window Apply

WindowedStream → DataStream

AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

Java

    windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
        public void apply(Tuple tuple,
                Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
        public void apply(Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

Scala

    windowedStream.apply { WindowFunction }

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply { AllWindowFunction }

Python

    class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):

        def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
            sum = 0
            for input in inputs:
                sum += input[1]
            yield sum

    class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):

        def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
            sum = 0
            for input in inputs:
                sum += input[1]
            yield sum

    windowed_stream.apply(MyWindowFunction())

    # applying an AllWindowFunction on non-keyed window stream
    all_windowed_stream.apply(MyAllWindowFunction())

WindowReduce

WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

Java

    windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
        }
    });

Scala

    windowedStream.reduce { _ + _ }

Python

    class MyReduceFunction(ReduceFunction):

        def reduce(self, value1, value2):
            return value1[0], value1[1] + value2[1]

    windowed_stream.reduce(MyReduceFunction())

Union

DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

Java

    dataStream.union(otherStream1, otherStream2, ...);

Scala

    dataStream.union(otherStream1, otherStream2, ...)

Python

    data_stream.union(other_stream_1, other_stream_2, ...)

Window Join

DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

Java

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction() {...});

Scala

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply { ... }

Python

This feature is not yet supported in Python.
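
The semantics of a window join can still be illustrated in plain Python, independent of the Flink API. The sketch below assumes tumbling windows, non-negative millisecond timestamps, and made-up sample data. `tumbling_window_join` is a hypothetical helper that emits every pairwise combination of left and right elements sharing both a key and a window, and emits nothing for elements without a partner.

```python
from collections import defaultdict
from itertools import product


def tumbling_window_join(left, right, key_of, size_ms, join_fn):
    """Join (timestamp_ms, element) streams on key and tumbling window."""
    # Each bucket holds the left and right elements of one (key, window start).
    buckets = defaultdict(lambda: ([], []))
    for side, stream in enumerate((left, right)):
        for timestamp_ms, element in stream:
            start = timestamp_ms - (timestamp_ms % size_ms)
            buckets[(key_of(element), start)][side].append(element)

    results = []
    for bucket_key in sorted(buckets):
        lefts, rights = buckets[bucket_key]
        # Pairwise combinations; an empty side yields no output.
        results.extend(join_fn(l, r) for l, r in product(lefts, rights))
    return results


left = [(0, ("k1", "L0")), (1000, ("k1", "L1")), (4000, ("k2", "L2"))]
right = [(500, ("k1", "R0")), (3500, ("k1", "R1")), (4500, ("k3", "R2"))]

joined = tumbling_window_join(
    left, right,
    key_of=lambda e: e[0],
    size_ms=3000,
    join_fn=lambda l, r: (l[1], r[1]),
)
print(joined)  # [('L0', 'R0'), ('L1', 'R0')]
```

R1 shares key k1 with L0 and L1 but falls into the next window, so it is not joined.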

Interval Join

KeyedStream,KeyedStream → DataStream

Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

Java

    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive() // optional
        .lowerBoundExclusive() // optional
        .process(new ProcessJoinFunction() {...});

Scala

    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive() // optional
        .lowerBoundExclusive() // optional
        .process(new ProcessJoinFunction() {...})

Python

This feature is not yet supported in Python.
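
The interval condition itself is easy to check in plain Python. The following sketch is an illustration of the predicate only, not of how Flink evaluates it: `interval_join` is a hypothetical helper that compares all pairs by brute force, and the timestamps are made-up sample data. It shows the effect of the inclusive default and of making both bounds exclusive.

```python
def interval_join(left, right, lower, upper,
                  lower_exclusive=False, upper_exclusive=False):
    """Pair (key, timestamp, value) elements that satisfy the interval condition."""
    pairs = []
    for key1, ts1, value1 in left:
        for key2, ts2, value2 in right:
            if key1 != key2:
                continue
            low, high = ts1 + lower, ts1 + upper
            above = ts2 > low if lower_exclusive else ts2 >= low
            below = ts2 < high if upper_exclusive else ts2 <= high
            if above and below:
                pairs.append((value1, value2))
    return pairs


left = [("k", 10, "L10")]
right = [("k", 8, "R8"), ("k", 9, "R9"), ("k", 12, "R12"),
         ("k", 13, "R13"), ("x", 10, "X10")]

# Default: 10 - 2 <= ts <= 10 + 2
inclusive = interval_join(left, right, -2, 2)
print(inclusive)  # [('L10', 'R8'), ('L10', 'R9'), ('L10', 'R12')]

# Both bounds exclusive: 10 - 2 < ts < 10 + 2
exclusive = interval_join(left, right, -2, 2,
                          lower_exclusive=True, upper_exclusive=True)
print(exclusive)  # [('L10', 'R9')]
```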

Window CoGroup

DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

Java

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new CoGroupFunction() {...});

Scala

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply {}

Python

This feature is not yet supported in Python.

Connect

DataStream,DataStream → ConnectedStreams

“Connects” two data streams, retaining their types. Connecting allows for shared state between the two streams.

Java

    DataStream<Integer> someStream = //...
    DataStream<String> otherStream = //...
    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

Scala

    val someStream: DataStream[Int] = ...
    val otherStream: DataStream[String] = ...
    val connectedStreams = someStream.connect(otherStream)

Python

    stream_1 = ...
    stream_2 = ...
    connected_streams = stream_1.connect(stream_2)

CoMap, CoFlatMap

ConnectedStreams → DataStream

Similar to map and flatMap, but applied to a connected data stream.

Java

    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }

        @Override
        public Boolean map2(String value) {
            return false;
        }
    });

    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
        @Override
        public void flatMap1(Integer value, Collector<String> out) {
            out.collect(value.toString());
        }

        @Override
        public void flatMap2(String value, Collector<String> out) {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

Scala

    connectedStreams.map(
        (_ : Int) => true,
        (_ : String) => false
    )
    connectedStreams.flatMap(
        (n : Int) => Seq(n.toString),
        (s : String) => s.split(" ").toSeq
    )

Python

    class MyCoMapFunction(CoMapFunction):

        def map1(self, value):
            return value[0] + 1, value[1]

        def map2(self, value):
            return value[0], value[1] + 'flink'

    class MyCoFlatMapFunction(CoFlatMapFunction):

        def flat_map1(self, value):
            for i in range(value[0]):
                yield i

        def flat_map2(self, value):
            yield value[0] + 1

    connected_streams.map(MyCoMapFunction())
    connected_streams.flat_map(MyCoFlatMapFunction())

Cache

DataStream → CachedDataStream

Caches the intermediate result of the transformation. Currently, only jobs that run in batch execution mode are supported. The cached intermediate result is generated lazily the first time it is computed, so that later jobs can reuse it. If the cache is lost, it is recomputed using the original transformations.

Java

    DataStream<Integer> dataStream = //...
    CachedDataStream<Integer> cachedDataStream = dataStream.cache();
    cachedDataStream.print(); // Do anything with the cachedDataStream
    // ...
    env.execute(); // Execute and create cache.
    cachedDataStream.print(); // Consume cached result.
    env.execute();

Scala

    val dataStream : DataStream[Int] = //...
    val cachedDataStream = dataStream.cache()
    cachedDataStream.print() // Do anything with the cachedDataStream
    // ...
    env.execute() // Execute and create cache.
    cachedDataStream.print() // Consume cached result.
    env.execute()

Python

    data_stream = ... # DataStream
    cached_data_stream = data_stream.cache()
    cached_data_stream.print()
    # ...
    env.execute() # Execute and create cache.
    cached_data_stream.print() # Consume cached result.
    env.execute()

Full Window Partition

DataStream → PartitionWindowedStream

Collects all records of each partition separately into a full window and processes them. The window emission will be triggered at the end of inputs. This approach is primarily applicable to batch processing scenarios. For non-keyed DataStream, a partition contains all records of a subtask. For KeyedStream, a partition contains all records of a key.

    DataStream<Integer> dataStream = //...
    PartitionWindowedStream<Integer> partitionWindowedDataStream = dataStream.fullWindowPartition();
    // do full window partition processing with PartitionWindowedStream
    DataStream<Integer> resultStream = partitionWindowedDataStream.mapPartition(
        new MapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(
                    Iterable<Integer> values, Collector<Integer> out) {
                int result = 0;
                for (Integer value : values) {
                    result += value;
                }
                out.collect(result);
            }
        }
    );
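
The buffering behaviour can be pictured with a small plain-Python sketch for the keyed case, where a partition holds all records of one key. `full_window_partition` is a hypothetical helper written for this illustration and not a Flink or PyFlink API. It buffers everything first and only calls the partition function once the input has ended.

```python
from collections import defaultdict


def full_window_partition(records, partition_of, map_partition):
    """Buffer all records per partition, then process each partition once."""
    buffers = defaultdict(list)
    for record in records:
        # Input phase: records are only buffered, nothing is emitted yet.
        buffers[partition_of(record)].append(record)
    # End of input: each partition is handed to the function as a whole.
    return {partition: map_partition(values)
            for partition, values in buffers.items()}


records = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]
sums_per_key = full_window_partition(
    records,
    partition_of=lambda r: r[0],  # keyed case: one partition per key
    map_partition=lambda values: sum(v for _, v in values),
)
print(sums_per_key)  # {'a': 9, 'b': 6}
```

Because every partition is held in full until the input ends, the approach suits bounded inputs, which matches the batch focus described above.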

Physical Partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

Custom Partitioning

DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

Java

    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0);

Scala

    dataStream.partitionCustom(partitioner, "someKey")
    dataStream.partitionCustom(partitioner, 0)

Python

    data_stream = env.from_collection(collection=[(2, 'a'), (2, 'a'), (3, 'b')])
    data_stream.partition_custom(lambda key, num_partition: key % num_partition, lambda x: x[0])

Random Partitioning

DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

Java

    dataStream.shuffle();

Scala

    dataStream.shuffle()

Python

    data_stream.shuffle()

Rescaling

DataStream → DataStream

Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don’t want the full rebalance that rebalance() would incur. This requires only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:

[Figure: connection pattern between upstream and downstream operations when rescaling]

Java

    dataStream.rescale();

Scala

    dataStream.rescale()

Python

    data_stream.rescale()
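
The connection pattern from the parallelism example above can be reproduced with a few lines of plain Python. `rescale_targets` is a hypothetical helper that assumes an even, contiguous split. It matches the 2-to-6 and 6-to-2 cases described in the text, but the exact assignment Flink chooses when the parallelisms are not multiples of each other may differ from this sketch.

```python
def rescale_targets(upstream_parallelism: int, downstream_parallelism: int):
    """Return {upstream index: [downstream indexes]} for an even split."""
    targets = {u: [] for u in range(upstream_parallelism)}
    if downstream_parallelism >= upstream_parallelism:
        # Fan out: each downstream instance reads from exactly one upstream.
        for d in range(downstream_parallelism):
            targets[d * upstream_parallelism // downstream_parallelism].append(d)
    else:
        # Fan in: each upstream instance writes to exactly one downstream.
        for u in range(upstream_parallelism):
            targets[u].append(u * downstream_parallelism // upstream_parallelism)
    return targets


print(rescale_targets(2, 6))
# {0: [0, 1, 2], 1: [3, 4, 5]}
print(rescale_targets(6, 2))
# {0: [0], 1: [0], 2: [0], 3: [1], 4: [1], 5: [1]}
```

With rebalance(), by contrast, every upstream instance would send to all downstream instances.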

Broadcasting

DataStream → DataStream

Broadcasts elements to every partition.

Java

    dataStream.broadcast();

Scala

    dataStream.broadcast()

Python

    data_stream.broadcast()

Task Chaining and Resource Groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink; see slots. You can manually isolate operators in separate slots if desired.

Start New Chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

Java

    someStream.filter(...).map(...).startNewChain().map(...);

Scala

    someStream.filter(...).map(...).startNewChain().map(...)

Python

    some_stream.filter(...).map(...).start_new_chain().map(...)

Disable Chaining

Do not chain the map operator.

Java

    someStream.map(...).disableChaining();

Scala

    someStream.map(...).disableChaining()

Python

    some_stream.map(...).disable_chaining()

Set Slot Sharing Group

Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don’t have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

Java

    someStream.filter(...).slotSharingGroup("name");

Scala

    someStream.filter(...).slotSharingGroup("name")

Python

    some_stream.filter(...).slot_sharing_group("name")

Name And Description

Operators and job vertices in Flink have a name and a description. Both describe what an operator or a job vertex is doing, but they are used differently.

The name of an operator or job vertex is used in the web UI, thread names, logging, metrics, etc. The name of a job vertex is constructed from the names of the operators in it. The name should be as concise as possible to avoid high pressure on external systems.

The description is used in the execution plan and displayed as the details of a job vertex in the web UI. The description of a job vertex is constructed from the descriptions of the operators in it. The description can contain detailed information about operators to facilitate debugging at runtime.

Java

    someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");

Scala

    someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1")

Python

    some_stream.filter(...).name("filter").set_description("x in (1, 2, 3, 4) and y > 1")

By default, the description of a job vertex is formatted as a tree-style string. Users can set pipeline.vertex-description-mode to CASCADING if they want the description in the cascading format used in former versions.

By default, operators generated by Flink SQL have a name consisting of the operator type and id, plus a detailed description. Users can set table.exec.simplify-operator-name-enabled to false if they want the name to be the detailed description, as in former versions.

When the topology of the pipeline is complex, users can add a topological index to the vertex name by setting pipeline.vertex-name-include-index-prefix to true, which makes it easier to find the vertex in the graph from logs or metrics tags.