理解 Storm 拓扑的并行度

1. 什么让拓扑运行

Storm 区分以下 3 个主要的实体,用来运行 Storm 集群中拓扑:

  • Worker 进程
  • Executors 线程
  • Tasks

这是一个简单的例子, 以说明他们之间的关系

一个 Worker 进程用来执行一个拓扑的子集。属于一个指定拓扑的 Worker 进程, 为该拓扑的一个或多个组件(spouts 或 bolts)运行一个或多个 Executors。一个正在运行的拓扑由多个这样的进程组成, 它们运行在 Storm 集群的多个机器上。

Executor 是一个线程,由 Worker 进程产生。一个 Executor 可以为同一个组件(spout 或 blot)运行一个或多个 Tasks。

Task 执行实际的数据处理 - 在你代码中实现的 spout 或 bolt 在集群上执行尽可能多的 Task。一个组件的 Task 数目在整个拓扑生命周期中总是相同的,但是一个组件的 Executors 数目会随时间变化。这意味着以下条件成立: #threads ≤ #tasks。默认情况下,Tasks 的数目与 Executors 的数目设置成一样,即,Storm 在每个线程上运行一个 Task。

2. 配置拓扑的并行度

请注意,在 Storm 的术语中, parallelism 专门用来描述所谓的 parallelism hint,表示一个组件的 Executor 的初始化数量。在这篇文章中, 尽管我们一般使用 parallelism 术语来描述如何配置 Executor 的数目,但同时也可以配置 Worker 进程的数目和 Storm 拓扑的 Tasks 数目。

以下部分概述了各种配置参数以及如何在代码中进行设置。尽管可以有多种方法去设置这些参数,但下面只列出了其中的一些。Storm 目前配置的优先顺序为: defaults.yaml < storm.yaml < 特定拓扑的配置 < 特定内部组件的配置 < 特定外部组件的配置

2.1 Worker 进程的数量

  • 描述: 在集群的机器上为拓扑创建多少个 Worker 进程。
  • 配置参数: TOPOLOGY_WORKERS
  • 如何在代码中设置:conf.setNumWorkers(4)

2.2 Executors的数量

  • 描述: 为每个组件创建多少个 Executors。
  • 配置参数: None (传递 parallelism_hint 参数到 setSpout 或 setBolt)。
  • 如何在代码中设置: TopologyBuilder#setSpout()TopologyBuilder#setBolt()

    参数现在指定了 Bolt 的 Executors 的初始化数量(不是 Tasks)。

2.3 Tasks的数量

  • 描述: 为每个组件创建多少个 Tasks。
  • 配置参数: TOPOLOGY_TASKS
  • 如何在代码中设置:ComponentConfigurationDeclarer#setNumTasks()

3. 运行拓扑示例

Config conf = new Config();

// use two worker processes
conf.setNumWorkers(2);

// set parallelism hint to 2
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
"mytopology",
conf,
topologyBuilder.createTopology()
);

下图显示了上述简单拓扑是如何运行的。该拓扑由 3 个组件构成: 一个为 BlueSpout 的 spout 和两个为 GreenBoltYellowBolt 的 bolts。BlueSpout 将其输出发送到 GreenBoltGreenBolt 将其输出发送到 YellowBolt

Storm 还提供了额外的配置来设置拓扑的并行度:

  • TOPOLOGY_MAX_TASK_PARALLELISM: 此参数设置单个组件 Executor 数量的上限。通常在测试期间使用它来限制在本地模式下运行拓扑时产生的线程数。你可以通过 Config#setMaxTaskParallelism() 来设置此选项。

4. 如何改变正在运行中的拓扑的并行度

Storm 的一个很好的特性是可以增加或减少 Worker 进程 或 Executor 的数量,不需要重新启动集群拓扑。这样的行为称之为 rebalance

你有两个选项来 rebalance 一个拓扑:

  • 使用 Storm web UI 来 rebalance 指定的拓扑。
  • 使用 CLI 工具 storm rebalance, 如下所示。
    以下是一个使用 CLI 工具的示例:
    ## 重新配置拓扑 "mytopology" 使用  5 个 Worker 进程
    ## 重新配置spout "blue-spout" 使用 3 个 Executors
    ## 重新配置bolt "yellow-bolt" 使用 10 个 Executors

    $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

版本:1.2.2

原文:Understanding the Parallelism of a Storm Topology

赏几毛白!