测试

本页简要讨论如何在IDE或本地环境中测试Flink应用程序。

单元测试

通常,可以假设Flink在用户定义之外产生正确的结果Function因此,建议Function尽可能使用单元测试来测试包含主业务逻辑的类。

例如,如果实现以下内容ReduceFunction

  1. public class SumReduce implements ReduceFunction<Long> {
  2. @Override
  3. public Long reduce(Long value1, Long value2) throws Exception {
  4. return value1 + value2;
  5. }
  6. }
  1. class SumReduce extends ReduceFunction[Long] {
  2. override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
  3. value1 + value2
  4. }
  5. }

通过传递合适的参数并验证输出,可以很容易地使用您喜欢的框架对其进行单元测试:

  1. public class SumReduceTest {
  2. @Test
  3. public void testSum() throws Exception {
  4. // instantiate your function
  5. SumReduce sumReduce = new SumReduce();
  6. // call the methods that you have implemented
  7. assertEquals(42L, sumReduce.reduce(40L, 2L));
  8. }
  9. }
  1. class SumReduceTest extends FlatSpec with Matchers {
  2. "SumReduce" should "add values" in {
  3. // instantiate your function
  4. val sumReduce: SumReduce = new SumReduce()
  5. // call the methods that you have implemented
  6. sumReduce.reduce(40L, 2L) should be (42L)
  7. }
  8. }

集成测试

为了端到端测试Flink流管道,您还可以编写针对本地Flink迷你集群执行的集成测试。

为此,添加测试依赖项flink-test-utils

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-test-utils_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

例如,如果要测试以下内容MapFunction

  1. public class MultiplyByTwo implements MapFunction<Long, Long> {
  2. @Override
  3. public Long map(Long value) throws Exception {
  4. return value * 2;
  5. }
  6. }
  1. class MultiplyByTwo extends MapFunction[Long, Long] {
  2. override def map(value: Long): Long = {
  3. value * 2
  4. }
  5. }

您可以编写以下集成测试:

  1. public class ExampleIntegrationTest extends AbstractTestBase {
  2. @Test
  3. public void testMultiply() throws Exception {
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. // configure your test environment
  6. env.setParallelism(1);
  7. // values are collected in a static variable
  8. CollectSink.values.clear();
  9. // create a stream of custom elements and apply transformations
  10. env.fromElements(1L, 21L, 22L)
  11. .map(new MultiplyByTwo())
  12. .addSink(new CollectSink());
  13. // execute
  14. env.execute();
  15. // verify your results
  16. assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
  17. }
  18. // create a testing sink
  19. private static class CollectSink implements SinkFunction<Long> {
  20. // must be static
  21. public static final List<Long> values = new ArrayList<>();
  22. @Override
  23. public synchronized void invoke(Long value) throws Exception {
  24. values.add(value);
  25. }
  26. }
  27. }
  1. class ExampleIntegrationTest extends AbstractTestBase {
  2. @Test
  3. def testMultiply(): Unit = {
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment
  5. // configure your test environment
  6. env.setParallelism(1)
  7. // values are collected in a static variable
  8. CollectSink.values.clear()
  9. // create a stream of custom elements and apply transformations
  10. env
  11. .fromElements(1L, 21L, 22L)
  12. .map(new MultiplyByTwo())
  13. .addSink(new CollectSink())
  14. // execute
  15. env.execute()
  16. // verify your results
  17. assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
  18. }
  19. }
  20. // create a testing sink
  21. class CollectSink extends SinkFunction[Long] {
  22. override def invoke(value: java.lang.Long): Unit = {
  23. synchronized {
  24. values.add(value)
  25. }
  26. }
  27. }
  28. object CollectSink {
  29. // must be static
  30. val values: List[Long] = new ArrayList()
  31. }

CollectSink此处使用静态变量in ,因为Flink在将所有 算子分布到集群之前将其序列化。通过静态变量与本地Flink迷你集群实例化的算子进行通信是解决此问题的一种方法。或者,您可以使用测试接收器将数据写入临时目录中的文件。您还可以实现自己的自定义源以发出水印。

测试检查点和状态处理

测试状态处理的一种方法是在集成测试中启用检查点。

您可以通过StreamExecutionEnvironment在测试中配置来完成此 算子操作:

  1. env.enableCheckpointing(500);
  2. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
  1. env.enableCheckpointing(500)
  2. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))

例如,向Flink应用程序添加一个身份映射器 算子,该 算子将每次抛出一次异常1000ms但是,由于动作之间存在时间依赖关系,因此编写此类测试可能会非常棘手。

另一种方法是写使用Flink内部测试效用一个单元测试AbstractStreamOperatorTestHarnessflink-streaming-java模块。

对于如何做到这一点,请看看在一个例子org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest中也flink-streaming-java模块。

请注意,AbstractStreamOperatorTestHarness目前它不是公共API的一部分,可能会有所变化。