Flink 1.11
本节介绍如何在 Flink 中配置程序的并发执行。一个 Flink 程序由多个 Task (transformations/operators,data sources 以及 sinks)组成。一个 Task 可以分成多个并发实例来执行,每个并发实例只处理输入数据的一个子集。一个 Task 的并发实例的个数称为并发度(parallelism)。
如果你想使用保存点,也应该考虑设置最大并发度。从保存点恢复时,可以更改指定算子或整个程序的并发度,并且此配置指定了并发的上限。
1. 设置并发度
在 Flink 中一个 Task 的并发度可以指定不同级别。
1.1 算子级别
单个算子,数据源,Sink 都可以通过调用 setParallelism()
方法来指定并发度。如下代码所示指定算子级别的并发度:
Java版本:DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
Scala版本:val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")
1.2 执行环境级别
如Flink程序剖析 博文所述,Flink 程序是在执行环境的上下文中执行的。执行环境为它执行的所有算子,数据源和 Sink 提供了默认的并发度。执行环境的并发度可以通过显式配置一个算子的并发度来覆盖。
执行环境的默认并发度可以通过调用 setParallelism()
方法来指定。要为执行的所有算子,数据源和 Sink 设置并发度为3,可以按如下代码所示设置执行环境的默认并发度:
Java版本:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
Scala版本:val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
wordCounts.print()
env.execute("Word Count Example")
1.3 客户端级别
在向 Flink 提交作业时,可以在客户端设置并发度。客户端可以是 Java 或者 Scala 程序。Flink 的命令行接口(CLI
)就是其中一种客户端。对于 CLI 客户端,可以使用 -p
参数指定并发度。如下代码所示指定10个并发:./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在 Java/Scala 程序中,并发度设置如下:
Java版本:try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// set the parallelism to 10 here
client.run(program, 10, true);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
Scala版本:try {
PackagedProgram program = new PackagedProgram(file, args)
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
Configuration config = new Configuration()
Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())
// set the parallelism to 10 here
client.run(program, 10, true)
} catch {
case e: Exception => e.printStackTrace
}
1.4 系统级别
可以通过在 ./conf/flink-conf.yaml
中设置 parallelism.default
属性来为所有执行环境定义全系统默认并发度。详细信息请参阅配置文档。
2. 设置最大并发度
最大并发度可以在可以设置并发度的地方设置(客户端级别和系统级别除外)。你可以调用 setMaxParallelism()
方法来设置最大并发度。最大并发度的默认设置大致为:算子并发度 +(算子并发度 / 2),下限为 127,上限为 32768。
将最大并发度设置为一个非常大的数可能会对性能造成不利影响,这是因为某些状态后端必须保持内部数据结构与 KeyGroup 的数量成比例(这也是可伸缩状态的内部实现机制)。
欢迎关注我的公众号和博客:
推荐订阅: