状态运行

本文档介绍了在开发应用程序时如何使用Flink的状态抽象。

被Keys化状态和算子状态

Flink有两种基本的状态:Keyed StateOperator State

被Keys化状态

被Keys化状态始终与键相关,只能在a上的函数和 算子中使用KeyedStream

您可以将Keyed State视为已分区或分片的算子状态,每个Keys只有一个状态分区。每个被Keys化状态在逻辑上绑定到<parallel-operator-instance,key>的唯一复合,并且由于每个键“属于”一个被Keys化 算子的一个并行实例,我们可以将其简单地视为<operator,key >。

被Keys化状态进一步组织成所谓的Keys组Keys组是Flink可以重新分配Keys状态的原子单元; Keys组与定义的最大并行度完全一样多。在执行期间,被Keys化 算子的每个并行实例都使用一个或多个Keys组的Keys。

算子状态

使用算子状态或非被Keys化状态),每个算子状态都绑定到一个并行 算子实例。Kafka连接器是在Flink使用运营状况的一个很好的激励例子。Kafka使用者的每个并行实例都将主题分区和偏移的映射维护为其算子状态。

算子状态接口支持在并行性更改时在并行 算子实例之间重新分配状态。可以有不同的方案来进行此重新分配。

原始和管理状态

被Keys化状态算子状态有两种形式:托管状态原始状态

托管状态由Flink运行时控制的数据结构表示,例如内部哈希表或RocksDB。例如“ValueState”,“ListState”等.Flink的运行时对状态进行编码并将它们写入检查点。

原始状态是算子保存在自己的数据结构中的状态。检查点时,它们只会将一个字节序列写入检查点。Flink对状态的数据结构一无所知,只看到原始字节。

所有数据流函数都可以使用托管状态,但原始状态接口只能在实现 算子时使用。建议使用托管状态(而不是原始状态),因为在托管状态下,Flink能够在并行性更改时自动重新分配状态,并且还可以进行更好的内存管理。

注意如果您的托管状态需要自定义序列化逻辑,请参阅相应的指南以确保将来的兼容性。Flink的默认序列化器不需要特殊处理。

使用托管被Keys化状态

托管被Keys化状态接口提供对不同类型状态的访问,这些状态都限定为当前输入数据元的键。这意味着这种类型的状态只能用于a KeyedStream,可以通过创建stream.keyBy(…)

现在,我们将首先查看可用的不同类型的状态,然后我们将看到它们如何在程序中使用。可用的状态原语是:

  • ValueState<T>:这保存了一个可以更新和检索的值(如上所述,作用于输入数据元的键的范围,因此 算子操作看到的每个键可能有一个值)。可以使用update(T)和设置值来设置该值T value()

  • ListState<T>:这保存了数据元列表。您可以追加数据元并检索Iterable所有当前存储的数据元。使用add(T)或添加数据元addAll(List<T>),可以使用Iterable检索Iterable<T> get()。您也可以使用覆盖现有列表update(List<T>)

  • ReducingState<T>:这保存一个值,表示添加到状态的所有值的聚合。接口类似于,ListStateadd(T)使用指定的数据元将使用的数据元缩减为聚合ReduceFunction

  • AggregatingState<IN, OUT>:这保存一个值,表示添加到状态的所有值的聚合。与此相反ReducingState,聚合类型可能与添加到状态的数据元类型不同。接口与for相同,ListStateadd(IN)使用指定的聚合使用添加的数据元AggregateFunction

  • FoldingState<T, ACC>:这保存一个值,表示添加到状态的所有值的聚合。与此相反ReducingState,聚合类型可能与添加到状态的数据元类型不同。界面类似于ListState添加的数据元add(T)使用指定的折叠成聚合FoldFunction

  • MapState<UK, UV>:这将保存映射列表。您可以将键值对放入状态,并检索Iterable所有当前存储的映射。使用put(UK, UV)或添加映射putAll(Map<UK, UV>)。可以使用检索与用户Keys关联的值get(UK)。对于映射,键和值可迭代视图可以使用被检索entries()keys()values()分别。

