并行执行

本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为并行性

如果要使用保存点,还应考虑设置最大并行度(或max parallelism)。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为Keys组,并且我们不能拥有+Inf多个Keys组,因为这会对性能产生不利影响。

设置并行性

可以在不同级别的Flink中指定任务的并行性:

算子级别

可以通过调用其setParallelism()方法来定义单个 算子,数据源或数据接收器的并行性例如,像这样:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<String> text = [...]
  3. DataStream<Tuple2<String, Integer>> wordCounts = text
  4. .flatMap(new LineSplitter())
  5. .keyBy(0)
  6. .timeWindow(Time.seconds(5))
  7. .sum(1).setParallelism(5);
  8. wordCounts.print();
  9. env.execute("Word Count Example");
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val text = [...]
  3. val wordCounts = text
  4. .flatMap{ _.split(" ") map { (_, 1) } }
  5. .keyBy(0)
  6. .timeWindow(Time.seconds(5))
  7. .sum(1).setParallelism(5)
  8. wordCounts.print()
  9. env.execute("Word Count Example")

运行环境级别

如此处所述 Flink程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。

可以通过调用setParallelism()方法来指定运行环境的默认并行性要以并行方式执行所有 算子,数据源和数据接收器,请3按如下方式设置运行环境的默认并行度:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(3);
  3. DataStream<String> text = [...]
  4. DataStream<Tuple2<String, Integer>> wordCounts = [...]
  5. wordCounts.print();
  6. env.execute("Word Count Example");
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setParallelism(3)
  3. val text = [...]
  4. val wordCounts = text
  5. .flatMap{ _.split(" ") map { (_, 1) } }
  6. .keyBy(0)
  7. .timeWindow(Time.seconds(5))
  8. .sum(1)
  9. wordCounts.print()
  10. env.execute("Word Count Example")

客户级别

在向Flink提交作业时,可以在客户端设置并行性。客户端可以是Java或Scala程序。这种客户端的一个例子是Flink的命令行界面(CLI)。

对于CLI客户端,可以使用指定parallelism参数-p例如:

  1. ./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在Java / Scala程序中,并行性设置如下:

  1. try {
  2. PackagedProgram program = new PackagedProgram(file, args);
  3. InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
  4. Configuration config = new Configuration();
  5. Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
  6. // set the parallelism to 10 here
  7. client.run(program, 10, true);
  8. } catch (ProgramInvocationException e) {
  9. e.printStackTrace();
  10. }
  1. try {
  2. PackagedProgram program = new PackagedProgram(file, args)
  3. InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
  4. Configuration config = new Configuration()
  5. Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())
  6. // set the parallelism to 10 here
  7. client.run(program, 10, true)
  8. } catch {
  9. case e: Exception => e.printStackTrace
  10. }

系统级别

可以通过设置parallelism.default属性来定义所有运行环境的系统范围默认并行度./conf/flink-conf.yaml有关详细信息,请参阅配置文档

设置最大并行度

可以在可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)。而不是调用,setParallelism()你调用setMaxParallelism()设置最大并行度。

最大并行度的默认设置大致operatorParallelism + (operatorParallelism / 2)为下限127和上限32768

注意将最大并行度设置为非常大的值可能对性能有害,因为某些状态后台必须保持内部数据结构随Keys组的数量(这是可重新缓存状态的内部实现机制)进行扩展。