视窗
Windows是处理无限流的核心。Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。本文档重点介绍如何在Flink中执行窗口,以及程序员如何从其提供的函数中获益最大化。
窗口Flink程序的一般结构如下所示。第一个片段指的是被Keys化流,而第二个片段指的是非被Keys化流。正如人们所看到的,唯一的区别是keyBy(…)
呼吁Keys流和window(…)
成为windowAll(…)
非被Key化的数据流。这也将作为页面其余部分的路线图。
被Keys化Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
非被Keys化Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
在上面,方括号([…])中的命令是可选的。这表明Flink允许您以多种不同方式自定义窗口逻辑,以便最适合您的需求。
窗口生命周期
简而言之,只要应该属于此窗口的第一个数据元到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定时,窗口将被完全删除allowed lateness
(请参阅允许的延迟))。Flink保证仅删除基于时间的窗口而不是其他类型,例如全局窗口(请参阅窗口分配器)。例如,使用基于事件时间的窗口策略,每5分钟创建一个非重叠(或翻滚)的窗口,并允许延迟1分钟,Flink将创建一个新窗口,用于间隔12:00
和12:05
当具有落入此间隔的时间戳的第一个数据元到达时,当水印通过12:06
时间戳时它将删除它。
此外,每个窗口将具有Trigger
(参见触发器)和一个函数(ProcessWindowFunction
,ReduceFunction
,AggregateFunction
或FoldFunction
)(见窗口函数)连接到它。该函数将包含要应用于窗口内容的计算,而Trigger
指定窗口被认为准备好应用该函数的条件。触发策略可能类似于“当窗口中的数据元数量大于4”时,或“当水印通过窗口结束时”。触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的数据元,而不是窗口元数据。这意味着仍然可以将新数据添加到该窗口。
除了上述内容之外,您还可以指定一个Evictor
(参见Evictors),它可以在触发器触发后以及应用函数之前和/或之后从窗口中删除数据元。
在下文中,我们将详细介绍上述每个组件。在转到可选部分之前,我们从上面代码段中的必需部分开始(请参阅被Keys化与非被Keys化窗口,窗口分配器和窗口函数)。
被Keys化与非被Keys化Windows
要指定的第一件事是您的流是否应该键入。必须在定义窗口之前完成此 算子操作。使用the keyBy(…)
将您的无限流分成逻辑被Key化的数据流。如果keyBy(…)
未调用,则表示您的流不是被Keys化的。
对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。引用相同Keys的所有数据元将被发送到同一个并行任务。
在非被Key化的数据流的情况下,您的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为1。
窗口分配器
指定您的流是否已键入后,下一步是定义一个窗口分配器。窗口分配器定义如何将数据元分配给窗口。这是通过WindowAssigner
在window(…)
(对于被Keys化流)或windowAll()
(对于非被Keys化流)调用中指定您的选择来完成的。
A WindowAssigner
负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例,即翻滚窗口,滑动窗口,会话窗口和全局窗口。您还可以通过扩展WindowAssigner
类来实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。请查看我们关于活动时间的部分,了解处理时间和事件时间之间的差异以及时间戳和水印的生成方式。
基于时间的窗口具有开始时间戳(包括)和结束时间戳(不包括),它们一起描述窗口的大小。在代码中,Flink在使用TimeWindow
基于时间的窗口时使用,该窗口具有查询开始和结束时间戳的方法maxTimestamp()
,以及返回给定窗口的最大允许时间戳的附加方法。
在下文中,我们将展示Flink的预定义窗口分配器如何工作以及如何在DataStream程序中使用它们。下图显示了每个分配者的工作情况。紫色圆圈表示流的数据元,这些数据元由某个键(在这种情况下是用户1,用户2和用户3)划分。x轴显示时间的进度。
翻滚的Windows
一个翻滚窗口分配器的每个数据元分配给指定的窗口的窗口大小。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示。
以下代码段显示了如何使用翻滚窗口。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
时间间隔可以通过使用一个指定Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
,等等。
如上一个示例所示,翻滚窗口分配器还采用可选offset
参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时翻滚窗口划时代对齐,这是你会得到如窗口1:00:00.000 - 1:59:59.999
,2:00:00.000 - 2:59:59.999
等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿1:15:00.000 - 2:14:59.999
,2:15:00.000 - 3:14:59.999
等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量Time.hours(-8)
。
滑动窗口
该滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。
例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。
以下代码段显示了如何使用滑动窗口。
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
时间间隔可以通过使用一个指定Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
,等等。
如上一个示例所示,滑动窗口分配器还采用可选offset
参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时窗口半小时滑动与时代一致,那就是你会得到如窗口1:00:00.000 - 1:59:59.999
,1:30:00.000 - 2:29:59.999
等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿1:15:00.000 - 2:14:59.999
,1:45:00.000 - 2:44:59.999
等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量Time.hours(-8)
。
会话窗口
在会话窗口中按活动会话分配器组中的数据元。与翻滚窗口和滑动窗口相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,即当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙或会话间隙提取器函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。
以下代码段显示了如何使用会话窗口。
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
静态间隙可以通过使用中的一个来指定Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
,等。
通过实现SessionWindowTimeGapExtractor
接口指定动态间隙。
注意由于会话窗口没有固定的开始和结束,因此它们的评估方式与翻滚和滑动窗口不同。在内部,会话窗口算子为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一起。为了可合并的,会话窗口 算子操作者需要一个合并触发器和一个合并的窗函数,如ReduceFunction
,AggregateFunction
,或ProcessWindowFunction
(FoldFunction
不能合并。)
全局Windows
一个全局性的窗口分配器分配使用相同的Keys相同的单个的所有数据元全局窗口。此窗口方案仅在您还指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。
以下代码段显示了如何使用全局窗口。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
窗口函数
定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是窗口函数的职责,窗口函数用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元(请参阅Flink如何确定窗口何时准备好的触发器)。
的窗函数可以是一个ReduceFunction
,AggregateFunction
,FoldFunction
或ProcessWindowFunction
。前两个可以更有效地执行(参见State Size部分),因为Flink可以在每个窗口到达时递增地聚合它们的数据元。A ProcessWindowFunction
获取Iterable
窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。
具有a的窗口转换ProcessWindowFunction
不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有数据元。这可以通过组合来减轻ProcessWindowFunction
与ReduceFunction
,AggregateFunction
或FoldFunction
以获得两个窗口元件的增量聚合并且该附加元数据窗口ProcessWindowFunction
接收。我们将查看每个变体的示例。
ReduceFunction
A ReduceFunction
指定如何组合输入中的两个数据元以生成相同类型的输出数据元。Flink使用a ReduceFunction
来递增地聚合窗口的数据元。
A ReduceFunction
可以像这样定义和使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
上面的示例总结了窗口中所有数据元的元组的第二个字段。
聚合函数
An AggregateFunction
是一个通用版本,ReduceFunction
它有三种类型:输入类型(IN
),累加器类型(ACC
)和输出类型(OUT
)。输入类型是输入流中数据元的类型,并且AggregateFunction
具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于OUT
从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。
与之相同ReduceFunction
,Flink将在窗口到达时递增地聚合窗口的输入数据元。
一个AggregateFunction
可以被定义并这样使用:
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
上面的示例计算窗口中数据元的第二个字段的平均值。
FoldFunction
A FoldFunction
指定窗口的输入数据元如何与输出类型的数据元组合。所述FoldFunction
递增称为该被添加到窗口和电流输出值的每个数据元。第一个数据元与输出类型的预定义初始值组合。
A FoldFunction
可以像这样定义和使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
上面的示例将所有输入Long
值附加到最初为空String
。
注意 fold()
不能与会话窗口或其他可合并窗口一起使用。
ProcessWindowFunction
ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。
ProcessWindowFunction
外观签名如下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* The context holding window metadata
*/
abstract class Context {
/**
* Returns the window that is being evaluated.
*/
def window: W
/**
* Returns the current processing time.
*/
def currentProcessingTime: Long
/**
* Returns the current event-time watermark.
*/
def currentWatermark: Long
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
}
}
注意该key
参数是通过KeySelector
为keyBy()
调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是Tuple
,您必须手动将其转换为正确大小的元组以提取键字段。
A ProcessWindowFunction
可以像这样定义和使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
该示例显示了ProcessWindowFunction
对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。
注意注意,使用ProcessWindowFunction
简单的聚合(例如count)是非常低效的。下一节将介绍a ReduceFunction
或如何AggregateFunction
与a结合使用ProcessWindowFunction
以获得增量聚合和a的附加信息ProcessWindowFunction
。
ProcessWindowFunction with Incremental Aggregation
当a 数据元到达窗口时,A ProcessWindowFunction
可以与a ReduceFunction
,an AggregateFunction
或a组合FoldFunction
以递增地聚合数据元。当窗口关闭时,ProcessWindowFunction
将提供聚合结果。这允许它在访问附加窗口元信息的同时递增地计算窗口ProcessWindowFunction
。
注意您也可以使用旧版WindowFunction
而不是ProcessWindowFunction
增量窗口聚合。
使用ReduceFunction增量窗口聚合
以下示例显示如何将增量ReduceFunction
与a组合ProcessWindowFunction
以返回窗口中的最小事件以及窗口的开始时间。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
使用AggregateFunction进行增量窗口聚合
以下示例显示如何将增量AggregateFunction
与a组合ProcessWindowFunction
以计算平均值,并同时发出键和窗口以及平均值。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double]): () = {
val average = averages.iterator.next()
out.collect((key, average))
}
}
使用FoldFunction进行增量窗口聚合
以下示例显示如何将增量FoldFunction
与a组合ProcessWindowFunction
以提取窗口中的事件数,并返回窗口的键和结束时间。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
在ProcessWindowFunction中使用每窗口状态
除了访问被Keys化状态(如任何丰富的函数可以),a ProcessWindowFunction
还可以使用被Keys化状态,该被Keys化状态的作用域是函数当前正在处理的窗口。在这种情况下,了解每个窗口状态所指的窗口是很重要的。涉及不同的“窗口”:
- 指定窗口 算子操作时定义的窗口:这可能是1小时的翻滚窗口或滑动1小时的2小时滑动窗口。
- 给定键的已定义窗口的实际实例:对于user-id xyz,这可能是从12:00到13:00的时间窗口。这基于窗口定义,并且将基于作业当前正在处理的键的数量以及基于事件落入的时隙而存在许多窗口。每窗口状态与后两者相关联。这意味着如果我们处理1000个不同键的事件,并且所有这些事件的事件当前都落入[12:00,13:00]时间窗口,那么将有1000个窗口实例,每个窗口实例都有自己的按键每窗口状态。
调用接收的Context
对象有两种方法process()
允许访问两种类型的状态:
globalState()
,允许访问没有作用于窗口的被Keys化状态windowState()
,允许访问也限定在窗口范围内的被Keys化状态如果您预计同一窗口会多次触发,则此函数非常有用,如果您对迟到的数据进行后期触发或者您有自定义触发器进行推测性早期触发时可能会发生这种情况。在这种情况下,您将存储有关先前点火的信息或每个窗口状态的点火次数。
使用窗口状态时,清除窗口时清除该状态也很重要。这应该在clear()
方法中发生。
WindowFunction(留存)
在一些ProcessWindowFunction
可以使用的地方你也可以使用WindowFunction
。这是较旧版本ProcessWindowFunction
,提供较少的上下文信息,并且没有一些高级函数,例如每窗口被Keys化状态。此接口将在某个时候弃用。
WindowFunction
外观的签名如下:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
它可以像这样使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
触发器
A Trigger
确定何时窗口函数准备好处理窗口(由窗口分配器形成)。每个都有默认值。如果默认触发器不符合您的需要,您可以使用指定自定义触发器。WindowAssigner
Trigger
trigger(…)
触发器界面有五种方法可以Trigger
对不同的事件做出反应:
onElement()
为添加到窗口的每个数据元调用该方法。onEventTime()
在注册的事件时间计时器触发时调用该方法。onProcessingTime()
在注册的处理时间计时器触发时调用该方法。- 该
onMerge()
方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,例如当使用会话窗口时。 - 最后,该
clear()
方法在移除相应窗口时执行所需的任何动作。关于上述方法需要注意两点:
1)前三个决定如何通过返回a来对其调用事件进行 算子操作TriggerResult
。该 算子操作可以是以下之一:
CONTINUE
: 没做什么,FIRE
:触发计算,PURGE
:清除窗口中的数据元,和FIRE_AND_PURGE
:触发计算并清除窗口中的数据元。2)这些方法中的任何一种都可用于注册处理或事件时间计时器以用于将来的 算子操作。
火与清除
一旦触发器确定窗口已准备好进行处理,它就会触发,即它返回FIRE
或FIRE_AND_PURGE
。这是窗口算子发出当前窗口结果的信号。给定一个窗口,将ProcessWindowFunction
所有数据元传递给ProcessWindowFunction
(可能在将它们传递给逐出器后)。窗口ReduceFunction
,AggregateFunction
或FoldFunction
简单地发出他们急切地汇总结果。
当触发器触发时,它可以FIRE
或者FIRE_AND_PURGE
。虽然FIRE
保持了窗口的内容,FIRE_AND_PURGE
删除其内容。默认情况下,预先实现的触发器只是FIRE
没有清除窗口状态。
注意清除将简单地删除窗口的内容,并将保存有关窗口和任何触发状态的任何潜在元信息。
WindowAssigners的默认触发器
默认Trigger
的WindowAssigner
是适用于许多使用情况。例如,所有事件时间窗口分配器都具有EventTimeTrigger
默认触发器。一旦水印通过窗口的末端,该触发器就会触发。
注意默认触发器GlobalWindow
是NeverTrigger
从不触发的。因此,在使用时必须定义自定义触发器GlobalWindow
。
注意通过使用trigger()
您指定触发器会覆盖a的默认触发器WindowAssigner
。例如,如果指定aCountTrigger
,TumblingEventTimeWindows
则不再根据时间进度获取窗口,而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。
内置和自定义触发器
Flink附带了一些内置触发器。
- (已经提到的)
EventTimeTrigger
基于水印测量的事件时间的进展而触发。 - 在
ProcessingTimeTrigger
基于处理时间的火灾。 CountTrigger
一旦窗口中的数据元数量超过给定限制,就会触发。- 将
PurgingTrigger
另一个触发器作为参数作为参数并将其转换为清除触发器。如果需要实现自定义触发器,则应该检查抽象Trigger类。请注意,API仍在不断发展,可能会在Flink的未来版本中发生变化。
逐出器
Flink的窗口模型允许指定Evictor
除了WindowAssigner
和之外的可选项Trigger
。这可以使用evictor(…)
方法(在本文档的开头显示)来完成。所述逐出器必须从一个窗口中删除数据元的能力之后触发器触发和之前和/或之后被施加的窗口函数。为此,该Evictor
接口有两种方法:
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
在evictBefore()
包含窗口函数之前被施加驱逐逻辑,而evictAfter()
包含窗口函数之后要施加的一个。在应用窗口函数之前被逐出的数据元将不会被处理。
Flink带有三个预先实施的驱逐者。这些是:
CountEvictor
:保持窗口中用户指定数量的数据元,并从窗口缓冲区的开头丢弃剩余的数据元。DeltaEvictor
:取aDeltaFunction
和athreshold
,计算窗口缓冲区中最后一个数据元与其余每个数据元之间的差值,并删除delta大于或等于阈值的值。TimeEvictor
:以interval
毫秒为单位作为参数,对于给定窗口,它查找max_ts
其数据元中的最大时间戳,并删除时间戳小于的所有数据元max_ts - interval
。默认默认情况下,所有预先实现的驱逐程序在窗口函数之前应用它们的逻辑。
注意指定逐出器会阻止任何预聚合,因为在应用计算之前,必须将窗口的所有数据元传递给逐出器。
注意 Flink不保证窗口中数据元的顺序。这意味着尽管逐出器可以从窗口的开头移除数据元,但这些数据元不一定是首先或最后到达的数据元。
允许迟到
当使用事件时间窗口时,可能会发生数据元迟到的情况,即 Flink用于跟踪事件时间进度的水印已经超过数据元所属的窗口的结束时间戳。查看事件时间,特别是后期数据元,以便更全面地讨论Flink如何处理事件时间。
默认情况下,当水印超过窗口末尾时,会删除延迟数据元。但是,Flink允许为窗口 算子指定最大允许延迟。允许延迟指定数据元在被删除之前可以延迟多少时间,并且其默认值为0.在水印通过窗口结束之后但在通过窗口结束加上允许的延迟之前到达的数据元,仍然添加到窗口中。根据使用的触发器,延迟但未丢弃的数据元可能会导致窗口再次触发。就是这种情况EventTimeTrigger
。
为了使这项工作,Flink保持窗口的状态,直到他们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,如“ 窗口生命周期”部分中所述。
默认默认情况下,允许的延迟设置为0
。也就是说,到达水印后面的数据元将被丢弃。
你可以像这样指定一个允许的迟到:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
注意使用GlobalWindows
窗口分配器时,由于全局窗口的结束时间戳为,因此没有数据被认为是迟到的Long.MAX_VALUE
。
将后期数据作为副输出
使用Flink的旁路输出函数,您可以获得最近丢弃的数据流。
首先需要指定您希望sideOutputLateData(OutputTag)
在窗口流上使用延迟数据。然后,您可以在窗口 算子操作的结果上获取旁路输出流:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
迟到数据元考虑因素
当指定允许的延迟大于0时,在水印通过窗口结束后保持窗口及其内容。在这些情况下,当迟到但未掉落的数据元到达时,它可能触发窗口的另一次触发。这些射击被称为late firings
,因为它们是由迟到事件触发的,与之相反的main firing
是窗口的第一次射击。在会话窗口的情况下,后期点火可以进一步导致窗口的合并,因为它们可以“桥接”两个预先存在的未合并窗口之间的间隙。
注意您应该知道,后期触发发出的数据元应该被视为先前计算的更新结果,即,您的数据流将包含同一计算的多个结果。根据您的应用程序,您需要考虑这些重复的结果或对其进行重复数据删除。
使用窗口结果
窗口 算子操作的结果也是a DataStream
,没有关于窗口 算子操作的信息保存在结果数据元中,所以如果你想保存关于窗口的元信息,你必须在你的结果数据元中手动编码该信息ProcessWindowFunction
。在结果数据元上设置的唯一相关信息是数据元时间戳。这被设置为已处理窗口的最大允许时间戳,即结束时间戳-1,因为窗口结束时间戳是独占的。请注意,事件时间窗口和处理时间窗口都是如此。即,在窗口化 算子操作数据元之后始终具有时间戳,但这可以是事件时间时间戳或处理时间时间戳。对于处理时间窗口,这没有特别的含义,但对于事件时间窗口,这与水印与窗口交互的方式一起启用具有相同窗口大小的连续窗口 算子操作。在看了水印如何与窗口交互后,我们将介绍这一点。
水印和窗口的互动
在继续本节之前,您可能需要查看有关事件时间和水印的部分。
当水印到达窗口 算子时,会触发两件事:
- 水印触发所有窗口的计算,其中最大时间戳(即结束时间戳-1)小于新水印
- 水印被转发(按原样)到下游 算子操作直观地,水印“冲出”任何窗口,一旦接收到该水印,将在下游 算子操作中被认为是迟到。
连续窗口 算子操作
如前所述,计算窗口结果的时间戳的方式以及水印与窗口交互的方式允许将连续的窗口 算子操作串联在一起。当您想要执行两个连续的窗口 算子操作时,这可能很有用,您希望使用不同的键,但仍希望来自同一上游窗口的数据元最终位于同一下游窗口中。考虑这个例子:
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
val input: DataStream[Int] = ...
val resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer())
val globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction())
在该示例中,[0, 5)
来自第一 算子操作的时间窗口的结果也将[0, 5)
在随后的窗口化 算子操作中的时间窗口中结束。这允许计算每个键的Sum然后在第二个 算子操作中计算同一窗口内的前k个数据元。
有用的状态规模考虑因素
Windows可以在很长一段时间内(例如几天,几周或几个月)定义,因此可以累积非常大的状态。在估算窗口计算的存储要求时,需要记住几条规则:
Flink为每个窗口创建一个每个数据元的副本。鉴于此,翻滚窗口保存每个数据元的一个副本(一个数据元恰好属于一个窗口,除非它被延迟)。相反,滑动窗口会创建每个数据元的几个,如“ 窗口分配器”部分中所述。因此,尺寸为1天且滑动1秒的滑动窗口可能不是一个好主意。
ReduceFunction
,AggregateFunction
并且FoldFunction
可以显着降低存储要求,因为它们急切地聚合数据元并且每个窗口只存储一个值。相反,仅使用aProcessWindowFunction
需要累积所有数据元。使用an
Evictor
可以防止任何预聚合,因为在应用计算之前,窗口的所有数据元都必须通过逐出器传递(参见Evictors)。