活动时间

事件时间/处理时间/摄取时间

Flink 在流处理节目中支持不同的时间概念

  • 处理时间:处理时间是指执行相应 算子操作的机器的系统时间。

当流程序在处理时间运行时,所有基于时间的 算子操作(如时间窗口)将使用运行相应算子的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整个小时之间到达特定算子的所有记录。例如,如果应用程序在上午9:15开始运行,则第一个每小时处理时间窗口将包括在上午9:15到上午10:00之间处理的事件,下一个窗口将包括在上午10:00到11:00之间处理的事件,因此上。

处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统的速度(例如从消息队列)到记录在系统内的算子之间流动的速度的影响。和停电(预定或其他)。

  • 事件时间:事件时间是每个事件在其生产设备上发生的时间。此时间通常在进入Flink之前嵌入记录中,并且可以从每个记录中提取该事件时间戳。在事件时间,时间的进展取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是表示事件时间进度的机制。该水印机制在下面的后面部分中描述。

在一个完美的世界中,事件时间处理将产生完全一致和确定的结果,无论事件何时到达,或者它们的排序。但是,除非事件已知按顺序到达(按时间戳),否则事件时间处理会在等待无序事件时产生一些延迟。由于只能等待一段有限的时间,因此限制了确定性事件时间应用程序的可能性。

假设所有数据都已到达,事件时间 算子操作将按预期运行,即使在处理无序或延迟事件或重新处理历史数据时也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有落入该小时的事件时间戳的所有记录,无论它们到达的顺序如何,或者何时处理它们。(有关更多信息,请参阅有关迟发事件的部分。)

请注意,有时当事件时间程序实时处理实时数据时,它们将使用一些处理时间 算子操作,以确保它们及时进行。

  • 摄取时间:摄取时间是事件进入Flink的时间。在源算子处,每个记录将源的当前时间作为时间戳,并且基于时间的 算子操作(如时间窗口)引用该时间戳。

摄取时间在概念上位于事件时间处理时间之间。与处理时间相比,它稍贵一些,但可以提供更可预测的结果。因为摄取时间使用稳定的时间戳(在源处分配一次),所以对记录的不同窗口 算子操作将引用相同的时间戳,而在处理时间中,每个窗口算子可以将记录分配给不同的窗口(基于本地系统时钟和任何运输延误)。

事件时间相比,摄取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生成水印

在内部,摄取时间事件时间非常相似,但具有自动时间戳分配和自动水印生成函数。

概览 - 图1

设定时间特征

Flink DataStream程序的第一部分通常设置基本时间特性该设置定义了数据流源的行为方式(例如,它们是否将分配时间戳),以及窗口 算子操作应该使用的时间概念KeyedStream.timeWindow(Time.seconds(30))

以下示例显示了一个Flink程序,该程序在每小时时间窗口中聚合事件。窗口的行为适应时间特征。

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  3. // alternatively:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  6. DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
  7. stream
  8. .keyBy( (event) -> event.getUser() )
  9. .timeWindow(Time.hours(1))
  10. .reduce( (a, b) -> a.add(b) )
  11. .addSink(...);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  3. // alternatively:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  6. val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
  7. stream
  8. .keyBy( _.getUser )
  9. .timeWindow(Time.hours(1))
  10. .reduce( (a, b) => a.add(b) )
  11. .addSink(...)

请注意,为了在事件时间运行此示例,程序需要使用直接为数据定义事件时间的源并自行发出水印,或者程序必须在源之后注入时间戳分配器和水印生成器这些函数描述了如何访问事件时间戳,以及事件流表现出的无序程度。

以下部分描述了时间戳水印背后的一般机制有关如何在Flink DataStream API中使用时间戳分配和水印生成的指南,请参阅生成时间戳/水印

活动时间和水印

注意:Flink实现了数据流模型中的许多技术。有关事件时间和水印的详细介绍,请查看以下文章。

  • 由Tyler Akidau 流处理播放101
  • 数据流模型纸支持事件时间的流处理器需要一种方法来衡量事件时间的进度。例如,当事件时间超过一小时结束时,需要通知构建每小时窗口的窗口算子,以便算子可以关闭正在进行的窗口。

