Hadoop MapReduce 2.x 工作原理

1. 概述

对于节点数超出4000的大型集群,MapReduce 1的系统开始面领着扩展性的瓶颈。在2010年雅虎的一个团队开始设计下一代MapReduce.由此,YARN(Yet Another Resource Negotiator的缩写或者为YARN Application Resource Neforiator的缩写)应运而生。

YARNJobtracker 的职能划分为多个独立的实体,从而改善了’经典的’MapReduce面临的扩展瓶颈问题。Jobtracker负责作业调度和任务进度监视、追踪任务、重启失败或过慢的任务和进行任务登记,例如维护计数器总数。

YARN将这两种角色划分为两个独立的守护进程:管理集群上资源使用的资源管理器(ResourceManager)和管理集群上运行任务生命周期的应用管理器(ApplicationMaster)。基本思路是:应用服务器与资源管理器协商集群的计算资源:容器(每个容器都有特定的内存上限),在这些容器上运行特定应用程序的进程。容器由集群节点上运行的节点管理器(NodeManager)监视,以确保应用程序使用的资源不会超过分配给它的资源。

jobtracker不同,应用的每个实例(这里指一个MapReduce作业)有一个专用的应用master(ApplicationMaster),它运行在应用的运行期间。这种方式实际上和最初的GoogleMapReduce论文里介绍的方法很相似,该论文描述了master进程如何协调在一组worker上运行的map任务和reduce任务。

如前所述,YARNMapReduce更具一般性,实际上MapReduce只是YARN应用的一种形式。有很多其他的YARN应用(例如能够在集群中的一组节点上运行脚本的分布式shell)以及其他正在开发的程序。YARN设计的精妙之处在于不同的YARN应用可以在同一个集群上共存。例如,一个MapReduce应用可以同时作为MPI应用运行,这大大提高了可管理性和集群的利用率。

此外,用户甚至有可能在同一个YARN集群上运行多个不同版本的MapReduce,这使得MapReduce升级过程更容易管理。注意,MapReduce的某些部分(比如作业历史服务器和shuffle处理器)以及YARN本身仍然需要在整个集群上升级。

YARN上的MapReduce比经典的MapReduce包括更多的实体:

  • 提交MapReduce作业的客户端
  • YARN资源管理器(ResourceManager),负责协调集群上计算资源的分配
  • YARN节点管理器(NodeManager),负责启动和监视集群中机器上的计算容器(container)
  • MapReduce应用程序master(ApplicationMaster),负责协调运行MapReduce作业的任务。它和MapReduce任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理
  • 分布式文件系统(一般为HDFS),用来与其他实体见共享作业文件

作业运行过程如下图所示:

2. 作业提交

MapReduce 2 中的作业提交是使用与MapReduce 1相同的用户API(参见上图步骤1)。MapReduce 2实现了ClientProtocol,当mapreduce.framework.name设置为yarn时启动。提交的过程与经典的非常相似。从资源管理器ResourceManager(而不是jobtracker)获取新的作业ID,在YARN命名法中它是一个应用程序ID(参见上图步骤2)。作业客户端检查作业的输出说明,计算输入分片(虽然有选项yarn.app.mapreduce.am.compute-splits-in-cluster在集群上来产生分片,这可以使具有多个分片的作业从中受益)并将作业资源(包括作业JAR、配置和分片信息)复制到HDFS(参见上图步骤3)。最后,通过调用资源管理器上的submitApplication()方法提交作业(参见上图步骤4)。

3. 作业初始化

资源管理器收到调用它的submitApplication()消息后,便将请求传递给调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动应用程序的master进程(ApplicationMaster)(参见上图步骤5a和5b)。

MapReduce作业的ApplicationMaster是一个Java应用程序,它的主类是MRAppMaster。它对作业进行初始化:通过创建多个簿记对象以保持对作业进度的跟踪,因为它将接受来自任务的进度和完成报告(参见上图步骤6)。接下来,它接受来自共享文件系统的在客户端计算的输入分片(参见上图步骤7)。对每一个分片创建一个map任务对象以及由mapreduce.job.reduces属性确定的多个reduce任务对象。

