并行执行
本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为并行性。
如果要使用保存点,还应考虑设置最大并行度(或max parallelism
)。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为Keys组,并且我们不能拥有+Inf
多个Keys组,因为这会对性能产生不利影响。
设置并行性
可以在不同级别的Flink中指定任务的并行性:
算子级别
可以通过调用其setParallelism()
方法来定义单个 算子,数据源或数据接收器的并行性。例如,像这样:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")
运行环境级别
如此处所述, Flink程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。
可以通过调用setParallelism()
方法来指定运行环境的默认并行性。要以并行方式执行所有 算子,数据源和数据接收器,请3
按如下方式设置运行环境的默认并行度:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
wordCounts.print()
env.execute("Word Count Example")
客户级别
在向Flink提交作业时,可以在客户端设置并行性。客户端可以是Java或Scala程序。这种客户端的一个例子是Flink的命令行界面(CLI)。
对于CLI客户端,可以使用指定parallelism参数-p
。例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在Java / Scala程序中,并行性设置如下:
try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// set the parallelism to 10 here
client.run(program, 10, true);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
try {
PackagedProgram program = new PackagedProgram(file, args)
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
Configuration config = new Configuration()
Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())
// set the parallelism to 10 here
client.run(program, 10, true)
} catch {
case e: Exception => e.printStackTrace
}
系统级别
可以通过设置parallelism.default
属性来定义所有运行环境的系统范围默认并行度./conf/flink-conf.yaml
。有关详细信息,请参阅配置文档
设置最大并行度
可以在可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)。而不是调用,setParallelism()
你调用setMaxParallelism()
设置最大并行度。
最大并行度的默认设置大致operatorParallelism + (operatorParallelism / 2)
为下限127
和上限32768
。
注意将最大并行度设置为非常大的值可能对性能有害,因为某些状态后台必须保持内部数据结构随Keys组的数量(这是可重新缓存状态的内部实现机制)进行扩展。