Windows

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink and how programmers can make the most of the functionality it offers.

Currently, window operations are only supported on keyed streams.

Keyed Windows

```
stream
    .key_by(...)
    .window(...)                  <- required: "assigner"
    [.trigger(...)]               <- optional: "trigger" (else default trigger)
    [.allowed_lateness(...)]      <- optional: "lateness" (else zero)
    .apply(...) / .process(...)   <- required: "function"
```

In the snippet above, the calls in square brackets ([…]) are optional. This shows that Flink lets you customize your windowing logic in many different ways so that it best fits your needs.

Window Lifecycle

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners). For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp.
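The arithmetic in the example above can be checked with a few lines of plain Python. This is a sketch, not PyFlink code: the helper names (tumbling_window, cleanup_time, fmt) are hypothetical, times are milliseconds since midnight, and the removal point follows the "max_timestamp + allowed_lateness" rule quoted in the ProcessWindowFunction.clear docstring, with max_timestamp assumed to be end - 1.

```python
MINUTE = 60 * 1000  # milliseconds


def tumbling_window(timestamp: int, size: int) -> tuple:
    """Return (start, end) of the tumbling window containing timestamp; end is exclusive."""
    start = timestamp - timestamp % size
    return start, start + size


def cleanup_time(window_end: int, allowed_lateness: int) -> int:
    """Watermark at which the window is removed: max_timestamp (end - 1) + allowed lateness."""
    return (window_end - 1) + allowed_lateness


def fmt(ms: int) -> str:
    """Format milliseconds since midnight as HH:MM:SS.mmm."""
    hours, rest = divmod(ms, 60 * MINUTE)
    minutes, rest = divmod(rest, MINUTE)
    seconds, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"


# The first element of the window arrives with timestamp 12:03.
first_ts = 12 * 60 * MINUTE + 3 * MINUTE
start, end = tumbling_window(first_ts, 5 * MINUTE)
print(fmt(start), fmt(end), fmt(cleanup_time(end, 1 * MINUTE)))
# prints: 12:00:00.000 12:05:00.000 12:05:59.999
```

The removal point, 12:05:59.999, is the last millisecond before 12:06, which matches the description above at millisecond granularity.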

In addition, each window will have a Trigger (see Triggers) and a function (WindowFunction or ProcessWindowFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents any time between its creation and removal. Purging in this case only refers to the elements in the window, and not the window metadata. This means that new data can still be added to that window.

In the following we go into more detail for each of the components above. We start with the required parts in the above snippet (see Keyed Windows, Window Assigner, and Window Function) before moving to the optional ones.

Keyed Windows

The first thing to specify is whether your stream should be keyed or not, and this has to be done before defining the window. Calling key_by(...) splits your infinite stream into logical keyed streams; if key_by(...) is not called, your stream is not keyed. Because window operations are currently only supported on keyed streams, key_by(...) must be called before window(...).

In the case of keyed streams, any attribute of your incoming events can be used as a key (see the documentation on keyed streams for details). Having a keyed stream allows your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements with the same key are sent to the same parallel task.
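To make "logical keyed streams" concrete, here is a plain-Python illustration (not Flink code) of what key_by(lambda x: x[1]) does conceptually: each element is routed by its key, and each key's elements can then be processed independently. The function name split_by_key is hypothetical, and how Flink actually assigns keys to parallel tasks is not modelled.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List, Tuple


def split_by_key(stream: Iterable[Tuple],
                 key_selector: Callable[[Tuple], Hashable]) -> Dict[Hashable, List[Tuple]]:
    """Group elements into one list per key, preserving arrival order within each key."""
    keyed: Dict[Hashable, List[Tuple]] = defaultdict(list)
    for element in stream:
        keyed[key_selector(element)].append(element)
    return dict(keyed)


events = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')]
keyed_streams = split_by_key(events, lambda x: x[1])

# Each logical keyed stream can now be processed on its own, e.g. summed independently.
for key, elements in keyed_streams.items():
    print(key, sum(e[0] for e in elements))
```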

Window Assigners

After specifying that your stream is keyed, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows. This is done by passing the WindowAssigner of your choice to the window(...) call.

A WindowAssigner is responsible for assigning each incoming element to one or more windows. You can implement a custom window assigner by extending the WindowAssigner class.

Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive) that together describe the size of the window. In code, Flink uses TimeWindow for time-based windows. It provides access to the start and end timestamps, plus an additional method max_timestamp() that returns the largest allowed timestamp for a given window.
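The half-open interval can be illustrated with a small stand-in class. SketchTimeWindow is hypothetical and is not PyFlink's TimeWindow; it assumes, as in Flink's Java TimeWindow, that max_timestamp() equals end - 1, the last timestamp inside a window whose end is exclusive.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchTimeWindow:
    """Stand-in for a time window covering [start, end)."""
    start: int  # inclusive
    end: int    # exclusive

    def max_timestamp(self) -> int:
        # Largest timestamp that still belongs to the window.
        return self.end - 1

    def contains(self, timestamp: int) -> bool:
        return self.start <= timestamp < self.end


window = SketchTimeWindow(0, 5000)
print(window.contains(0), window.contains(4999), window.contains(5000))  # True True False
print(window.max_timestamp())  # 4999
```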

In the following, we show how to implement a custom tumbling window assigner. For details on tumbling windows, refer to the relevant documentation.

```python
from typing import Tuple, Collection

from pyflink.common.serializer import TypeSerializer
from pyflink.datastream import WindowAssigner, Trigger
from pyflink.datastream.window import TimeWindow, TimeWindowSerializer


class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):

    def __init__(self, size: int, offset: int, is_event_time: bool):
        self._size = size          # window length in milliseconds
        self._offset = offset      # shifts the window boundaries by this many milliseconds
        self._is_event_time = is_event_time

    def assign_windows(self,
                       element: Tuple,
                       timestamp: int,
                       context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]:
        # Each element belongs to exactly one tumbling window [start, start + size).
        start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
        return [TimeWindow(start, start + self._size)]

    def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
        # EventTimeTrigger is the custom trigger shown in the Triggers section below;
        # it must be defined in the same module.
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        # Report the configured time characteristic instead of a hard-coded False.
        return self._is_event_time
```
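The assigner above delegates the boundary computation to TimeWindow.get_window_start_with_offset. The functions below are a plain-Python sketch of that computation as we understand it, written out so the effect of the offset is visible; treat them as an illustration rather than the library's code. The names window_start_with_offset and assign are hypothetical, and timestamps, offset, and size share one unit (milliseconds in Flink).

```python
def window_start_with_offset(timestamp: int, offset: int, size: int) -> int:
    """Start of the tumbling window of length `size`, shifted by `offset`, containing `timestamp`."""
    return timestamp - (timestamp - offset + size) % size


def assign(timestamp: int, offset: int, size: int) -> list:
    # Mirrors assign_windows above: exactly one [start, start + size) window per element.
    start = window_start_with_offset(timestamp, offset, size)
    return [(start, start + size)]


print(assign(7, 0, 5))  # [(5, 10)]
print(assign(7, 2, 5))  # [(7, 12)]: with offset 2, windows start at ..., 2, 7, 12, ...
print(assign(6, 2, 5))  # [(2, 7)]
```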

Window Functions

After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each keyed window once the system determines that a window is ready for processing (see triggers for how Flink determines when a window is ready).

The window function can be either a ProcessWindowFunction or a WindowFunction. Both receive an Iterable with all the elements contained in a window, along with additional meta information about the window to which the elements belong.

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and lacks some advanced features, such as per-window keyed state. We will look at examples for each of these variants.

ProcessWindowFunction

A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.
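The cost difference can be sketched in plain Python. Both hypothetical classes below compute the same per-window sum: BufferingSum keeps every element until the window fires, as a ProcessWindowFunction requires, while IncrementalSum folds each element into a single accumulator, which is what incremental aggregation allows. Neither class is a PyFlink API.

```python
class BufferingSum:
    """ProcessWindowFunction style: buffer all elements, compute when the window fires."""

    def __init__(self):
        self.buffer = []

    def add(self, value: int) -> None:
        self.buffer.append(value)

    def fire(self) -> int:
        return sum(self.buffer)

    def held_items(self) -> int:
        return len(self.buffer)


class IncrementalSum:
    """Incremental style: keep only a running total."""

    def __init__(self):
        self.total = 0

    def add(self, value: int) -> None:
        self.total += value

    def fire(self) -> int:
        return self.total

    def held_items(self) -> int:
        return 1  # a single accumulator, regardless of how many elements arrived


buffered, incremental = BufferingSum(), IncrementalSum()
for value in range(1, 1001):
    buffered.add(value)
    incremental.add(value)

print(buffered.fire(), incremental.fire())              # 500500 500500
print(buffered.held_items(), incremental.held_items())  # 1000 1
```

The buffered version pays in memory but can inspect all elements at once, for example to compute a median, which a running total cannot do.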

The signature of ProcessWindowFunction looks as follows:

```python
class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
    """
    Base interface for functions that are evaluated over keyed (grouped) windows using a context
    for retrieving extra information.
    """

    class Context(ABC, Generic[W2]):
        """
        The context holding window metadata.
        """

        @abstractmethod
        def window(self) -> W2:
            """
            :return: The window that is being evaluated.
            """
            pass

        @abstractmethod
        def current_processing_time(self) -> int:
            """
            :return: The current processing time.
            """
            pass

        @abstractmethod
        def current_watermark(self) -> int:
            """
            :return: The current event-time watermark.
            """
            pass

        @abstractmethod
        def window_state(self) -> KeyedStateStore:
            """
            State accessor for per-key and per-window state.

            .. note::
                If you use per-window state you have to ensure that you clean it up by implementing
                :func:`~ProcessWindowFunction.clear`.

            :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
            """
            pass

        @abstractmethod
        def global_state(self) -> KeyedStateStore:
            """
            State accessor for per-key global state.
            """
            pass

    @abstractmethod
    def process(self,
                key: KEY,
                context: 'ProcessWindowFunction.Context',
                elements: Iterable[IN]) -> Iterable[OUT]:
        """
        Evaluates the window and outputs none or several elements.

        :param key: The key for which this window is evaluated.
        :param context: The context in which the window is being evaluated.
        :param elements: The elements in the window being evaluated.
        :return: The iterable object which produces the elements to emit.
        """
        pass

    @abstractmethod
    def clear(self, context: 'ProcessWindowFunction.Context') -> None:
        """
        Deletes any state in the :class:`Context` when the Window expires (the watermark passes its
        max_timestamp + allowed_lateness).

        :param context: The context in which the window is being evaluated.
        """
        pass
```

The key parameter is the key extracted by the KeySelector (or key-selector function) passed to key_by(). Its type is whatever the selector returns; with key_by(lambda x: x[1], key_type=Types.STRING()) as in the example below, it is a str. If the selector returns a tuple of fields, key is a tuple, and you can unpack it to extract the individual key fields.

A ProcessWindowFunction can be defined and used like this:

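```python
from typing import Iterable, Tuple

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.datastream.window import TimeWindow

class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str, TimeWindow]):
    def process(self,
                key: str,
                context: ProcessWindowFunction.Context,
                elements: Iterable[Tuple]) -> Iterable[Tuple]:
        # Sum the first field of every element in this key's window.
        result = 0
        for i in elements:
            result += i[0]
        return [(key, result)]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        pass  # no per-window state to clean up

class FirstFieldTimestampAssigner(TimestampAssigner):
    # Event-time windows need timestamps; use the first field (in ms).
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[0])

env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([
    (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
    .with_timestamp_assigner(FirstFieldTimestampAssigner())
# TumblingEventWindowAssigner (Window Assigners section): 5 ms windows, no offset.
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
    .key_by(lambda x: x[1], key_type=Types.STRING()) \
    .window(TumblingEventWindowAssigner(5, 0, True)) \
    .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
    .print()
env.execute()
```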
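To sanity-check what such a pipeline computes, the plain-Python replay below groups the sample data into (key, window) panes and applies the same summation as SumWindowProcessFunction.process. It assumes, hypothetically, that each element's first field is its event timestamp in milliseconds and that windows are 5 ms tumbling windows with no offset; with other timestamps or window sizes the panes change.

```python
from collections import defaultdict

WINDOW_SIZE = 5  # assumption: 5 ms tumbling windows, no offset

data = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')]


def replay(elements, size):
    panes = defaultdict(list)  # (key, window_start) -> elements of that pane
    for element in elements:
        timestamp = element[0]  # assumption: first field is the event timestamp
        start = timestamp - timestamp % size
        panes[(element[1], start)].append(element)
    # Same logic as SumWindowProcessFunction.process: sum the first fields of each pane.
    return {pane: sum(e[0] for e in members) for pane, members in panes.items()}


print(replay(data, WINDOW_SIZE))
# {('hi', 0): 4, ('hello', 0): 6, ('hi', 5): 5, ('hello', 5): 6}
```

Each pane produces one (key, sum) record, so under these assumptions the window function is invoked four times.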

WindowFunction

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and lacks some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

The signature of a WindowFunction looks as follows:

```python
class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
    """
    Base interface for functions that are evaluated over keyed (grouped) windows.
    """

    @abstractmethod
    def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
        """
        Evaluates the window and outputs none or several elements.

        :param key: The key for which this window is evaluated.
        :param window: The window that is being evaluated.
        :param inputs: The elements in the window being evaluated.
        """
        pass
```

It can be used like this:

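```python
from typing import Iterable, Tuple
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TimeWindow

class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]) -> Iterable[Tuple]:
        # Sum the first field of every element in this key's window.
        return [(key, sum(i[0] for i in inputs))]

class FirstFieldTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[0])  # first field is the event timestamp (ms)

env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([
    (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
    .with_timestamp_assigner(FirstFieldTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
    .key_by(lambda x: x[1], key_type=Types.STRING()) \
    .window(TumblingEventWindowAssigner(5, 0, True)) \
    .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
    .print()
env.execute()
```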

Triggers

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. You can specify a custom trigger using trigger(…).

The signature of Trigger looks as follows:

```python
class Trigger(ABC, Generic[T, W]):
    """
    A Trigger determines when a pane of a window should be evaluated to emit the results for that
    part of the window.

    A pane is the bucket of elements that have the same key (assigned by the KeySelector) and same
    Window. An element can be in multiple panes if it was assigned to multiple windows by the
    WindowAssigner. These panes all have their own instance of the Trigger.

    Triggers must not maintain state internally since they can be re-created or reused for different
    keys. All necessary state should be persisted using the state abstraction available on the
    TriggerContext.

    When used with a MergingWindowAssigner the Trigger must return true from :func:`can_merge` and
    :func:`on_merge` must be properly implemented.
    """

    class TriggerContext(ABC):
        """
        A context object that is given to :class:`Trigger` methods to allow them to register timer
        callbacks and deal with state.
        """

        @abstractmethod
        def get_current_processing_time(self) -> int:
            """
            :return: The current processing time.
            """
            pass

        @abstractmethod
        def get_metric_group(self) -> MetricGroup:
            """
            Returns the metric group for this :class:`Trigger`. This is the same metric group that
            would be returned from
            :func:`~pyflink.datastream.functions.RuntimeContext.get_metric_group` in a user function.

            :return: The metric group.
            """
            pass

        @abstractmethod
        def get_current_watermark(self) -> int:
            """
            :return: The current watermark time.
            """
            pass

        @abstractmethod
        def register_processing_time_timer(self, time: int) -> None:
            """
            Register a system time callback. When the current system time passes the specified time
            :func:`~Trigger.on_processing_time` is called with the time specified here.

            :param time: The time at which to invoke :func:`~Trigger.on_processing_time`.
            """
            pass

        @abstractmethod
        def register_event_time_timer(self, time: int) -> None:
            """
            Register an event-time callback. When the current watermark passes the specified time
            :func:`~Trigger.on_event_time` is called with the time specified here.

            :param time: The watermark at which to invoke :func:`~Trigger.on_event_time`.
            """
            pass

        @abstractmethod
        def delete_processing_time_timer(self, time: int) -> None:
            """
            Delete the processing time trigger for the given time.
            """
            pass

        @abstractmethod
        def delete_event_time_timer(self, time: int) -> None:
            """
            Delete the event-time trigger for the given time.
            """
            pass

        @abstractmethod
        def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State:
            """
            Retrieves a :class:`State` object that can be used to interact with fault-tolerant state
            that is scoped to the window and key of the current trigger invocation.

            :param state_descriptor: The StateDescriptor that contains the name and type of the
                                     state that is being accessed.
            :return: The partitioned state object.
            """
            pass

    class OnMergeContext(TriggerContext):
        """
        Extension of :class:`TriggerContext` that is given to :func:`~Trigger.on_merge`.
        """

        @abstractmethod
        def merge_partitioned_state(self, state_descriptor: StateDescriptor) -> None:
            pass

    @abstractmethod
    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: W,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called for every element that gets added to a pane. The result of this will determine
        whether the pane is evaluated to emit results.

        :param element: The element that arrived.
        :param timestamp: The timestamp of the element that arrived.
        :param window: The window to which the element is being added.
        :param ctx: A context object that can be used to register timer callbacks.
        """
        pass

    @abstractmethod
    def on_processing_time(self,
                           time: int,
                           window: W,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called when a processing-time timer that was set using the trigger context fires.

        :param time: The timestamp at which the timer fired.
        :param window: The window for which the timer fired.
        :param ctx: A context object that can be used to register timer callbacks.
        """
        pass

    @abstractmethod
    def on_event_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Called when an event-time timer that was set using the trigger context fires.

        :param time: The timestamp at which the timer fired.
        :param window: The window for which the timer fired.
        :param ctx: A context object that can be used to register timer callbacks.
        """
        pass

    def can_merge(self) -> bool:
        """
        .. note:: If this returns true you must properly implement :func:`~Trigger.on_merge`

        :return: True if this trigger supports merging of trigger state and can therefore be used
                 with a MergingWindowAssigner.
        """
        return False

    @abstractmethod
    def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
        """
        Called when several windows have been merged into one window by the :class:`WindowAssigner`.

        :param window: The new window that results from the merge.
        :param ctx: A context object that can be used to register timer callbacks and access state.
        """
        pass

    @abstractmethod
    def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
        """
        Clears any state that the trigger might still hold for the given window. This is called when
        a window is purged. Timers set using :func:`~TriggerContext.register_event_time_timer` and
        :func:`~TriggerContext.register_processing_time_timer` should be deleted here as well as
        state acquired using :func:`~TriggerContext.get_partitioned_state`.
        """
        pass
```

Two things to notice about the above methods are:

  1. The first three methods (on_element, on_processing_time and on_event_time) decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:
     • CONTINUE: do nothing,
     • FIRE: trigger the computation,
     • PURGE: clear the elements in the window, and
     • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
  2. Any of these methods can be used to register processing- or event-time timers for future actions.
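One way to read the four actions is as two independent flags: whether to fire and whether to purge. The enum below is a hypothetical, stand-alone sketch of that decomposition; it is not imported from PyFlink.

```python
from enum import Enum


class SketchTriggerResult(Enum):
    """Each trigger action expressed as a (fire, purge) pair of flags."""
    CONTINUE = (False, False)
    FIRE = (True, False)
    PURGE = (False, True)
    FIRE_AND_PURGE = (True, True)

    def is_fire(self) -> bool:
        # True if the window function should be evaluated.
        return self.value[0]

    def is_purge(self) -> bool:
        # True if the window's elements should be cleared.
        return self.value[1]


for result in SketchTriggerResult:
    print(f"{result.name:<15} fire={result.is_fire()} purge={result.is_purge()}")
```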

Fire and Purge

Once a trigger determines that a window is ready for processing, it fires, i.e., it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. Given a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction.

When a trigger fires, it can either FIRE or FIRE_AND_PURGE. While FIRE keeps the contents of the window, FIRE_AND_PURGE removes its content. By default, the pre-implemented triggers simply FIRE without purging the window state.
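The difference is easiest to see when a window fires more than once. The plain-Python sketch below feeds the values 1 to 6 into a single window whose hypothetical trigger fires after every second element, and records what a summing window function would emit. With FIRE the window keeps its contents, so each result covers everything seen so far; with FIRE_AND_PURGE each result covers only the elements since the previous firing.

```python
from typing import Iterable, List


def run_window(values: Iterable[int], fire_every: int, purge_on_fire: bool) -> List[int]:
    pane: List[int] = []     # elements currently held by the window
    emitted: List[int] = []  # one sum per firing
    count = 0                # elements since the last firing (trigger-side counter)
    for value in values:
        pane.append(value)
        count += 1
        if count == fire_every:
            count = 0
            emitted.append(sum(pane))  # FIRE: evaluate the window function
            if purge_on_fire:
                pane.clear()           # ..._AND_PURGE: drop the window's elements
    return emitted


print(run_window(range(1, 7), 2, purge_on_fire=False))  # FIRE           -> [3, 10, 21]
print(run_window(range(1, 7), 2, purge_on_fire=True))   # FIRE_AND_PURGE -> [3, 7, 11]
```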

You can implement a custom EventTimeTrigger as follows:

```python
from typing import Tuple

from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult


class EventTimeTrigger(Trigger[Tuple, TimeWindow]):

    def on_element(self,
                   element: Tuple,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Ask to be called back once the watermark reaches the end of this window.
        ctx.register_event_time_timer(window.max_timestamp())
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Processing time is irrelevant for an event-time trigger.
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Fire once the watermark reaches the window's last timestamp, then drop its elements.
        if time >= window.max_timestamp():
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        # Not used: tumbling windows are never merged (can_merge() returns False).
        pass

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        # Remove the timer registered in on_element.
        ctx.delete_event_time_timer(window.max_timestamp())
```

Allowed Lateness

When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See event time and especially late elements for a more thorough discussion of how Flink deals with event time.

By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again.
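The rule can be written as a small decision function. This is a plain-Python sketch under two assumptions: an element is late once the watermark has reached the window's max_timestamp (end - 1), and it is dropped once the watermark has reached max_timestamp + allowed_lateness, the same expiry point quoted in the ProcessWindowFunction.clear docstring. The function name classify and its string labels are hypothetical.

```python
def classify(window_end: int, watermark: int, allowed_lateness: int = 0) -> str:
    """Classify an element destined for the window ending at window_end (exclusive)."""
    max_timestamp = window_end - 1
    if watermark >= max_timestamp + allowed_lateness:
        return "dropped"            # the window has already expired and been removed
    if watermark >= max_timestamp:
        return "late but accepted"  # added to the window; may cause it to fire again
    return "on time"


# Window [0, 5000) with 1000 ms of allowed lateness.
print(classify(5000, watermark=3000, allowed_lateness=1000))  # on time
print(classify(5000, watermark=5500, allowed_lateness=1000))  # late but accepted
print(classify(5000, watermark=6000, allowed_lateness=1000))  # dropped
# With the default lateness of 0, a late element is dropped immediately.
print(classify(5000, watermark=5000))  # dropped
```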

In order to make this work, Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also described in the Window Lifecycle section.

By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.

You can specify an allowed lateness like this:

```
data_stream.key_by(<key selector>) \
    .window(<window assigner>) \
    .allowed_lateness(<time>) \
    .<windowed transformation>(<window function>)
```