接下来,ApplicationMaster决定如何运行构成MapReduce作业的各个任务。如果作业很小,就选择在与它同一个JVM上运行任务。

相对于在一个节点上顺序运行它们,判断在新的容器中分配和运行任务的开销大于并行运行它们的开销时,就会发生这一情况。这不同于MapReduce 1MapReduce 1从不在单个tasktracker上运行小作业。这样的作业称为uberized,或者作为uber任务运行。

哪些任务是小任务呢? 默认情况下,小任务就是小于10个mapper且只有1个reducer且输入大小小于一个HDFS块的任务。(通过设置mapreduce.job.ubertask.maxmapsmapreduce.job.ubertask.maxreducesmapreduce.job.ubertask.maxbytes可以改变一个作业的上述值。)将mapreduce.job.ubertask.enable设置为false也可以完全使uber任务不可用。

在任何任务运行之前,作业的setup方法为了设置作业的OutputCommitter被调用来建立作业的输出目录。在MapReduce 1中,它在一个由tasktracker运行的特殊任务中被调用,而在YARN执行框架中,该方法由应用程序master直接调用。

4. 任务分配

如果作业不适合作为uber任务运行,那么ApplicationMaster就会为该作业中的所有map任务和reduce任务向资源管理器请求容器(参见上图步骤8)。附着心跳信息的请求包括每个map任务的数据本地化信息,特别是输入分片所在的主机和相应机架信息。调度器使用这些信息来做调度策略(像jobtracker的调度器一样)。理想情况下,它将任务分配到数据本地化的节点,但如果不可能这样做,调度器就会相对于非本地化的分配有限使用机架本地化的分配。

请求也为任务指定了内存需求。在默认情况下,map任务和reduce任务都分配到1024MB的内存,但这可以通过mapreduce.map.memory.mbmapreduce.reduce.memory.mb来设置。

内存的分配方式不同于MapReduce 1,后者中tasktrackers有在集群配置时设置的固定数量的槽,每个任务在一个槽上运行。槽有最大内存分配限制,这对集群是固定的,导致当任务使用较少内存时无法充分利用内存(因为其他等待的任务不能使用这些未使用的内存)以及由于任务不能获取足够内存而导致作业失败。

YARN中,资源划分更细的粒度,所以可以避免上述问题。具体而言,应用程序可以请求最小到最大限制范围内的任意最小值倍数的内存容量。默认的内存分配容量是调度器特定的,对于容量调度器,它的默认值最小值是1024MB(由 yarn.sheduler.capacity.minimum-allocation-mb设置),默认的最大值是10240MB(由yarn.sheduler.capacity.maximum-allocation-mb设置)。因此,任务可以通过适当设置mapreduce.map.memory.mbmapreduce.reduce.memory.mb来请求1GB10GB间的任务1GB倍数的内存容量(调度器在需要的时候使用最接近的倍数)。

5. 任务执行

一旦资源管理器的调度器为任务分配了容器,ApplicationMaster就通过与节点管理器通信来启动容器(参见上图步骤9a和9b)。该任务由主类YarnChildJava应用程序执行,在它运行任务之前,首先将任务需要的资源本地化,包括作业的配置、JAR文件和所有来自分布式缓存的文件(参见上图步骤10)。最后,运行map任务或reduce任务(参见上图步骤11)。

6. 进度和状态更新

YARN下运行时,任务每三秒钟通过umbilical接口向ApplicationMaster汇报进度和状态(包含计数器),作为作业的汇聚视图(aggregate view)。相比之下,MapReduce 1通过tasktrackerjobtracker来实现进度更新。

客户端每秒钟(通过mapreduce.client.progressmonitor.pollinterval设置)查询一次ApplicationMaster以接收进度更新,通常都会向用户显示。

7. 作业完成

除了向ApplicationMaster查询进度外,客户端每5秒钟通过调用JobwaitForCompletion()来检查作业是否完成。查询的间隔可以通过mapreduce.client.completion.pollinterval属性进行设置。

作业完成后,ApplicationMaster和任务容器清理其工作状态,OutputCommitter的作业清理方法会被调用。作业历史服务器保存作业的信息供用户需要时查询。

来源于: Hadoop 权威指南

赏几毛白!