所有类型的状态还具有clear()清除当前活动键的状态的方法,即输入数据元的键。

注意 FoldingStateFoldingStateDescriptor已在Flink 1.4中弃用,将来将被完全删除。请使用AggregatingStateAggregatingStateDescriptor不是。

重要的是要记住,这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input数据元的键。因此,如果所涉及的Keys不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同。

要获得状态句柄,您必须创建一个StateDescriptor这保存了状态的名称(正如我们稍后将看到的,您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户 - 指定的函数,例如a ReduceFunction根据要检索的状态类型,可以创建a ValueStateDescriptor,a ListStateDescriptor,a ReducingStateDescriptor,a FoldingStateDescriptor或a MapStateDescriptor

使用the访问状态RuntimeContext,因此只能在丰富的函数中使用请参阅此处了解相关信息,但我们很快也会看到一个示例。RuntimeContext是在提供RichFunction具有这些方法来访问状态:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)这是一个FlatMapFunction显示所有部件如何组合在一起的示例

  • Java

  • Scala
  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2. /**
  3. * The ValueState handle. The first field is the count, the second field a running sum.
  4. */
  5. private transient ValueState<Tuple2<Long, Long>> sum;
  6. @Override
  7. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  8. // access the state value
  9. Tuple2<Long, Long> currentSum = sum.value();
  10. // update the count
  11. currentSum.f0 += 1;
  12. // add the second field of the input value
  13. currentSum.f1 += input.f1;
  14. // update the state
  15. sum.update(currentSum);
  16. // if the count reaches 2, emit the average and clear the state
  17. if (currentSum.f0 >= 2) {
  18. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  19. sum.clear();
  20. }
  21. }
  22. @Override
  23. public void open(Configuration config) {
  24. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  25. new ValueStateDescriptor<>(
  26. "average", // the state name
  27. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
  28. Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  29. sum = getRuntimeContext().getState(descriptor);
  30. }
  31. }
  32. // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
  33. env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
  34. .keyBy(0)
  35. .flatMap(new CountWindowAverage())
  36. .print();
  37. // the printed output will be (1,4) and (1,5)
  1. class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
  2. private var sum: ValueState[(Long, Long)] = _
  3. override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
  4. // access the state value
  5. val tmpCurrentSum = sum.value
  6. // If it hasn't been used before, it will be null
  7. val currentSum = if (tmpCurrentSum != null) {
  8. tmpCurrentSum
  9. } else {
  10. (0L, 0L)
  11. }
  12. // update the count
  13. val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
  14. // update the state
  15. sum.update(newSum)
  16. // if the count reaches 2, emit the average and clear the state
  17. if (newSum._1 >= 2) {
  18. out.collect((input._1, newSum._2 / newSum._1))
  19. sum.clear()
  20. }
  21. }
  22. override def open(parameters: Configuration): Unit = {
  23. sum = getRuntimeContext.getState(
  24. new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
  25. )
  26. }
  27. }
  28. object ExampleCountWindowAverage extends App {
  29. val env = StreamExecutionEnvironment.getExecutionEnvironment
  30. env.fromCollection(List(
  31. (1L, 3L),
  32. (1L, 5L),
  33. (1L, 7L),
  34. (1L, 4L),
  35. (1L, 2L)
  36. )).keyBy(_._1)
  37. .flatMap(new CountWindowAverage())
  38. .print()
  39. // the printed output will be (1,4) and (1,5)
  40. env.execute("ExampleManagedState")
  41. }

这个例子实现了一个穷人的计数窗口。我们通过第一个字段键入元组(在示例中都具有相同的键1)。该函数将计数和运行总和存储在a中ValueState一旦计数达到2,它将发出平均值并清除状态,以便我们重新开始0请注意,如果我们在第一个字段中具有不同值的元组,则会为每个不同的输入键保存不同的状态值。

