流处理概念
Flink的 Table API和SQL支持是用于批处理和流处理的统一API。这意味着 Table API和SQL查询具有相同的语义,无论它们的输入是有界批量输入还是无界流输入。因为关系代数和SQL最初是为批处理而设计的,所以关于无界流输入的关系查询不像有界批输入上的关系查询那样容易理解。
在此页面上,我们将解释Flink关于流数据的关系API的概念,实际限制和特定于流的配置参数。
数据流上的关系查询
SQL和关系代数的设计并未考虑流数据。因此,关系代数(和SQL)与流处理之间的概念差距很小。
关系代数/ SQL | 流处理 |
---|---|
关系(或表)是有界(多)元组的集合。 | 流是无限的元组序列。 |
对批处理数据执行的查询(例如,关系数据库中的表)可以访问完整的输入数据。 | 流式查询在启动时无法访问所有数据,必须“等待”流式传输数据。 |
批处理查询在生成固定大小的结果后终止。 | 流式查询会根据收到的记录不断更新其结果,并且永远不会完成。 |
尽管存在这些差异,但使用关系查询和SQL处理流并非不可能。高级关系数据库系统提供称为物化视图的函数。物化视图定义为SQL查询,就像常规虚拟视图一样。与虚拟视图相比,物化视图缓存查询的结果,使得在访问视图时不需要评估查询。缓存的一个常见挑战是阻止缓存提供过时的结果。实例化视图在修改其定义查询的基表时会过时。Eager View Maintenance是一种更新实体化视图并在其基表更新后立即更新实体化视图的技术。
如果我们考虑以下内容,急切的视图维护和流上的SQL查询之间的联系就变得很明显:
- 数据库表是一个结果流的
INSERT
,UPDATE
和DELETE
DML语句,通常被称为更新日志流。 - 物化视图定义为SQL查询。为了更新视图,查询将持续处理视图基本关系的更改日志流。
- 物化视图是流式SQL查询的结果。考虑到这些要点,我们将在下一节介绍Flink的动态表概念。
动态表和连续查询
动态表是Flink的 Table API和SQL支持流数据的核心概念。与表示批处理数据的静态表相比,动态表随时间而变化。可以像静态批处理表一样查询它们。查询动态表会产生连续查询。连续查询永远不会终止并生成动态表作为结果。查询不断更新其(动态)结果表以反映其输入(动态)表的更改。实质上,对动态表的连续查询与物化视图的定义查询非常相似。
值得注意的是,连续查询的结果始终在语义上等同于在输入表的SNAPSHOT上以批处理模式执行的相同查询的结果。
下图显示了流,动态表和连续查询的关系:
### 在流上定义表为了使用关系查询处理流,必须将其转换为
[ user: VARCHAR, // the name of the user cTime: TIMESTAMP, // the time when the URL was accessed url: VARCHAR // the URL that was accessed by the user]
Table
。从概念上讲,流的每个记录都被解释为INSERT
对结果表的修改。基本上,我们正在从一个INSERT
-only changelog流构建一个表。下图显示了click事件流(左侧)如何转换为表(右侧)。随着更多点击流记录的插入,生成的表不断增长。clicks
将在对单击事件流定义的表上显示两个示例查询。第一个查询是一个简单的GROUP-BY COUNT
聚合查询。这组clicks
对表user
字段和计数访问的网址的数量。下图显示了在clicks
使用其他行更新表时,如何评估查询。clicks
表(左侧)为空。当第一行插入表中时,查询开始计算结果clicks
表。[Mary, ./home]
Insert第一行后,结果表(右侧,顶部)由一行组成[Mary, 1]
。当第二行[Bob, ./cart]
Insertclicks
表中时,查询将更新结果表并插入新行[Bob, 1]
。第三行[Mary, ./prod?id=1]
产生已更新的已计算结果行的[Mary, 1]
更新[Mary, 2]
。最后,[Liz, 1]
当第四行附加到clicks
表时,查询将第三行插入到结果表中。第二个查询类似于第一个查询,但clicks
除了user
属性之外还在每小时滚动窗口上对表进行分组,然后计算URL的数量(基于时间的计算,例如窗口基于特殊时间属性,这将在下面讨论) )。同样,该图显示了不同时间点的输入和输出,以显示动态表的变化性质。clicks
显示在左侧。查询每小时连续计算结果并更新结果表。点击表包含四行,时间戳(cTime
)位于12:00:00
和之间12:59:59
。查询从此输入计算两个结果行(每个一行user
)并将它们附加到结果表。对于13:00:00
和之间的下一个窗口13:59:59
,该clicks
表包含三行,这导致另外两行被追加到结果表中。结果表已更新,clicks
随着时间的推移会附加更多行。#### 更新并附加查询尽管两个示例查询看起来非常相似(都计算了分组计数聚合),但它们在一个重要方面有所不同:- 第一个查询更新以前发出的结果,即定义结果表的更改日志流包含INSERT
和UPDATE
更改。- 第二个查询仅附加到结果表,即结果表的更改日志流仅包含INSERT
更改。查询是生成仅附加表还是更新表有一些含义:- 产生更新更改的查询通常必须保持更多状态(请参阅下一节)。- 将仅附加表转换为流与更新表的转换不同(请参阅表到流转换部分)。#### 查询限制可以将许多(但不是全部)语义上有效的查询评估为对流的连续查询。有些查询的计算成本太高,或者是由于需要维护的状态大小,或者计算更新太昂贵。- 状态大小:连续查询在无界流上进行评估,通常应该运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新先前发出的结果的查询需要维护所有发出的行,以便能够更新它们。例如,第一个示例查询需要存储每个用户的URL计数,以便能够增加计数,并在输入表收到新行时发送新结果。如果仅跟踪注册用户,则要维护的计数可能不会太高。但是,如果未注册的用户分配了唯一的用户名,则要维护的计数数将随着时间的推移而增长,并最终可能导致查询失败。- 计算更新:即使只添加或更新了单个输入记录,某些查询也需要重新计算和更新大部分发出的结果行。显然,这样的查询不适合作为连续查询执行。一个示例是以下查询,该查询
SELECT user, COUNT(url)FROM clicksGROUP BY user;
RANK
基于最后一次点击的时间计算每个用户a 。一旦clicks
表格收到新行,lastAction
就会更新用户,并且必须计算新的排名。但是,由于两行不能具有相同的等级,因此所有排名较低的行也需要更新。所述QueryConfig节讨论的参数来控制的连续查询的执行。一些参数可用于交换维持状态的大小以获得结果准确性。### 表到流转换动态表可以通过不断修改
SELECT user, RANK() OVER (ORDER BY lastLogin)FROM ( SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user);
INSERT
,UPDATE
以及DELETE
变化就像一个普通的数据库表。它可能是一个包含单行的表,它不断更新,只有一个插入表,no / notUPDATE
和DELETE
修改,或者介于两者之间。将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API和SQL支持三种编码动态表更改的方法:-仅附加流:只能通过INSERT
更改修改的动态表可以通过发出插入的行转换为流。-**撤消流:**撤消流是具有两种类型的消息的流,添加消息和撤消消息。通过将INSERT
更改编码为添加消息,将DELETE
更改编码为收回消息,将UPDATE
更改编码为更新(先前)行的收回消息和更新(新)行的添加消息,将动态表转换为收回流。下图显示了动态表到回收流的转换。- Upsert流: upsert流是一种包含两种消息,upsert消息和删除消息的流。转换为upsert流的动态表需要(可能是复合的)唯一键。具有唯一键的动态表通过编码转换为动态表,
INSERT
并UPDATE
更改为upsert消息并DELETE
更改为删除消息。流消耗 算子需要知道唯一键属性才能正确应用消息。与收回流的主要区别在于,UPDATE
使用单个消息对更改进行编码,因此更有效。下图显示了动态表到upsert流的转换。
DataStream
在Common Concepts页面上讨论了将动态表转换为a的API 。请注意,将动态表格转换为a时,仅支持附加和撤消流DataStream
。TableSources和TableSinks页面TableSink
讨论了向外部系统发出动态表的接口。
时间属性
Flink能够根据不同的时间概念处理流数据。
- 处理时间是指执行相应 算子操作的机器的系统时间(也称为“挂钟时间”)。
- 事件时间是指基于附加到每一行的时间戳来处理流数据。时间戳可以在事件发生时进行编码。
- 摄取时间是事件进入Flink的时间; 在内部,它与事件时间类似。有关Flink中时间处理的更多信息,请参阅有关事件时间和水印的简介。
表程序要求为流环境指定相应的时间特性:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Table API和SQL中基于时间的 算子操作(如窗口)需要有关时间概念及其来源的信息。因此,表可以提供逻辑时间属性,用于指示时间和访问表程序中的相应时间戳。
时间属性可以是每个表模式的一部分。它们是在从a创建表时定义的,DataStream
或在使用时预定义的TableSource
。一旦在开头定义了时间属性,它就可以作为字段引用,并可以用于基于时间的 算子操作。
只要时间属性未被修改并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性。时间属性的行为类似于常规时间戳,可以访问以进行计算。如果在计算中使用时间属性,则它将具体化并成为常规时间戳。常规时间戳不与Flink的时间和水印系统配合,因此不能再用于基于时间的 算子操作。
处理时间
处理时间允许表程序根据本地机器的时间产生结果。这是最简单的时间概念,但不提供决定论。它既不需要时间戳提取也不需要水印生成。
有两种方法可以定义处理时间属性。
在DataStream到表转换期间
处理时间属性是.proctime
在架构定义期间使用属性定义的。time属性必须仅通过附加逻辑字段扩展物理模式。因此,它只能在模式定义的末尾定义。
DataStream<Tuple2<String, String>> stream = ...;
// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
val stream: DataStream[(String, String)] = ...
// declare an additional logical field as a processing time attribute
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
使用TableSource
处理时间属性由TableSource
实现DefinedProctimeAttribute
接口的a定义。逻辑时间属性附加到由返回类型定义的物理模式TableSource
。
// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"Username" , "Data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
DataStream<Row> stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// field with this name will be appended as a third field
return "UserActionTime";
}
}
// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());
WindowedTable windowedTable = tEnv
.scan("UserActions")
.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// define a table source with a processing attribute
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
override def getReturnType = {
val names = Array[String]("Username" , "Data")
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
Types.ROW(names, types)
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// create stream
val stream = ...
stream
}
override def getProctimeAttribute = {
// field with this name will be appended as a third field
"UserActionTime"
}
}
// register table source
tEnv.registerTableSource("UserActions", new UserActionSource)
val windowedTable = tEnv
.scan("UserActions")
.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
活动时间
事件时间允许表程序根据每个记录中包含的时间生成结果。即使在无序事件或延迟事件的情况下,这也允许一致的结果。当从持久存储中读取记录时,它还确保表程序的可重放结果。
此外,事件时间允许批处理和流式环境中的表程序的统一语法。流式传输环境中的时间属性可以是批处理环境中的记录的常规字段。
为了处理乱序事件并区分流处理中的准时和迟到事件,Flink需要从事件中提取时间戳并在时间上做出某种进展(所谓的水印)。
可以在DataStream-to-Table转换期间或使用TableSource定义事件时间属性。
在DataStream到表转换期间
.rowtime
在架构定义期间使用属性定义事件时间属性。必须在转换后分配时间戳和水印DataStream
。
将a转换DataStream
为a 时,有两种定义时间属性的方法Table
。根据指定的.rowtime
字段名称是否存在于模式中DataStream
,时间戳字段也是
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
// Usage:
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
// Usage:
val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
使用TableSource
事件时间属性由TableSource
实现DefinedRowtimeAttribute
接口的a定义。该getRowtimeAttribute()
方法返回一个现有字段的名称,该字段包含表的事件时间属性,并且是类型LONG
或TIMESTAMP
。
此外,方法DataStream
返回的getDataStream()
必须分配与定义的时间属性对齐的水印。请注意,忽略(DataStream
由a分配的TimestampAssigner
)时间戳。只有TableSource
's rowtime属性的值是相关的。
// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"Username", "Data", "UserActionTime"};
TypeInformation[] types =
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
// ...
// assign watermarks based on the "UserActionTime" attribute
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}
@Override
public String getRowtimeAttribute() {
// Mark the "UserActionTime" attribute as event-time attribute.
return "UserActionTime";
}
}
// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());
WindowedTable windowedTable = tEnv
.scan("UserActions")
.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
override def getReturnType = {
val names = Array[String]("Username" , "Data", "UserActionTime")
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
Types.ROW(names, types)
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// create stream
// ...
// assign watermarks based on the "UserActionTime" attribute
val stream = inputStream.assignTimestampsAndWatermarks(...)
stream
}
override def getRowtimeAttribute = {
// Mark the "UserActionTime" attribute as event-time attribute.
"UserActionTime"
}
}
// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)
val windowedTable = tEnv
.scan("UserActions")
.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
查询配置
Table API和SQL查询具有相同的语义,无论它们的输入是有界批量输入还是无界流输入。在许多情况下,对流输入的连续查询能够计算与离线计算结果相同的准确结果。然而,这在一般情况下是不可能的,因为连续查询必须限制它们维护的状态的大小,以避免耗尽存储并且能够在很长一段时间内处理无界流数据。因此,连续查询可能只能提供近似结果,具体取决于输入数据和查询本身的特征。
Flink的 Table API和SQL接口提供参数来调整连续查询的准确性和资源消耗。参数通过QueryConfig
对象指定。在QueryConfig
可以从获得TableEnvironment
和被传递回一个时Table
被转换,即,当它被变换成数据流或经由TableSink发射。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
// define query
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// emit result Table via a TableSink
result.writeToSink(sink, qConfig);
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// obtain query configuration from TableEnvironment
val qConfig: StreamQueryConfig = tableEnv.queryConfig
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
// define query
val result: Table = ???
// create TableSink
val sink: TableSink[Row] = ???
// emit result Table via a TableSink
result.writeToSink(sink, qConfig)
// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
在下文中,我们将描述它们的参数QueryConfig
以及它们如何影响查询的准确性和资源消耗。
空闲状态保存时间
许多查询在一个或多个键属性上聚合或连接记录。当在流上执行此类查询时,连续查询需要收集记录或维护每个键的部分结果。如果输入流的关键域正在发展,即活动键值随时间变化,则随着观察到越来越多的不同键,连续查询累积越来越多的状态。但是,经常在一段时间后Keys变为非活动状态,并且它们的相应状态变得陈旧且无用。
例如,以下查询计算每个会话的单击次数。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
该sessionId
属性用作分组键,连续查询维护sessionId
其观察的每个键的计数。该sessionId
属性随着时间的推移而发展,并且sessionId
值仅在会话结束之前有效,即,在有限的时间段内。但是,连续查询无法知道此属性,sessionId
并期望每个sessionId
值都可以在任何时间点发生。它维护每个观察sessionId
值的计数。因此,随着sessionId
观察到越来越多的值,查询的总状态大小不断增长。
在空闲状态保持时间参数定义多久一个键的状态得以保持,而它被删除之前被更新。对于前面的示例查询,sessionId
只要在配置的时间段内没有更新,就会删除a的计数。
通过删除键的状态,连续查询完全忘记它之前已经看过这个键。如果处理了具有其状态已被删除的Keys的记录,则该记录将被视为具有相应Keys的第一个记录。对于上面的例子,这意味着a的计数sessionId
将再次开始0
。
配置空闲状态保存时间有两个参数:
StreamQueryConfig qConfig = ...
// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
val qConfig: StreamQueryConfig = ???
// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
清理状态需要额外副本记录,而且记录使得minTime
和maxTime
不是那么浪费时间和空间。minTime
和之间的差异maxTime
必须至少为5分钟。