Testing

Testing is an integral part of every software development process, and Apache Flink comes with tooling to test your application code on multiple levels of the testing pyramid.

Testing User-Defined Functions

Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.

Unit Testing Stateless, Timeless UDFs

For example, let's take the following stateless MapFunction.

Java

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

Scala

class IncrementMapFunction extends MapFunction[Long, Long] {

    override def map(record: Long): Long = {
        record + 1
    }
}

It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.

Java

public class IncrementMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementMapFunction incrementer = new IncrementMapFunction();

        // call the methods that you have implemented
        assertEquals(3L, incrementer.map(2L));
    }
}

Scala

class IncrementMapFunctionTest extends FlatSpec with Matchers {

    "IncrementMapFunction" should "increment values" in {
        // instantiate your function
        val incrementer: IncrementMapFunction = new IncrementMapFunction()

        // call the methods that you have implemented
        incrementer.map(2) should be (3)
    }
}

Similarly, a user-defined function that uses an org.apache.flink.util.Collector (e.g. a FlatMapFunction or a ProcessFunction) can be easily tested by providing a mock object instead of a real collector. A FlatMapFunction with the same functionality as the IncrementMapFunction could be unit tested as follows.

Java

public class IncrementFlatMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();

        Collector<Long> collector = mock(Collector.class);

        // call the methods that you have implemented
        incrementer.flatMap(2L, collector);

        // verify collector was called with the right output
        Mockito.verify(collector, times(1)).collect(3L);
    }
}

Scala

class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {

    "IncrementFlatMapFunction" should "increment values" in {
        // instantiate your function
        val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction()
        val collector = mock[Collector[Long]]

        // verify collector was called with the right output
        (collector.collect _).expects(3L)

        // call the methods that you have implemented
        incrementer.flatMap(2L, collector)
    }
}

Unit Testing Stateful or Timely UDFs and Custom Operators

Unit testing the functionality of a user-defined function that makes use of managed state or timers is more difficult because it involves testing the interaction between the user code and Flink's runtime. For this, Flink comes with a collection of so-called test harnesses that can be used to test such user-defined functions as well as custom operators:

  • OneInputStreamOperatorTestHarness (for operators on a DataStream)
  • KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
  • TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
  • KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)

To use the test harnesses, a set of additional dependencies (test scoped) is needed.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.11</artifactId>
    <version>1.13.0</version>
    <scope>test</scope>
</dependency>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>1.13.0</version>
    <scope>test</scope>
</dependency>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.0</version>
    <scope>test</scope>
    <classifier>tests</classifier>
</dependency>


Now, the test harnesses can be used to push records and watermarks into the user-defined function or custom operator, to control processing time, and finally to assert on the output of the operator, including side outputs.

Java

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configure the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {

        // push (timestamped) elements into the operator (and hence user defined function)
        testHarness.processElement(2L, 100L);

        // trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        // trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        // retrieve list of emitted records for assertions
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}

Scala

class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {

    private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
    private var statefulFlatMapFunction: StatefulFlatMapFunction = null

    before {
        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction

        // wrap user defined function into the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMapFunction))

        // optionally configure the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50)

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open()
    }

    "StatefulFlatMapFunction" should "do some fancy stuff with timers and state" in {

        // push (timestamped) elements into the operator (and hence user defined function)
        testHarness.processElement(2, 100)

        // trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100)

        // trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100)

        // retrieve list of emitted records for assertions
        testHarness.getOutput should contain (3)

        // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
    }
}

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector, including the TypeInformation for the class of the key.

Java

public class StatefulFlatMapFunctionTest {
    private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into the corresponding operator
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
            new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    // tests
}

Scala

class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {

    private var testHarness: KeyedOneInputStreamOperatorTestHarness[String, Long, Long] = null
    private var statefulFlatMapFunction: StatefulFlatMapFunction = null

    before {
        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction

        // wrap user defined function into the corresponding operator
        testHarness = new KeyedOneInputStreamOperatorTestHarness(
            new StreamFlatMap(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING())

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open()
    }

    // tests
}

Many more examples of the usage of these test harnesses can be found in the Flink code base, e.g.:

  • org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest is a good example for testing operators and user-defined functions that depend on processing or event time.
  • org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest shows how to test a custom sink with the AbstractStreamOperatorTestHarness. Specifically, it uses AbstractStreamOperatorTestHarness.snapshot and AbstractStreamOperatorTestHarness.initializeState to test its interaction with Flink's checkpointing mechanism.

Note: AbstractStreamOperatorTestHarness and its derived classes are currently not part of the public API and can be subject to change.

Unit Testing ProcessFunction

Given its importance, in addition to the previous test harnesses that can be used directly for testing a ProcessFunction, Flink provides a test harness factory named ProcessFunctionTestHarnesses that simplifies the instantiation of the test harness. Consider the following example:

Note: To use this test harness, the dependencies introduced in the previous section are also required.

Java

public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value);
    }
}

Scala

class PassThroughProcessFunction extends ProcessFunction[Integer, Integer] {

    @throws[Exception]
    override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {
        out.collect(value)
    }
}

It is very easy to unit test such a function with ProcessFunctionTestHarnesses by passing suitable arguments and verifying the output.

Java

public class PassThroughProcessFunctionTest {

    @Test
    public void testPassThrough() throws Exception {

        // instantiate user-defined function
        PassThroughProcessFunction processFunction = new PassThroughProcessFunction();

        // wrap user defined function into the corresponding operator
        OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
            .forProcessFunction(processFunction);

        // push (timestamped) elements into the operator (and hence user defined function)
        harness.processElement(1, 10);

        // retrieve list of emitted records for assertions
        assertEquals(Collections.singletonList(1), harness.extractOutputValues());
    }
}

Scala

class PassThroughProcessFunctionTest extends FlatSpec with Matchers {

    "PassThroughProcessFunction" should "forward values" in {

        // instantiate user-defined function
        val processFunction = new PassThroughProcessFunction

        // wrap user defined function into the corresponding operator
        val harness = ProcessFunctionTestHarnesses.forProcessFunction(processFunction)

        // push (timestamped) elements into the operator (and hence user defined function)
        harness.processElement(1, 10)

        // retrieve list of emitted records for assertions
        harness.extractOutputValues() should contain (1)
    }
}

For more examples on how to use the ProcessFunctionTestHarnesses for testing the different flavors of ProcessFunction, e.g. KeyedProcessFunction, KeyedCoProcessFunction, BroadcastProcessFunction, etc., the user is encouraged to look at the ProcessFunctionTestHarnessesTest.
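
As one illustration, a KeyedProcessFunction can be wrapped through ProcessFunctionTestHarnesses.forKeyedProcessFunction, which additionally takes a KeySelector and the TypeInformation of the key. Below is a minimal Java sketch; MyKeyedProcessFunction stands in for a hypothetical KeyedProcessFunction<String, Integer, Integer>.

// minimal sketch: wrap a (hypothetical) keyed function into a keyed test harness
KeyedOneInputStreamOperatorTestHarness<String, Integer, Integer> harness =
    ProcessFunctionTestHarnesses.forKeyedProcessFunction(
        new MyKeyedProcessFunction(),   // hypothetical KeyedProcessFunction<String, Integer, Integer>
        value -> String.valueOf(value), // KeySelector deriving the key from the record
        Types.STRING);                  // TypeInformation of the key

// push an element and assert on the output as shown above
harness.processElement(1, 10);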

Testing Flink Jobs

JUnit Rule MiniClusterWithClientResource

Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.

To use MiniClusterWithClientResource, one additional dependency (test scoped) is needed.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.11</artifactId>
    <version>1.13.0</version>
    <scope>test</scope>
</dependency>


Let's take the same simple MapFunction as in the previous sections as an example.

Java

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

Scala

class IncrementMapFunction extends MapFunction[Long, Long] {

    override def map(record: Long): Long = {
        record + 1
    }
}

A simple pipeline using this MapFunction can now be tested on a local Flink cluster as follows.

Java

public class ExampleIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(2)
                .setNumberTaskManagers(1)
                .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
            .map(new IncrementMapFunction())
            .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}

Scala

class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {

    val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
        .setNumberSlotsPerTaskManager(2)
        .setNumberTaskManagers(1)
        .build)

    before {
        flinkCluster.before()
    }

    after {
        flinkCluster.after()
    }

    "IncrementMapFunction pipeline" should "increment values" in {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // configure your test environment
        env.setParallelism(2)

        // values are collected in a static variable
        CollectSink.values.clear()

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
            .map(new IncrementMapFunction())
            .addSink(new CollectSink())

        // execute
        env.execute()

        // verify your results
        CollectSink.values should contain allOf (2L, 22L, 23L)
    }
}

// create a testing sink
class CollectSink extends SinkFunction[Long] {
    override def invoke(value: Long): Unit = {
        CollectSink.values.add(value)
    }
}

object CollectSink {
    // must be static
    val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList())
}

A few remarks on integration testing with MiniClusterWithClientResource:

  • In order not to copy your whole pipeline code from production to test, make your sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests (see the first sketch after this list).

  • The static variable in CollectSink is used here because Flink serializes all operators before distributing them across a cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.

  • If your job uses event time timers, you can implement a custom parallel source function for emitting watermarks (see the second sketch after this list).

  • It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipelines are executed in parallel.

  • Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time, since the startup and shutdown of Flink clusters usually dominates the actual testing time.

  • If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline (see the third sketch after this list).
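
To illustrate the first remark, one possible way to make sources and sinks pluggable is to inject them into the code that builds the pipeline. The IncrementJob class below is purely illustrative, not a Flink API.

// hypothetical job class: source and sink are injected, so a test can
// substitute a bounded test source and a collecting sink for the production ones
public class IncrementJob {

    private final SourceFunction<Long> source;
    private final SinkFunction<Long> sink;

    public IncrementJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void build(StreamExecutionEnvironment env) {
        env.addSource(source)
            .map(new IncrementMapFunction())
            .addSink(sink);
    }
}

Production code would then construct the job with its real source and sink, while a test could build it as new IncrementJob(testSource, new CollectSink()).build(env).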
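
For the remark on event time timers, a custom parallel source could be sketched as follows; the element range and the per-record watermarks are arbitrary choices for a test.

// sketch of a test source: each parallel instance emits timestamped records
// and a watermark after each record, so event time timers fire deterministically
public class WatermarkEmittingSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long timestamp = 0L;
        while (running && timestamp <= 100L) {
            // emit the record with an explicit event timestamp...
            ctx.collectWithTimestamp(timestamp, timestamp);
            // ...and advance event time so that timers up to this point fire
            ctx.emitWatermark(new Watermark(timestamp));
            timestamp += 10L;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}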
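
Finally, for the checkpointing remark, a test-only function that triggers a failure might look like the sketch below; the flag must be static because the function is serialized and re-instantiated on recovery.

// sketch of a test-only failing function: throws exactly once so that the job
// restarts from the last checkpoint; afterwards it behaves as the identity
public class FailOnceMapFunction implements MapFunction<Long, Long> {

    // static, because the function instance is re-created after recovery
    private static volatile boolean hasFailed = false;

    @Override
    public Long map(Long value) throws Exception {
        if (!hasFailed) {
            hasFailed = true;
            throw new RuntimeException("artificial failure to test checkpoint recovery");
        }
        return value;
    }
}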