状态生存时间(TTL)

一个时间的生存期(TTL)可以被分配给任何类型的被Keys化状态。如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。

所有状态集合类型都支持每个条目的TTL。这意味着列表数据元和映射条目将独立过期。

为了使用状态TTL,必须首先构建StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL函数:

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.common.time.Time;
  4. StateTtlConfig ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build();
  9. ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
  10. stateDescriptor.enableTimeToLive(ttlConfig);
  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.state.ValueStateDescriptor
  3. import org.apache.flink.api.common.time.Time
  4. val ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build
  9. val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
  10. stateDescriptor.enableTimeToLive(ttlConfig)

配置有几个选项需要考虑:

newBuilder方法的第一个参数是必需的,它是生存时间值。

更新类型配置状态TTL刷新时(默认情况下OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅限创建和写入权限
  • StateTtlConfig.UpdateType.OnReadAndWrite - 也读取访问权限状态可见性配置是否在读取访问时返回过期值(如果尚未清除NeverReturnExpired(默认情况下):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 永远不会返回过期的值

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用则返回在这种情况下NeverReturnExpired,过期状态表现得好像它不再存在,即使它仍然必须被删除。该选项对于在TTL之后必须严格读取访问数据的用例非常有用,例如应用程序使用隐私敏感数据。

另一个选项ReturnExpiredIfNotCleanedUp允许在清理之前返回过期状态。

笔记:

  • 状态后台存储上次修改的时间戳以及用户值,这意味着启用此函数会增加状态存储的消耗。堆状态后台存储一个额外的Java对象,其中包含对用户状态对象的引用和内存中的原始长值。RocksDB状态后台为每个存储值,列表条目或映射条目添加8个字节。

  • 目前仅支持参考处理时间的 TTL 。

  • 尝试恢复先前未配置TTL的状态,使用TTL启用描述符或反之亦然将导致兼容性失败和StateMigrationException

  • TTL配置不是检查点或保存点的一部分,而是Flink如何在当前运行的作业中处理它的方式。

清除过期状态

目前,只有在显式读出过期值时才会删除过期值,例如通过调用ValueState.value()

注意这意味着默认情况下,如果未读取过期状态,则不会将其删除,这可能会导致状态不断增长。这可能在将来的版本中发生变化

此外,您可以在获取完整状态SNAPSHOT时激活清理,这将减小其大小。在当前实现下不会清除本地状态,但在从上一个SNAPSHOT恢复的情况下,它不会包括已删除的过期状态。它可以配置为StateTtlConfig

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.time.Time;
  3. StateTtlConfig ttlConfig = StateTtlConfig
  4. .newBuilder(Time.seconds(1))
  5. .cleanupFullSnapshot()
  6. .build();
  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.time.Time
  3. val ttlConfig = StateTtlConfig
  4. .newBuilder(Time.seconds(1))
  5. .cleanupFullSnapshot
  6. .build

此选项不适用于RocksDB状态后台中的增量检查点。

未来将添加更多策略,以便在后台自动清理过期状态。

在Scala DataStream API中声明

除了上面描述的接口之外,Scala API还具有单个on的有状态map()flatMap()函数的快捷方式用户函数获取in中的当前值,并且必须返回将用于更新状态的更新值。ValueStateKeyedStreamValueStateOption

  1. val stream: DataStream[(String, Int)] = ...
  2. val counts: DataStream[(String, Int)] = stream
  3. .keyBy(_._1)
  4. .mapWithState((in: (String, Int), count: Option[Int]) =>
  5. count match {
  6. case Some(c) => ( (in._1, c), Some(c + in._2) )
  7. case None => ( (in._1, 0), Some(in._2) )
  8. })

使用托管算子状态

要使用托管算子状态,有状态函数可以实现更通用的CheckpointedFunction接口或ListCheckpointed<T extends Serializable>接口。

CheckpointedFunction

CheckpointedFunction接口提供对具有不同重新分发方案的非被Keys化状态的访问。它需要实现两种方法:

  1. void snapshotState(FunctionSnapshotContext context) throws Exception;
  2. void initializeState(FunctionInitializationContext context) throws Exception;

每当必须执行检查点时,都会snapshotState()被调用。initializeState()每次初始化用户定义的函数时,都会调用对应函数,即首次初始化函数时,或者当函数实际从早期检查点恢复时。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。

目前,支持列表样式的托管算子状态。预期该状态是一个List序列化的对象,彼此独立,因此有资格在重新缩放时重新分配。换句话说,这些对象是可以重新分配非被Keys化状态的最精细的粒度。根据状态访问方法,定义了以下重新分发方案:

  • 偶分裂再分配:每个 算子返回一个状态数据元列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分配时,列表被平均分成与并行 算子一样多的子列表。每个 算子都会获得一个子列表,该子列表可以为空,也可以包含一个或多个数据元。例如,如果使用并行性1,则 算子的检查点状态包含数据元,element1并且element2当将并行性增加到2时,element1可能最终在 算子实例0中,而element2将转到 算子实例1。

  • 联合重新分配:每个 算子返回一个状态数据元列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分配时,每个 算子都会获得完整的状态数据元列表。

下面是一个有状态的示例SinkFunction,用于CheckpointedFunction在将数据元发送到外部世界之前对其进行缓冲。它演示了基本的偶分裂再分配列表状态:

  1. public class BufferingSink
  2. implements SinkFunction<Tuple2<String, Integer>>,
  3. CheckpointedFunction {
  4. private final int threshold;
  5. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  6. private List<Tuple2<String, Integer>> bufferedElements;
  7. public BufferingSink(int threshold) {
  8. this.threshold = threshold;
  9. this.bufferedElements = new ArrayList<>();
  10. }
  11. @Override
  12. public void invoke(Tuple2<String, Integer> value) throws Exception {
  13. bufferedElements.add(value);
  14. if (bufferedElements.size() == threshold) {
  15. for (Tuple2<String, Integer> element: bufferedElements) {
  16. // send it to the sink
  17. }
  18. bufferedElements.clear();
  19. }
  20. }
  21. @Override
  22. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  23. checkpointedState.clear();
  24. for (Tuple2<String, Integer> element : bufferedElements) {
  25. checkpointedState.add(element);
  26. }
  27. }
  28. @Override
  29. public void initializeState(FunctionInitializationContext context) throws Exception {
  30. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  31. new ListStateDescriptor<>(
  32. "buffered-elements",
  33. TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  34. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  35. if (context.isRestored()) {
  36. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  37. bufferedElements.add(element);
  38. }
  39. }
  40. }
  41. }
  1. class BufferingSink(threshold: Int = 0)
  2. extends SinkFunction[(String, Int)]
  3. with CheckpointedFunction {
  4. @transient
  5. private var checkpointedState: ListState[(String, Int)] = _
  6. private val bufferedElements = ListBuffer[(String, Int)]()
  7. override def invoke(value: (String, Int)): Unit = {
  8. bufferedElements += value
  9. if (bufferedElements.size == threshold) {
  10. for (element <- bufferedElements) {
  11. // send it to the sink
  12. }
  13. bufferedElements.clear()
  14. }
  15. }
  16. override def snapshotState(context: FunctionSnapshotContext): Unit = {
  17. checkpointedState.clear()
  18. for (element <- bufferedElements) {
  19. checkpointedState.add(element)
  20. }
  21. }
  22. override def initializeState(context: FunctionInitializationContext): Unit = {
  23. val descriptor = new ListStateDescriptor[(String, Int)](
  24. "buffered-elements",
  25. TypeInformation.of(new TypeHint[(String, Int)]() {})
  26. )
  27. checkpointedState = context.getOperatorStateStore.getListState(descriptor)
  28. if(context.isRestored) {
  29. for(element <- checkpointedState.get()) {
  30. bufferedElements += element
  31. }
  32. }
  33. }
  34. }

initializeState方法作为参数a FunctionInitializationContext这用于初始化非被Keys化状态“容器”。这些是一种类型的容器,ListState其中非被Keys化状态对象将在检查点存储。

注意状态是如何初始化的,类似于被Keys化状态,其中StateDescriptor包含状态名称和有关状态所包含值的类型的信息:

  1. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  2. new ListStateDescriptor<>(
  3. "buffered-elements",
  4. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
  5. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  1. val descriptor = new ListStateDescriptor[(String, Long)](
  2. "buffered-elements",
  3. TypeInformation.of(new TypeHint[(String, Long)]() {})
  4. )
  5. checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态访问方法的命名约定包含其重新分发模式,后跟其状态结构。例如,要在还原时使用联合重新分发方案的列表状态,请使用以下方式访问状态getUnionListState(descriptor)如果方法名称不包含重新分发模式,例如 getListState(descriptor),它只是意味着将使用基本的偶分裂再分配方案。

在初始化容器之后,我们使用isRestored()上下文方法来检查我们是否在失败后恢复。如果是这样true我们正在恢复,则应用恢复逻辑。

如修改的代码所示,在状态初始化期间恢复的BufferingSink这个ListState被保存在类变量中以供将来使用snapshotState()在那里,ListState被清除由先前的检查点包含的所有对象,然后填充我们要设置检查点新的。

作为旁注,被Keys化状态也可以在initializeState()方法中初始化这可以使用提供的方式完成FunctionInitializationContext

ListCheckpointed

ListCheckpointed接口是比较有限的变体CheckpointedFunction,它仅支持与恢复甚至分裂的再分配方案列表式的状态。它还需要实现两种方法:

  1. List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  2. void restoreState(List<T> state) throws Exception;

snapshotState() 算子上应该返回检查点的对象列表,并且restoreState必须在恢复时处理这样的列表。如果状态不是重新分区,可以随时返回Collections.singletonList(MY_STATE)snapshotState()

有状态源函数

与其他算子相比,有状态的来源需要更多的关注。为了使状态和输出集合的更新成为原子(在故障/恢复时精确一次的语义所需),用户需要从源的上下文中获取锁定。

  1. public static class CounterSource
  2. extends RichParallelSourceFunction<Long>
  3. implements ListCheckpointed<Long> {
  4. /** current offset for exactly once semantics */
  5. private Long offset;
  6. /** flag for job cancellation */
  7. private volatile boolean isRunning = true;
  8. @Override
  9. public void run(SourceContext<Long> ctx) {
  10. final Object lock = ctx.getCheckpointLock();
  11. while (isRunning) {
  12. // output and state update are atomic
  13. synchronized (lock) {
  14. ctx.collect(offset);
  15. offset += 1;
  16. }
  17. }
  18. }
  19. @Override
  20. public void cancel() {
  21. isRunning = false;
  22. }
  23. @Override
  24. public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
  25. return Collections.singletonList(offset);
  26. }
  27. @Override
  28. public void restoreState(List<Long> state) {
  29. for (Long s : state)
  30. offset = s;
  31. }
  32. }
  1. class CounterSource
  2. extends RichParallelSourceFunction[Long]
  3. with ListCheckpointed[Long] {
  4. @volatile
  5. private var isRunning = true
  6. private var offset = 0L
  7. override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
  8. val lock = ctx.getCheckpointLock
  9. while (isRunning) {
  10. // output and state update are atomic
  11. lock.synchronized({
  12. ctx.collect(offset)
  13. offset += 1
  14. })
  15. }
  16. }
  17. override def cancel(): Unit = isRunning = false
  18. override def restoreState(state: util.List[Long]): Unit =
  19. for (s <- state) {
  20. offset = s
  21. }
  22. override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
  23. Collections.singletonList(offset)
  24. }

当Flink完全确认检查点与外界通信时,某些算子可能需要这些信息。在这种情况下,请参阅org.apache.flink.runtime.state.CheckpointListener界面。