YARN快速入门
产生背景
MapReduce1.x存在的问题:单点故障&节点压力大&不易扩展
Jobtraker受内存限制,导致扩展性受限。因为其需要存储每个作业的信息。另一方面,其采用粗粒度的锁导致心跳时间边长。
中心化架构的通病,一旦Jobtraker崩溃,会导致整个集群崩溃。
以mapreduce为中心,MapReduce不支持其他的编程模型,如机器学习,图算法
tasktraker的Map 槽和Reduce槽是固定的,不是动态分配的资源。
资源利用率&运维成本
Hadoop1.x时代
Hadoop2.x时代
不同计算框架可以共享一个HDFS集群上的数据,享受整体的资源
XXX On YARN的好处:与其他计算框架共享集群资源,按资源需要分配,进而提高资源的利用率
Spark On YARN
MapReduce1.x架构图
可以看出MapReduce1.x版本时架构为Master/Slaves架构
JobTracker:负责资源管理和作业调度
TaskTracker
定期向JobTracker汇报本节点的健康状况、资源使用情况、作业执行情况;
接受来自JobTracker的命令:启动任务/杀死任务
概述
Yet Another Resource Negotiator
通用的资源管理系统
为上层应用提供了统一的资源管理和调度
架构图
YARN的基本思想是将资源管理和作业调度/监视的功能拆分为单独的守护进程,因此需要一个全局的 ResourceManager (RM)和每个应用程序的ApplicationMaster(AM),应用程序可以是单个作业也可以是一组作业。
ResourceManger
RM是一个全局的资源管理器,集群只有一个,负责整个系统的资源管理和分配,包括处理客户端请求、启动/监控APP master、监控nodemanager、资源的分配与调度。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)
整个集群同一时间提供服务的RM只有一个,负责集群资源的统一管理和调度,实际上支持HA
处理客户端的请求:提交一个作业、杀死一个作业
监控NM,一旦某个NM挂了,那么该NM上运行的任务需要告知AM来处理
ApplicationMaster
负责从调度程序中协商适当的资源容器,跟踪其状态并监视进度
每一个应用程序对应一个:MapReduce、Spark类似应用程序,负责应用程序的管理
为应用程序向RM申请资源(core、memory),分配给内部task
需要与NM通信:启动/停止task,task是运行在container,其实AM也是运行在container里面
NodeManager
每台机器的框架代理,整个集群中有多个,负责自己本身节点资源管理和使用
定时向RM汇报本节点的资源使用情况
接受并处理来自RM的各种命令:启动Container
处理来自AM的命令
负责自身单个节点的资源管理
Container
封装来CPU、Memory等 资源的一个容器
是一个任务运行环境的抽象
Client
提交作业
查看作业的运行进度
杀死作业
作业提交机制
Job对象的submit方法。
waitForCompletion方法,用于提交以前没有提交过的作业,并等待它的完成,成功返回true,失败返回false
Yarn工作机制
作业提交过程
源码解析流程
作业的初始化
设置多个reduce任务
通过-D mapreduce.job.reduces属性设置
job.setNumReduceTasks()设置
那些作业是小作业
默认情况下,小作业是少于10个mapper且只有1个reducer且输入大小小于一个HDFS块的作业(通过设置mapreducer.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreduces和mapreduce.job.ybertask.maxbytes来修改默认值)
启动Uber任务的具体方法是-D mapreduce.job.ubertask.enable设置为true
任务的管理
任务内存分配
任务的执行
作业的完成
作业运行失败
实际情况中,用户代码错误问题,进程崩溃,机器过载,使用Hadoop的好处之一就是它可以处理此类故障并让你能够成功完成作业。我们需要考虑如下组件的失败:job、am、nm和rm
任务运行失败
map和reduce任务错误
任务JVM会在退出之前向父am发送错误报告。错误报告最后被记入用户日志。am将此次任务尝试标记为failed,并释放容器以便资源可以为其他任务使用。
对于Streaming任务,如果Streaming进程以非零退出代码退出,则标记为fail,这种行为由stream.non.zero.exit.is.failure属性来控制
任务JVM突然退出
由于JVM软件缺陷而导致MapReduce用户代码由于特殊原因造成JVM退出,这种情况下,NM会注意到进程已经退出,并通知AM将此任务尝试标记为失败
任务挂起的处理方式
AM注意到有一段时间没有收到进度的更新,便会将任务标记为失败,在此之后,任务JVM进程将被自动杀死。任务被认为失败的超时间隔通常为10分钟,可以以作业为基础(或以集群为基础)进行设置,对应的属性为
mapreduce.task.timeout
,单位为毫秒。超时(timeout)设置为0将关闭超时判定,所以长时间运行的任务永远不会标记为失败,这种情况下,被挂起的任务永远不会释放它的容器并随时间的推移降低整个集群的效率。
AM被告知一个任务尝试失败后,将重新调度该任务的执行,AM会试图避免以前失败过的NM上重新调度该任务,此外,如果一个任务失败4次,将不会再重试。这个阈值通过
mapreduce.map.maxattempts
或mapreduce.reduce.maxattempts
来控制。不触发作业失败的情况下运行任务失败的最大百分比,针对map任务和reduce任务设置
mapreduce.map.failures.maxpercent
和mapreduce.reduce.failures.maxpercent
来设置
AM运行失败
恢复过程
轮询进度报告过程
节点管理器运行失败
NM失败次数过高
该NM将可能被拉黑,即使NM自己并没有失败过。
AM管理黑名单,对MapReduce,如果一个NM上有超过是那个任务失败,AM就尽量将任务调度到不同的节点上。
用户可以通过作业属性
mapreduce.job.maxtaskfailures.per.tracker
设置该阈值
资源管理器运行失败
RM失败是严重的问题,没有RM,作业和任务容器将无法启动。在默认的配置中,RM存在单点故障,这是由于机器失败的情况下,所有运行的任务都失败且不能被恢复。
HA方案
双机热备配置,运行一对RM,所有运行中的应用程序的信息存储在一个高可用的状态存储区中(由Zookeeper或HDFS备份),这样备份机可以恢复出失败的主RM的关键状态。NM信息没有存储在状态存储区中,因为当NM方法它们的一个心跳信息时,NM的信息就能以相当快的速度被新的RM重构。
当RM启动后,它从状态存储区中读取应用程序的信息,然后集群中运行的所有应用程序重启AM,这个行为不被计为失败的应用程序重试(所以不会计入
yarn.resourcemanager.am.max-attempts
),这是因为应用程序并不是因为程序的错误代码而失败,而是系统强行终止的。实际情况中,AM重启不是MR程序的问题,因为它们是恢复已完成的任务的工作。RM从备机到主机的切换是由故障转移控制器处理的,默认的故障转移控制器是自动工作的,使用Zookeeper的leader选举机制以保证同一时刻只有一个masterRM。不同于HDFS的高可用性的实现,故障转移控制器不必是一个独立进程,为了配置方便,默认情况下嵌入在RM中。故障转移也可以手动处理。
对应RM的故障转移,客户端和节点管理器也需要进行配置,它们以轮询方式试图链接每一个RM,直到找到MasterRM。如果MasterRM故障,再次尝试链接SlaveRM直到其变成MasterRM
资源调度器
FIFO调度器
按照达到时间排序,先到先服务
容量调度器
支持并发运行Job
FAIR调度器
按照缺额排序,缺额大者优先。
任务的执行
任务执行环境
推测执行
作业完成时间取决于最慢的任务完成时间
。Mapreduce模型会将一个Job拆分成多个Task,当一个Task运行比预期慢时,它会尽量检测,
并启动另一个相同的Task作为备份
。这就是所谓的任务的推测执行。
推测执行的前提
每个Task只有一个备份任务
当前Job已完成的Task必须不小于0.05
开启推测执行参数设置。mapred-site.xml文件中默认是打开的。
mapreduce.map.speculative
默认值:true
mapreduce.reduce.speculative
默认值: trueyarn.app.mapreduce.am.job.speculator.class
默认值:org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator
Speculator类实现推测执行策略yarn.app.mapreduce.am.job.estimator.class
默认:org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator
Speculator实例使用的TaskRuntimeEstimator的实现,提供任务运行时间的估计值。
不能启用推测执行机制的情况
任务间存在严重的负载倾斜。
特殊任务,比如任务向数据库写数据。
推测执行算法原理
关于OutputCommitters
setupJob方法
commitJob方法
setupTask方法
环境搭建
通过设置一些参数并运行ResourceManager守护进程和NodeManager守护进程,可以在YARN上以伪分布式模式运行MapReduce作业。
参数配置
配置详解
配置文档:
https://hadoop.apache.org/docs/r3.2.1/hadoop-yarn/hadoop-yarn-site/NodeManager.html
ResourceManager和NodeManager配置
参数 | 值 | 说明 |
yarn.acl.enable | true / false | 允许ACLs?默认 false. |
yarn.admin.acl | Admin ACL | 在集群上设置adminis。 ACLs are of for comma-separated-usersspacecomma-separated-groups.默认是指定值为*表示任何人。特别的是空格表示皆无权限。 |
yarn.log-aggregation-enable | false | Configuration to enable or disable log aggregation 配置是否允许日志聚合。 |
ResourceManager配置
参数 | 值 | 说明 |
yarn.resourcemanager.address | ResourceManager host:port 用于客户端任务提交. | 如果设置host:port ,将覆盖yarn.resourcemanager.hostname.host:port主机名。 |
yarn.resourcemanager.scheduler.address | ResourceManager host:port 用于应用管理者向调度程序获取资源。 | 如果设置host:port ,将覆盖yarn.resourcemanager.hostname主机名 |
yarn.resourcemanager.resource-tracker.address | ResourceManager host:port 用于NodeManagers. | 如果设置 host:port ,将覆盖yarn.resourcemanager.hostname的主机名设置。 |
yarn.resourcemanager.admin.address | ResourceManager host:port 用于管理命令。 | 如果设置 host:port ,将覆盖yarn.resourcemanager.hostname主机名的设置 |
yarn.resourcemanager.webapp.address | ResourceManager web-ui host:port. | 如果设置 host:port ,将覆盖yarn.resourcemanager.hostname主机名的设置 |
yarn.resourcemanager.hostname | ResourceManager host. | 可设置为代替所有yarn.resourcemanager* address 资源的主机单一主机名。其结果默认端口为ResourceManager组件。 |
yarn.resourcemanager.scheduler.class | ResourceManager 调度类. | Capacity调度 (推荐), Fair调度 (也推荐),或Fifo调度.使用完全限定类名,如 org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler. |
yarn.scheduler.minimum-allocation-mb | 在 Resource Manager上为每个请求的容器分配的最小内存. | In MBs |
yarn.scheduler.maximum-allocation-mb | 在Resource Manager上为每个请求的容器分配的最大内存 | In MBs |
yarn.resourcemanager.nodes.include-path / yarn.resourcemanager.nodes.exclude-path | 允许/摒弃的nodeManagers列表 | 如果必要,可使用这些文件来控制允许的NodeManagers列表 |
NodeManager配置
参数 | 值 | 说明 |
yarn.nodemanager.resource.memory-mb | 用于给定的NodeManager的资源,即可用的物理内存、内存容量。 | 用于运行容器的NodeManager上的全部可用资源定义。 |
yarn.nodemanager.vmem-pmem-ratio | 任务可使用的虚拟内存的最大比率,可能超过物理内存限制。 | 每个任务的虚拟内存使用可能超过其物理内存限制。NodeManager上的任务所使用的虚拟内存总量可能会超过其物理内存使用量。 |
yarn.nodemanager.local-dirs | 写中间数据的本地文件系统以逗号分隔的路径列表。 | 多个路径帮助扩展磁盘i/o。 |
yarn.nodemanager.log-dirs | 写日志的本地文件系统以逗号分隔的路径列表 | 多个路径帮助扩展磁盘i/o。 |
yarn.nodemanager.log.retain-seconds | 10800 | 默认时间(以秒为单位)保留NodeManager上的日志文件,仅适用于禁用日志聚合的情况。 |
yarn.nodemanager.remote-app-log-dir | /logs | 在应用程序完成时移动应用程序日志的HDFS目录。需要设置相应的权限。仅适用于启用日志聚合的情况 |
yarn.nodemanager.remote-app-log-dir-suffix | logs | 后缀附加到远程日志目录。日志将聚合到${yarn.nodemanager.remote-app-log-dir}/${user}/${thisParam}仅适用于启用日志聚合的情况 |
yarn.nodemanager.aux-services | mapreduce_shuffle | Shuffle service that needs to be set for Map Reduce applications. 需要被设置Shuffle服务的Map Reduce应用程序 |
yarn.nodemanager.env-whitelist | 环境变量通过 从NodeManagers的容器继承的环境属性 | 对于mapreduce应用程序,除了默认值hadoop op_mapred_home应该被添加外。属性值还有JAVA_HOME HADOOP_COMMON_HOME、HADOOP_HDFS_HOME HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME HADOOP_MAPRED_HOME |
MapReduce作业提交至YARN运行
最后更新于