Yarn作业提交流程
这里以hadoop命令启动一个MR任务为例,如上图所示:
- 首先会找ResourceManager去申请一个Appilication,而ResourceManager会返回一个Appilication的资源缓存路径,专门存放启动MR任务的各类信息(切片、任务参数、jar包等)
- 各类资源提交完毕后,ResourceManager会创建一个Task,放到一个队列中(先进先出)。这是因为可能有很多节点也都想启动MR任务,所以把Task放到队列中,根据调度策略来正式启动任务
- 现在正式开启一个Task,ResourceManager会选出一个NodeManager去创建MRApplicationMaster,当然它也创建在Container中(Container是个封装了CPU、内存等资源的环境,当然是逻辑上的环境)。MRApplicationMaster其实就是对后续整个任务的进度监控和任务分配者,ResourceManager只是管理任务去调度执行,具体监督它不做的。
- MRApplicationMaster会去获取MR任务的各类信息(存在Appilication的资源缓存路径),然后找ResourceManager去获取哪些节点可以用来进行MapTask。然后MRApplicationMaster就找ResourceManager分配的NodeManager去创建Container然后发送jar包等信息开始map。
- 而这些进行MapTask的NodeManager的进行情况由MRApplicationMaster进行监控。但map阶段完成后,也是由MRApplicationMaster询问ResourceManager去获取哪些节点可以用来进行ReduceTask。就像进行MapTask一样,而MRApplicationMaster也会告知这些ReduceTask从哪里拉取map结束后的文件。
- 最后MR阶段都做完了,这些进行MapTask和ReduceTask会向ResourceManager注销自己
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
Yarn常用命令
查看任务
# 列出所有MR任务
yarn application -list
# 过滤并列出所有MR任务
# <state>可选:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED
yarn application -list -appStates <state>
# 关闭MR任务
yarn application -kill <application_id>
查看日志
# 查询某个任务的日志
yarn logs -applicationId <application_id>
# 查询Container的日志,因为一个任务可能有多个容器在运行map和reduce
yarn logs -applicationId <application_id> -containerId <container_id>
查看正在运行的任务
# 列出所有正在运行任务的列表
yarn applicationattempt -list <application_id>
# 查看正在运行任务的状态
yarn applicationattempt -status <applicationattempt_id>
查看容器
# 列出所有容器
yarn container -list <applicationattempt_id>
# 打印Container状态,只有在任务跑的途中才能看到Container的状态,任务执行完容器都被释放了
yarn container -status <container_id>
查看节点状态
# 列出所有节点
yarn node -list -all
更新配置
# 更新队列配置,不需要重启Yarn
yarn rmadmin -refreshQueues
查看队列
打印队列信息
# yarn queue -status <queue_name>
配置多队列的容量调度器
创建一个新的队列Hive
需求
需求1:
default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%。
需求2:
配置队列优先级
capacity-scheduler.xml
为配置文件/opt/module/hadoop-3.3.4/etc/hadoopcapacity-scheduler.xml添加如下配置
<!-- 指定多队列,增加hive队列 -->
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,hive</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>
<!-- 降低default队列资源额定容量为40%,默认100% -->
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>40</value>
</property>
<!-- 降低default队列资源最大容量为60%,默认100% -->
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>60</value>
</property>
<!-- 指定hive队列的资源额定容量 -->
<property>
<name>yarn.scheduler.capacity.root.hive.capacity</name>
<value>60</value>
</property>
<!-- 用户最多可以使用队列多少资源,1表示完全使用 -->
<property>
<name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
<value>1</value>
</property>
<!-- 指定hive队列的资源最大容量 -->
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
<value>80</value>
</property>
<!-- 启动hive队列 -->
<property>
<name>yarn.scheduler.capacity.root.hive.state</name>
<value>RUNNING</value>
</property>
<!-- 哪些用户有权向队列提交作业,*表示所有用户 -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
<value>*</value>
</property>
<!-- 哪些用户有队列的管理员权限(查看/杀死),*表示所有用户 -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
<value>*</value>
</property>
<!-- 哪些用户有权配置提交任务优先级,*表示所有用户 -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
<value>*</value>
</property>
<!-- 任务的超时时间设置:yarn application -appId appId -updateLifetime Timeout -->
<!-- 如果application指定了超时时间,则提交到该队列的application能够指定的最大超时时间不能超过该值。-1就是不使用超时
-->
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
<value>-1</value>
</property>
<!-- 如果application没指定超时时间,则用default-application-lifetime作为默认值。1就是不使用超时 -->
<property>
<name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
<value>-1</value>
</property>
给所有节点分发该配置文件,即之前的三节点集群(hadoop100,hadoop101,hadoop102)
刷新队列配置
yarn rmadmin -refreshQueues
向Hive队列提交任务
命令行
以Hadoop自带WordCount的案例为例
# -D表示运行时改变参数值hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar wordcount -D mapreduce.job.queuename=hive /txt /output
JavaAPI
如果是使用JavaAPI,那么最终的文件一定是打成Jar上传集群运行,而不是在本地运行(上述配置的队列就在集群环境)。
仅仅需要修改一下Driver,这里以《MapReduce源码解析和基本操作》的WordCount案例为例。其他都没变,就是Configuration对象加了个配置。
package ltd.cmjava.yarn.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @ClassName WordCountDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:12 2023/1/18
* @Version 1.0
**/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {
public static void main(String[] args) {
try {
// 获取配置信息以及获取job对象
// job就是对MR任务的封装
Configuration conf = new Configuration();
conf.set("mapreduce.job.queuename","hive");
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(WordCountDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出路径
// 命令行会传两个参数
// hadoop jar Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.mapreduce.wordcount.WordCountDriver /word /output
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
e.printStackTrace();
}
}
}
测试运行
然后如之前所述,打包上传,命令行调用jar,就会提交到hive队列中
hadoop jar Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.yarn.wordcount.WordCountDriver /txt /output
任务优先级配置
yarn-site.xml
默认情况,Yarn将所有任务的优先级限制为0,若想使用任务的优先级功能,须开放该限制。对/opt/module/hadoop-3.3.4/etc/hadoop/yarn-site.xml增加如下配置,即5种优先级
<property>
<name>yarn.cluster.max-application-priority</name>
<value>5</value>
</property>
分发配置并重启集群
模拟资源紧张环境
模拟资源紧张环境,可连续提交以下任务,直到新提交的任务申请不到资源为止
hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi 5 100
提交优先级高的任务
这个任务就会有最高优先级,进行任务抢占
hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi -D mapreduce.job.priority=5 5 100
也可以通过以下命令修改正在执行的任务的优先级
yarn application -appID <application_id> -updatePriority <优先级>
配置公平调度器
需求
创建两个队列,分别是test和cm(以用户所属组命名)。
期望实现以下效果:
若用户提交任务时指定队列,则任务提交到指定队列运行;若未指定队列,test用户提交的任务到root.group.test队列运行,atguigu提交的任务到root.group.atguigu队列运行(注:group为用户所属组)。
配置文件
公平调度器的配置涉及到两个文件,一个是yarn-site.xml,另一个是公平调度器队列分配文件fair-scheduler.xml(文件名可自定义)。
(1)配置文件参考资料:
https://hadoop.apache.org/docs/r3.1.3/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
(2)任务队列放置规则参考资料:
https://blog.cloudera.com/untangling-apache-hadoop-yarn-part-4-fair-scheduler-queue-basics/
yarn-site.xml
编辑/opt/module/hadoop-3.3.4/etc/hadoop/yarn-site.xml,添加如下配置:
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<description>配置使用公平调度器</description>
</property>
<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>/opt/module/hadoop-3.3.4/etc/hadoop/fair-scheduler.xml</value>
<description>指明公平调度器队列分配配置文件</description>
</property>
<property>
<name>yarn.scheduler.fair.preemption</name>
<value>false</value>
<description>禁止队列间资源抢占</description>
</property>
fair-scheduler.xml
新建/opt/module/hadoop-3.3.4/etc/hadoop/fair-scheduler.xml
<?xml version="1.0"?>
<allocations>
<!-- 单个队列中Application Master占用资源的最大比例,取值0-1 ,企业一般配置0.1 -->
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
<!-- 单个队列最大资源的默认值 test cm default -->
<queueMaxResourcesDefault>2048mb,2vcores</queueMaxResourcesDefault>
<!-- 增加一个队列test -->
<queue name="test">
<!-- 队列最小资源 -->
<minResources>2048mb,2vcores</minResources>
<!-- 队列最大资源 -->
<maxResources>4096mb,4vcores</maxResources>
<!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
<maxRunningApps>4</maxRunningApps>
<!-- 队列中Application Master占用资源的最大比例,注释掉,不然Yarn启动失败(原因未知) -->
<!-- <maxAMShare>0.5</maxAMShare> -->
<!-- 该队列资源权重,默认值为1.0 -->
<weight>1.0</weight>
<!-- 队列内部的资源分配策略 -->
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<!-- 增加一个队列cm -->
<queue name="cm" type="parent">
<!-- 队列最小资源 -->
<minResources>2048mb,2vcores</minResources>
<!-- 队列最大资源 -->
<maxResources>4096mb,4vcores</maxResources>
<!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
<maxRunningApps>4</maxRunningApps>
<!-- 队列中Application Master占用资源的最大比例,注释掉,不然Yarn启动失败(原因未知) -->
<!-- <maxAMShare>0.5</maxAMShare> -->
<!-- 该队列资源权重,默认值为1.0 -->
<weight>1.0</weight>
<!-- 队列内部的资源分配策略 -->
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<!-- 任务队列分配策略,可配置多层规则,从第一个规则开始匹配,直到匹配成功 -->
<queuePlacementPolicy>
<!-- 提交任务时需要指定队列,如未指定提交队列,则继续匹配下一个规则。false表示:如果指定队列不存在,也不允许自动创建该队列-->
<rule name="specified" create="false"/>
<!-- 提交到root.group.<username>队列。
primaryGroup=false表示root.group不存在不允许自动创建
nestedUserQueue=true表示root.group.<username>队列不存在允许自动创建该队列 -->
<rule name="nestedUserQueue" create="true">
<rule name="primaryGroup" create="false"/>
</rule>
<!-- 最后一个规则必须为reject或者default。Reject表示上面规则都不匹配的话,拒绝提交任务。而default表示把任务提交到default队列 -->
<rule name="reject" />
</queuePlacementPolicy>
</allocations>
测试运行
把上述两个配置分发到所有节点,重启集群并测试
# 此时查看Yarn的Web端,任务被分发到root.test队列
hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi -Dmapreduce.job.queuename=root.test 1 1
# 此时查看Yarn的Web端,任务被分发到root.cm队列,因为该队列在配置时指定为了parent
hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi 1 1
Yarn的Tool接口
解决问题:
这里有个命令行调用jar包的例子(jar就是之前的WordCount案例打包的)。命令行里带参指定了队列,但是运行失败。因为该参数被当成了jar主入口main方法的参数进行传递了。这个问题可以用Tool接口解决,它可以指定main方法具体传参是命令行的哪些字段。
hadoop jar Hadoop-1.0-SNAPSHOT.jar -Dmapreduce.job.queuename=hive
ltd.cmjava.mapreduce.wordcount.WordCountDriver /txt /output
Tool对象可以看作JavaAPI里对Driver的逻辑封装,以之前的WordCount案例为例,这里只需要修改Driver,并增加一个新的Tool类。Mapper和Reducer不变。
自定义的Driver
package ltd.cmjava.yarn.tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;
/**
* @ClassName WordCountDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:12 2023/1/18
* @Version 1.0
**/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {
private static Tool tool;
public static void main(String[] args) throws Exception {
// 创建配置文件
Configuration conf = new Configuration();
// 判断是否有tool接口
switch (args[0]) {
case "wordcount":
tool = new WordCountTool();
break;
default:
throw new RuntimeException(" No such tool: " + args[0]);
}
// 用Tool执行程序,指定main方法具体传参是命令行的哪些字段
// Arrays.copyOfRange 将老数组的元素放到新数组里面
int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
System.exit(run);
}
}
自定义的Tool
package ltd.cmjava.yarn.tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
/**
* @ClassName WordCountTool
* @Description TODO
* @Author 懂坚持的男人
* @Date 16:06 2023/1/23
* @Version 1.0
**/
public class WordCountTool implements Tool {
private Configuration conf;
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
}
运行测试
打包上传,命令行调用jar测试
hadoop jar Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.yarn.tool.WordCountDriver wordcount /txt /output
# 带参命令也能运行
hadoop jar Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.yarn.tool.WordCountDriver wordcount -Dmapreduce.job.queuename=hive /txt /output
评论区