调整检查点和大状态
此页面提供了如何配置和调整使用大状态的应用程序的指南。
概览
要使Flink应用程序可以大规模可靠运行,必须满足两个条件:
应用程序需要能够可靠地获取检查点
在发生故障后,资源需要足以赶上输入数据流
第一部分讨论如何大规模获得良好的检查点。最后一节介绍了有关计划使用多少资源的一些最佳实践。
监测状态和检查点
监视检查点行为的最简单方法是通过UI的检查点部分。检查点监视的文档显示了如何访问可用的检查点度量标准。
扩大检查点时特别感兴趣的两个数字是:
- 算子启动检查点的时间:此时间目前尚未直接公开,但对应于:
checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration
当触发检查点的时间一直非常高时,这意味着检查点障碍需要很长时间才能从源头移动到算子。这通常表明系统在恒定的背压下运行。
- 在对齐期间缓冲的数据量。对于一次性语义,Flink 在接收多个输入流的 算子处对齐流,为该对齐缓冲一些数据。理想情况下缓冲的数据量较低 - 较高的数量意味着在不同输入流的非常不同的时间接收检查点障碍。
请注意,当存在瞬态背压,数据偏斜或网络问题时,此处指示的数字偶尔会很高。但是,如果数字一直很高,则意味着Flink将许多资源放入检查点。
调整检查点
应用程序可以定期触发检查点。当检查点比检查点间隔花费更长时间时,在正在进行的检查点完成之前不会触发下一个检查点。默认情况下,一旦正在进行的检查点完成,将立即触发下一个检查点。
当检查点最终频繁占用超过基准时间间隔时(例如因为状态增长超过计划,或者存储检查点的存储暂时变慢),系统会不断地检查点(一旦完成,新系统会立即启动) 。这可能意味着在检查点上经常捆绑太多资源,而且算子的进展太少。此行为对使用异步检查点状态的流应用程序的影响较小,但可能仍会对整体应用程序性能产生影响。
为防止出现这种情况,应用程序可以定义检查点之间的最短持续时间:
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
此持续时间是在最新检查点结束和下一个检查点开始之间必须经过的最小时间间隔。下图说明了这对检查点的影响。
注意:可以配置应用程序(通过CheckpointConfig
)以允许多个检查点同时进行。对于Flink中具有大状态的应用程序,这通常会将太多资源绑定到检查点。当手动触发保存点时,它可能正在与正在进行的检查点同时进行。
调整网络缓冲区
在Flink 1.3之前,网络缓冲区数量的增加也导致检查点时间增加,因为保存更多的飞行数据意味着检查点障碍被延迟。从Flink 1.3开始,每个传出/传入通道使用的网络缓冲区数量是有限的,因此可以配置网络缓冲区而不会影响检查点时间(请参阅网络缓冲区配置)。
尽可能使状态检查点异步
当状态是异步SNAPSHOT时,检查点比同步SNAPSHOT状态时更好地扩展。特别是在具有多个连接,协同函数或窗口的更复杂的流应用程序中,这可能会产生深远的影响。
要异步创建状态,应用程序必须做两件事:
使用由Flink管理的状态:托管状态表示Flink提供存储状态的数据结构。目前,这是真正的被Keys化状态,这就好比接口背后抽象
ValueState
,ListState
,ReducingState
,…使用支持异步SNAPSHOT的状态后台。在Flink 1.2中,只有RocksDB状态后台使用完全异步SNAPSHOT。从Flink 1.3开始,基于堆的状态后台也支持异步SNAPSHOT。
以上两点意味着大状态通常应保持为被Keys化状态,而不是算子状态。
调整RocksDB
许多大型Flink流应用程序的状态存储主力是RocksDB State后台。后台远远超出主存储器,可靠地存储大被Keys化状态。
不幸的是,RocksDB的性能可能因配置而异,并且几乎没有关于如何正确调整RocksDB的文档。例如,默认配置是针对SSD定制的,并且在旋转磁盘上执行次优。
增量检查点
与完整检查点相比,增量检查点可以显着缩短检查点时间,但代价是(可能)更长的恢复时间。核心思想是增量检查点仅记录对先前完成的检查点的所有更改,而不是生成状态后台的完整,自包含备份。像这样,增量检查点建立在先前的检查点上。Flink以一种随时间自我整合的方式利用RocksDB的内部备份机制。因此,Flink中的增量检查点历史记录不会无限增长,并且最终会将旧检查点包含在内并自动修剪。`
虽然我们强烈建议对大型状态使用增量检查点,但请注意,这是一项新函数,目前默认情况下未启用。要启用此函数,用户可以RocksDBStateBackend
在构造函数集中使用相应的布尔标志来实例化a true
,例如:
RocksDBStateBackend backend =
new RocksDBStateBackend(filebackend, true);
RocksDB计时器
对于RocksDB,用户可以选择计时器是存储在堆上(默认)还是存储在RocksDB中。基于堆的定时器可以为较少数量的定时器提供更好的性能,而在RocksDB中存储定时器可提供更高的可扩展性,因为RocksDB中的定时器数量可能超过可用的主内存(溢出到磁盘)。
当使用RockDB作为状态后台时,可以通过Flink的配置通过选项键选择定时器存储的类型state.backend.rocksdb.timer-service.factory
。可能的选择是heap
(在堆上存储定时器,默认)和rocksdb
(在RocksDB中存储定时器)。
注意RocksDB状态后台/增量检查点/基于堆的定时器的组合当前不支持定时器状态的异步SNAPSHOT。其他状态如被Keys化状态仍然是异步SNAPSHOT。请注意,这不是以前版本的回归,将通过解决FLINK-10026
。
将选项传递给RocksDB
RocksDBStateBackend.setOptions(new MyOptions());
public class MyOptions implements OptionsFactory {
@Override
public DBOptions createDBOptions() {
return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true);
}
@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setTableFormatConfig(
new BlockBasedTableConfig()
.setBlockCacheSize(256 * 1024 * 1024) // 256 MB
.setBlockSize(128 * 1024)); // 128 KB
}
}
预定义选项
Flink为RocksDB提供了一些预定义的选项集合,用于不同的设置,例如可以设置RocksDBStateBacked.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
。
我们希望随着时间的推移积累更多这样的配置文件 当您发现一组运行良好且对某些工作负载具有代表性的选项时,请随意提供此类预定义选项配置文件。
注意 RocksDB是一个本机库,它直接从进程分配内存,而不是从JVM分配内存。您必须考虑分配给RocksDB的任何内存,通常是将TaskManagers的JVM堆大小Reduce相同的量。不这样做可能导致YARN / Mesos / etc终止JVM进程以分配比配置更多的内存。
容量规划
本节讨论如何确定应该使用多少资源来使Flink作业可靠地运行。容量规划的基本经验法则是:
正常运行应具有足够的容量,以便在恒定的背压下不运行。有关如何检查应用程序是否在背压下运行的详细信息,请参阅背压监测。
在无故障时间内无需背压即可运行程序所需的资源之上提供一些额外资源。需要这些资源来“赶上”在应用程序恢复期间累积的输入数据。这应该取决于恢复 算子操作通常需要多长时间(这取决于故障转移时需要加载到新TaskManagers中的状态的大小)以及该方案需要多快才能恢复。
重要事项:应该在激活检查点的情况下建立基线,因为检查点会占用一些资源(例如网络带宽)。
临时背压通常是可以的,并且是在负载峰值期间,追赶阶段期间或外部系统(写入水槽中)表现出暂时减速时执行流控制的重要部分。
某些 算子操作(如大窗口)会导致下游算子出现尖峰负载:对于Windows,下游算子在构建窗口时可能没什么可做的,并且在窗口发出时有负载。下游并行度的计划需要考虑窗口发射的程度以及需要处理这种尖峰的速度。
要点:为了以后允许添加资源,请确保将数据流程序的最大并行度设置为合理的数字。最大并行度定义了在重新缩放程序时(通过保存点)设置程序并行度的高度。
Flink的内部副本记录以最大并行度 - 许多关键组的粒度跟踪并行状态。即使执行低并行度的程序,Flink的设计也力求使其具有非常高的最大并行度值。
压缩
Flink为所有检查点和保存点提供可选压缩(默认:关闭)。目前,压缩总是使用snappy压缩算法(版本1.1.4),但我们计划在将来支持自定义压缩算法。压缩适用于被Keys化状态下的键组的粒度,即每个键组可以单独解压缩,这对于重新缩放很重要。
压缩可以通过以下方式激活ExecutionConfig
:
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);
注意压缩选项对增量SNAPSHOT没有影响,因为它们使用的是RocksDB的内部格式,它始终使用开箱即用的快速压缩。
任务本地恢复
愿景
在Flink的检查点中,每个任务都会生成其状态的SNAPSHOT,然后将其写入分布式存储。每个任务通过发送描述分布式存储中状态位置的句柄来确认成功将状态写入JobManager。反过来,JobManager从所有任务中收集句柄并将它们捆绑到检查点对象中。
在恢复的情况下,JobManager打开最新的检查点对象并将句柄发送回相应的任务,然后可以从分布式存储中恢复其状态。使用分布式存储来存储状态有两个重要的优点。首先,存储是容错的,其次,分布式存储中的所有状态都可以被所有节点访问,并且可以容易地重新分配(例如,用于重新分级)。
但是,使用远程分布式存储也有一个很大的缺点:所有任务必须通过网络从远程位置读取其状态。在许多情况下,恢复可以将失败的任务重新安排到与上一次运行相同的TaskManager(当然还有例如机器故障),但我们仍然必须读取远程状态。即使单台机器上只有很小的故障,这也可能导致大型状态的恢复时间过长。
方法
任务本地状态恢复完全针对这个长恢复时间的问题,主要思想如下:对于每个检查点,每个任务不仅将任务状态写入分布式存储,而且还保存状态SNAPSHOT的辅助副本。任务本地的存储(例如,在本地磁盘或内存中)。请注意,SNAPSHOT的主存储必须仍然是分布式存储,因为本地存储不能确保节点故障下的持久性,也不能为其他节点提供访问以重新分发状态,此函数仍需要主副本。
但是,对于可以重新安排到先前位置进行恢复的每个任务,我们可以从辅助本地副本恢复状态,并避免远程读取状态的成本。鉴于许多故障不是节点故障,并且节点故障通常一次只影响一个或很少的节点,很可能在恢复中大多数任务可以返回到它们先前的位置并且找到它们的本地状态。这使得本地恢复有效Reduce恢复时间。
请注意,根据所选的状态后台和检查点策略,每个检查点可能会产生一些额外费用,用于创建和存储辅助本地状态副本。例如,在大多数情况下,实现将简单地将对分布式存储的写入复制到本地文件。
主(分布式存储)和辅助(任务 - 本地)状态SNAPSHOT的关系
任务本地状态始终被视为辅助副本,检查点状态的基本事实是分布式存储中的主副本。这对于检查点和恢复期间本地状态的问题有影响:
对于检查点,主副本必须成功,并且生成辅助本地副本的失败不会使检查点失败。如果无法创建主副本,则检查点将失败,即使已成功创建辅助副本也是如此。
只有主副本由JobManager确认和管理,辅助副本由TaskManager拥有,其生命周期可以独立于其主副本。例如,可以将3个最新检查点的历史记录保存为主副本,并仅保存最新检查点的任务本地状态。
对于恢复,如果匹配的辅助副本可用,Flink将始终首先尝试从任务本地状态恢复。如果从辅助副本恢复期间出现任何问题,Flink将透明地重试从主副本恢复任务。如果主要和(可选)辅助副本失败,则恢复仅失败。在这种情况下,根据配置,Flink仍然可以回退到较旧的检查点。
任务本地副本可能仅包含完整任务状态的一部分(例如,在写入一个本地文件时出现异常)。在这种情况下,Flink将首先尝试在本地恢复本地部分,从主副本恢复非本地状态。主状态必须始终完整,并且是任务本地状态的超集。
任务本地状态可以具有与主状态不同的格式,它们不需要是字节相同的。例如,任务本地状态甚至可能是由堆对象组成的内存中,而不是存储在任何文件中。
如果TaskManager丢失,则其所有任务的本地状态将丢失。
配置任务本地恢复
默认情况下,任务本地恢复已取消激活,可以通过Flink的配置使用state.backend.local-recovery
指定的Keys激活CheckpointingOptions.LOCALRECOVERY
。此设置的值可以为true,也可以为false_(默认值)以禁用本地恢复。
有关不同状态后台的任务本地恢复的详细信息
限制:目前,任务本地恢复仅涵盖被Keys化状态后台。被Keys化状态通常是该状态最大的部分。在不久的将来,我们还将涵盖算子的状态和计时器。
以下状态后台可以支持任务本地恢复。
FsStateBackend:被Keys化状态支持任务本地恢复。实现将将状态复制到本地文件。这可能会引入额外的写入成本并占用本地磁盘空间。将来,我们还可能提供一种将任务本地状态保存在内存中的实现。
RocksDBStateBackend:被Keys化状态支持任务本地恢复。对于完整检查点,状态将复制到本地文件。这可能会引入额外的写入成本并占用本地磁盘空间。对于增量SNAPSHOT,本地状态基于RocksDB的本机检查点机制。此机制也用作创建主副本的第一步,这意味着在这种情况下,不会引入额外的成本来创建辅助副本。我们只是保存原生检查点目录,而不是在上传到分布式商店后删除它。此本地副本可以与RocksDB的工作目录共享活动文件(通过硬链接),因此对于活动文件,也不会为使用增量SNAPSHOT的任务本地恢复消耗额外的磁盘空间。
分配保存调度
任务本地恢复假设在失败时保存分配任务调度,其工作如下。每个任务都会记住其先前的分配,并请求完全相同的插槽在恢复时重新启动。如果此插槽不可用,则任务将从资源管理器请求新的新插槽。这样,如果TaskManager不再可用,则无法返回其先前位置的任务将不会驱动其先前插槽中的其他恢复任务。我们的理由是,当TaskManager不再可用时,前一个插槽只能消失,在这种情况下是一些任务必须要求新的插槽。通过我们的调度策略,我们可以为最大数量的任务提供从本地状态恢复的机会,并避免任务窃取其先前插槽之间的级联效应。
分配保存调度不适用于Flink的传统模式。