Storm 理解内部消息缓冲机制

优化 Storm 拓扑性能有助于我们理解 Storm 内部消息队列的配置和使用,在这篇文章中,我将向大家解释并说明在 Storm(0.8或0.9)版本中工作进程以及相关联的 Executor 线程是如何完成内部通信的。

1. Storm工作进程中的内部消息

当我说内部消息时,我的意思是在 Storm 工作进程内发生的消息,这只局限在同一个 Storm 节点内发生的通信。Storm 依赖于 LMAX Disruptor 支持的各种消息队列来完成此通信,LMAX Disruptor 是一个高性能的线程间消息通信库。

请注意,同一工作进程中线程间通信与工作进程间通信不同,后者通常发生在不同主机之间,因此需要通过网络传输,Storm 默认使用 ZeroMQ(在Storm 0.9 开始实验性的支持 Netty)进行通信。也就是说,当一个工作进程中的 Task 想要将数据发送到 Storm 集群另一台机器的 Task 时,需要使用 ZeroMQ/Netty 进行传输。所以有如下结论:

  • Storm 工作进程内部通信(同一 Storm 节点上的线程间):LMAX Disruptor。
  • 工作进程间通信(跨网络的节点到节点):ZeroMQ或Netty。
  • 拓扑间通信:没有内置于 Storm 中,你必须自己处理这个问题。可以使用消息传递系统,如Kafka/RabbitMQ,数据库等。

如果你不了解 Storm 的工作进程,Executor 线程和 Task 之间的差异,可以参考 理解 Storm 拓扑的并行度

在我们讨论下一节中的细节之前,让我们从下图开始。

上图说明了 Storm 工作进程内部消息队列的概述。与工作进程相关的队列以红色表示,与工作进程的 Executor 线程相关的队列以绿色表示。为了更好的可读性,上图只显示一个工作进程(通常一个 Storm 节点运行多个工作进程),同时工作进程中只显示一个 Executor 线程(通常每个工作进程通常有多个 Executor 线程)。

2. 内部实现

现在我们对 Storm 的工作进程内部消息机制有了一定了解,接下来可以深入讨论细节了。

2.1 工作进程

为了管理输入和输出消息,每个工作进程都有一个监听工作进程TCP端口的接收线程(通过 supervisor.slots.ports 配置),接收线程将输入消息发送到工作进程 Executor 线程的输入队列中。工作进程接收线程的缓冲区大小通过 topology.receiver.buffer.size 配置。同样地,每个工作进程都有一个发送线程,负责从工作进程的传输队列中读取消息,并通过网络将消息发送给下游消费者。工作进程传输队列的大小通过 topology.transfer.buffer.size 配置。

每一个工作进程都会有一个接收线程和一个发送线程。接收线程用于将网络中的输入消息发送到 Executor 线程的输入队列中,发送线程用于将传输队列的消息发送给下游消费者。

topology.receiver.buffer.size 是一次批处理的最大消息数,工作进程接收线程从网络读取消息发送到 Executor 的输入队列中。如果将此参数设置得太高可能会导致很多问题,比如心跳线程饥饿,吞吐量急剧下降等。此参数默认为8个元素,值必须为2的幂(此要求间接来自LMAX Disruptor)。

// Example: configuring via Java API
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // default is 8

请注意,topology.receiver.buffer.size 与本文中描述的其他缓冲区大小相关参数不同,它不是配置的 LMAX Disruptor 队列的大小,而是配置的一个简单 ArrayList 的大小,用于缓冲输入消息。因为在这种特定情况下,数据结构不需要与其他线程共享,即它专属于工作进程的接收线程。但是因为此缓冲区的内容用于填充 Disruptor 队列(Executor 输入队列),所以它也必须是2的幂。

使用 topology.transfer.buffer.size 配置的输出队列的每个元素实际上都是一个元组列表。不同的 Executor 发送线程批量的将输出的元组从输出队列发送到传输队列中。默认值为1024个元素。

//示例:通过Java API进行配置
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,32); //默认为1024

2.2 Executor

每个工作进程都有一个或多个 Executor 线程。每个 Executor 线程都有自己的输入队列和输出队列。如上所述,工作进程运行一个专用的接收线程将输入消息发送到工作进程的 Executor 线程的输入队列中。同样地,每个 Executor 都有一个专用的发送线程,将 Executor 的输出消息从其输出队列发送到“父”工作进程的传输队列。Executor 的输入和输出队列的大小分别通过 topology.executor.receive.buffer.sizetopology.executor.send.buffer.size 配置。

每个 Executor 线程都有一个线程来处理 Spout/Bolt 的用户逻辑(即你的应用程序代码),以及一个发送线程将消息从 Executor 的输出队列发送到工作进程的传输队列。

通过 topology.executor.receive.buffer.size 配置 Executor 输入队列的大小。队列的每个元素都是元组列表。这里,元组追加形成一个批次。默认值为1024个元素,值必须为2的幂(此要求来自LMAX Disruptor)。

conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,16384); //批处理 默认值是1024

通过 topology.executor.send.buffer.size 配置 Executor 输出队列的大小。队列的每个元素都只包含一个元组。默认值为1024个元素,值必须为2的幂(此要求来自LMAX Disruptor)。

conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,16384);

3. 配置

3.1 如何配置内部消息缓冲

上面提到的各种默认值在 conf/defaults.yaml 中定义。你可以在 Storm 集群的 conf/storm.yaml 中全局覆盖这些值。你还可以通过 Storm 的 Java API 中的 backtype.storm.Config 为每个 Storm 拓扑配置这些参数。

3.2 如何配置拓扑并行度

Storm 的消息缓冲区的正确配置与拓扑的工作负载模式以及拓扑的已配置并行度密切相关。有关后者的更多详细信息,请参考理解 Storm 拓扑的并行度

3.3 了解Storm拓扑内部在做什么

Storm UI 是检查正在运行的 Storm 拓扑的关键指标的一个很好的工具。例如,它向你展示了 Spout/Bolt 的所谓 capacity。各种指标会帮助你确定本文中描述的与缓冲区相关的配置参数的更改是否对 Storm 拓扑的性能产生正面或负面影响。有关详细信息,请参考运行在Storm集群的多节点上

除此之外,你还可以生成自己的应用程序指标,并使用 Graphite 等工具进行跟踪。有关详细信息,请参阅我的文章将Storm指标发送到Graphite以及通过RPM和Supervisord安装和运行Graphite。也许值得在 GitHub 上查看 ooyala 的 metrics_storm 项目(我还没有使用它)。

3.4 优化建议

可以看看 Nathan Marz(Storm作者)的演讲:Tuning and Productionization of Storm.

开始的时候可以试试如下参数配置,看看是否能够提升 Storm 集群的性能:

conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

原文:Understanding the Internal Message Buffers of Storm

赏几毛白!