事件时间可以独立于处理时间(由挂钟测量)进行。例如,在一个程序中,算子的当前事件时间可能略微落后于处理时间(考虑到接收事件的延迟),而两者都以相同的速度进行。另一方面,通过快速转发已经在Kafka主题(或另一个消息队列)中缓冲的一些历史数据,另一个流程序可以通过几周的事件时间进行,只需几秒钟的处理。


Flink中用于衡量事件时间进度的机制是水印水印作为数据流的一部分流动并带有时间戳t一个水印(T)宣布事件时间达到时间该流,这意味着应该有从该流没有更多的数据元与时间戳T” <= T(即事件与水印时间戳旧的或相等)。

下图显示了具有(逻辑)时间戳的事件流,以及内联流水印。在该示例中,事件按顺序(关于它们的时间戳),意味着水印仅是流中的周期性标记。

包含事件(按顺序)和水印的数据流

水印对于无序是至关重要的,如下所示,其中事件不按时间戳排序。通常,水印是一种声明,通过流中的该点,到达某个时间戳的所有事件都应该到达。一旦水印到达算子,算子就可以将其内部事件时钟提前到水印的值。

包含事件(乱序)和水印的数据流

请注意,事件时间由新生成的流数据元(或多个数据元)继承,这些数据元来自生成它们的事件或触发创建这些数据元的水印。

并行流中的水印

在源函数处或之后生成水印。源函数的每个并行子任务通常独立地生成其水印。这些水印定义了该特定并行源的事件时间。

当水印流过流处理节目时,它们会在他们到达的算子处推进事件时间。每当算子提前其事件时间时,它为其后继算子生成下游的新水印。

一些算子消耗多个输入流; 例如,一个union,或者跟随keyBy(…)partition(…)函数的 算子这样的算子当前事件时间是其输入流的事件时间的最小值。由于其输入流更新其事件时间,因此算子也是如此。

下图显示了流经并行流的事件和水印的示例,以及跟踪事件时间的 算子。

具有事件和水印的并行数据流和 算子

请注意,Kafka源支持每分区水印,您可以在此处详细了解

迟到数据元

某些数据元可能违反水印条件,这意味着即使在水印(t)发生之后,也会出现更多具有时间戳t'<= t的数据元实际上,在许多现实世界设置中,某些数据元可以被任意延迟,从而无法指定某个事件时间戳的所有数据元将发生的时间。此外,即使迟到有限,通常也不希望将水印延迟太多,因为它在事件时间窗的评估中引起太多延迟。

出于这个原因,流程序可能明确地期望一些后期数据元。后期数据元是在系统的事件时间时间之后到达的数据元(由水印发出信号)已经超过了后期数据元的时间戳的时间。有关如何在事件时间窗口中使用延迟数据元的更多信息,请参阅允许延迟。

空闲Sources

目前,对于纯事件时间水印生成器,如果没有要处理的数据元,则水印不能进行。这意味着在输入数据存在间隙的情况下,事件时间将不会进行,例如窗口算子将不会被触发,因此现有窗口将不能产生任何输出数据。

为了避免这种情况,可以使用定期水印分配器,它不仅基于数据元时间戳进行分配。示例解决方案可以是在不观察新事件一段时间之后切换到使用当前处理时间作为时间基础的分配器。

源可以标记为空闲使用SourceFunction.SourceContext#markAsTemporarilyIdle有关详细信息,请参阅此方法的Javadoc以及StreamStatus

调试水印

有关在运行时调试水印的信息,请参阅调试Windows和事件时间部分。

算子如何处理水印

作为一般规则,算子需要在向下游转发之前完全处理给定的水印。例如,WindowOperator将首先评估应该触发哪些窗口,并且只有在产生由水印触发的所有输出之后,水印本身才会被发送到下游。换句话说,由于出现水印而产生的所有数据元将在水印之前发出。

同样的规则适用于TwoInputStreamOperator但是,在这种情况下,算子的当前水印被定义为其两个输入的最小值。

这种行为的细节由的实现方式定义OneInputStreamOperator#processWatermarkTwoInputStreamOperator#processWatermark1TwoInputStreamOperator#processWatermark2方法。