实验特点

本节介绍DataStream API中的实验性函数。实验性函数仍在不断发展,可能不稳定,不完整,或者在未来版本中会发生重大变化。

将预分区数据流重新解释为被Key化的数据流

我们可以将预分区数据流重新解释为被Key化的数据流,以避免混乱。

警告:重新解释的数据流必须已经在预先划分的确切地以同样的方式Flink的keyBy将在洗牌WRT键组分配分区中的数据。

一个用例可能是两个作业之间的物化混乱:第一个作业执行keyBy shuffle并将每个输出实现为一个分区。第二个作业的来源是,对于每个并行实例,从第一个作业创建的相应分区中读取。现在可以将这些源重新解释为被Key化的数据流,例如应用窗口。请注意,这个技巧使第二个作业难以平行,这对于细粒度的恢复方案很有帮助。

这种重新解释函数通过DataStreamUtils以下方式公开

  1. static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
  2. DataStream<T> stream,
  3. KeySelector<T, K> keySelector,
  4. TypeInformation<K> typeInfo)

给定基本流,Keys选择器和类型信息,该方法从基本流创建Keys流。

代码示例:

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStreamSource<Integer> source = ...
  3. DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
  4. .timeWindow(Time.seconds(1))
  5. .reduce((a, b) -> a + b)
  6. .addSink(new DiscardingSink<>());
  7. env.execute();
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setParallelism(1)
  3. val source = ...
  4. new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
  5. .timeWindow(Time.seconds(1))
  6. .reduce((a, b) => a + b)
  7. .addSink(new DiscardingSink[Int])
  8. env.execute()