标签搜索

目 录CONTENT

文章目录

Yarn原理介绍及其基本操作

陈铭
2023-01-23 / 0 评论 / 0 点赞 / 275 阅读 / 4,131 字 / 正在检测是否收录...

Yarn作业提交流程

image

这里以hadoop命令启动一个MR任务为例,如上图所示:

  1. 首先会找ResourceManager去申请一个Appilication,而ResourceManager会返回一个Appilication的资源缓存路径,专门存放启动MR任务的各类信息(切片、任务参数、jar包等)
  2. 各类资源提交完毕后,ResourceManager会创建一个Task,放到一个队列中(先进先出)。这是因为可能有很多节点也都想启动MR任务,所以把Task放到队列中,根据调度策略来正式启动任务
  3. 现在正式开启一个Task,ResourceManager会选出一个NodeManager去创建MRApplicationMaster,当然它也创建在Container中(Container是个封装了CPU、内存等资源的环境,当然是逻辑上的环境)。MRApplicationMaster其实就是对后续整个任务的进度监控和任务分配者,ResourceManager只是管理任务去调度执行,具体监督它不做的。
  4. MRApplicationMaster会去获取MR任务的各类信息(存在Appilication的资源缓存路径),然后找ResourceManager去获取哪些节点可以用来进行MapTask。然后MRApplicationMaster就找ResourceManager分配的NodeManager去创建Container然后发送jar包等信息开始map。
  5. 而这些进行MapTask的NodeManager的进行情况由MRApplicationMaster进行监控。但map阶段完成后,也是由MRApplicationMaster询问ResourceManager去获取哪些节点可以用来进行ReduceTask。就像进行MapTask一样,而MRApplicationMaster也会告知这些ReduceTask从哪里拉取map结束后的文件。
  6. 最后MR阶段都做完了,这些进行MapTask和ReduceTask会向ResourceManager注销自己

除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

Yarn常用命令

查看任务

# 列出所有MR任务
yarn application -list

# 过滤并列出所有MR任务
# <state>可选:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED
yarn application -list -appStates <state>

# 关闭MR任务
yarn application -kill <application_id>

查看日志

# 查询某个任务的日志
yarn logs -applicationId <application_id>

# 查询Container的日志,因为一个任务可能有多个容器在运行map和reduce
yarn logs -applicationId <application_id> -containerId <container_id>

查看正在运行的任务

# 列出所有正在运行任务的列表
yarn applicationattempt -list <application_id>

# 查看正在运行任务的状态
yarn applicationattempt -status <applicationattempt_id>

查看容器

# 列出所有容器
yarn container -list <applicationattempt_id>

# 打印Container状态,只有在任务跑的途中才能看到Container的状态,任务执行完容器都被释放了
yarn container -status <container_id>

查看节点状态

# 列出所有节点
yarn node -list -all

更新配置

# 更新队列配置,不需要重启Yarn
yarn rmadmin -refreshQueues

查看队列

打印队列信息
# yarn queue -status <queue_name>

配置多队列的容量调度器

创建一个新的队列Hive

需求

需求1:
default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%。
需求2:
配置队列优先级

capacity-scheduler.xml

为配置文件/opt/module/hadoop-3.3.4/etc/hadoopcapacity-scheduler.xml添加如下配置

<!-- 指定多队列,增加hive队列 -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,hive</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
</property>

<!-- 降低default队列资源额定容量为40%,默认100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>

<!-- 降低default队列资源最大容量为60%,默认100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
</property>

<!-- 指定hive队列的资源额定容量 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.capacity</name>
    <value>60</value>
</property>

<!-- 用户最多可以使用队列多少资源,1表示完全使用 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    <value>1</value>
</property>

<!-- 指定hive队列的资源最大容量 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    <value>80</value>
</property>

<!-- 启动hive队列 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.state</name>
    <value>RUNNING</value>
</property>

<!-- 哪些用户有权向队列提交作业,*表示所有用户 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    <value>*</value>
</property>

<!-- 哪些用户有队列的管理员权限(查看/杀死),*表示所有用户 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    <value>*</value>
</property>

<!-- 哪些用户有权配置提交任务优先级,*表示所有用户 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    <value>*</value>
</property>

<!-- 任务的超时时间设置:yarn application -appId appId -updateLifetime Timeout -->
<!-- 如果application指定了超时时间,则提交到该队列的application能够指定的最大超时时间不能超过该值。-1就是不使用超时 
-->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
    <value>-1</value>
</property>

<!-- 如果application没指定超时时间,则用default-application-lifetime作为默认值。1就是不使用超时 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
    <value>-1</value>
</property>

给所有节点分发该配置文件,即之前的三节点集群(hadoop100,hadoop101,hadoop102)

刷新队列配置

yarn rmadmin -refreshQueues

向Hive队列提交任务

命令行

以Hadoop自带WordCount的案例为例

# -D表示运行时改变参数值hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar wordcount -D mapreduce.job.queuename=hive /txt /output

JavaAPI

如果是使用JavaAPI,那么最终的文件一定是打成Jar上传集群运行,而不是在本地运行(上述配置的队列就在集群环境)。

仅仅需要修改一下Driver,这里以《MapReduce源码解析和基本操作》的WordCount案例为例。其他都没变,就是Configuration对象加了个配置。

package ltd.cmjava.yarn.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @ClassName WordCountDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:12 2023/1/18
 * @Version 1.0
 **/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {

    public static void main(String[] args) {
        try {
            // 获取配置信息以及获取job对象
            // job就是对MR任务的封装
            Configuration conf = new Configuration();
            conf.set("mapreduce.job.queuename","hive");
            Job job = Job.getInstance(conf);

            // 设置Driver的类
            job.setJarByClass(WordCountDriver.class);

            // 设置Mapper和Reducer的类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);

            // 设置Mapper输出的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 设置最终输出kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 设置输入和输出路径
            // 命令行会传两个参数
            // hadoop jar  Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.mapreduce.wordcount.WordCountDriver /word /output
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);

        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

测试运行
然后如之前所述,打包上传,命令行调用jar,就会提交到hive队列中

hadoop jar  Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.yarn.wordcount.WordCountDriver /txt /output

任务优先级配置

yarn-site.xml

默认情况,Yarn将所有任务的优先级限制为0,若想使用任务的优先级功能,须开放该限制。对/opt/module/hadoop-3.3.4/etc/hadoop/yarn-site.xml增加如下配置,即5种优先级

<property>
    <name>yarn.cluster.max-application-priority</name>
    <value>5</value>
</property>

分发配置并重启集群

模拟资源紧张环境

模拟资源紧张环境,可连续提交以下任务,直到新提交的任务申请不到资源为止

hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi 5 100

提交优先级高的任务

这个任务就会有最高优先级,进行任务抢占

hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi -D mapreduce.job.priority=5 5 100

也可以通过以下命令修改正在执行的任务的优先级

yarn application -appID <application_id> -updatePriority <优先级>

配置公平调度器

需求

创建两个队列,分别是test和cm(以用户所属组命名)。

期望实现以下效果:
若用户提交任务时指定队列,则任务提交到指定队列运行;若未指定队列,test用户提交的任务到root.group.test队列运行,atguigu提交的任务到root.group.atguigu队列运行(注:group为用户所属组)。

配置文件

公平调度器的配置涉及到两个文件,一个是yarn-site.xml,另一个是公平调度器队列分配文件fair-scheduler.xml(文件名可自定义)。
(1)配置文件参考资料:
https://hadoop.apache.org/docs/r3.1.3/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
(2)任务队列放置规则参考资料:
https://blog.cloudera.com/untangling-apache-hadoop-yarn-part-4-fair-scheduler-queue-basics/

yarn-site.xml

编辑/opt/module/hadoop-3.3.4/etc/hadoop/yarn-site.xml,添加如下配置:

<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    <description>配置使用公平调度器</description>
</property>

<property>
    <name>yarn.scheduler.fair.allocation.file</name>
    <value>/opt/module/hadoop-3.3.4/etc/hadoop/fair-scheduler.xml</value>
    <description>指明公平调度器队列分配配置文件</description>
</property>

<property>
    <name>yarn.scheduler.fair.preemption</name>
    <value>false</value>
    <description>禁止队列间资源抢占</description>
</property>

fair-scheduler.xml

新建/opt/module/hadoop-3.3.4/etc/hadoop/fair-scheduler.xml

<?xml version="1.0"?>
<allocations>
  <!-- 单个队列中Application Master占用资源的最大比例,取值0-1 ,企业一般配置0.1 -->
  <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
  <!-- 单个队列最大资源的默认值 test cm default -->
  <queueMaxResourcesDefault>2048mb,2vcores</queueMaxResourcesDefault>

  <!-- 增加一个队列test -->
  <queue name="test">
    <!-- 队列最小资源 -->
    <minResources>2048mb,2vcores</minResources>
    <!-- 队列最大资源 -->
    <maxResources>4096mb,4vcores</maxResources>
    <!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
    <maxRunningApps>4</maxRunningApps>
    <!-- 队列中Application Master占用资源的最大比例,注释掉,不然Yarn启动失败(原因未知) -->
    <!-- <maxAMShare>0.5</maxAMShare> -->
    <!-- 该队列资源权重,默认值为1.0 -->
    <weight>1.0</weight>
    <!-- 队列内部的资源分配策略 -->
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>
  <!-- 增加一个队列cm -->
  <queue name="cm" type="parent">
    <!-- 队列最小资源 -->
    <minResources>2048mb,2vcores</minResources>
    <!-- 队列最大资源 -->
    <maxResources>4096mb,4vcores</maxResources>
    <!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
    <maxRunningApps>4</maxRunningApps>
    <!-- 队列中Application Master占用资源的最大比例,注释掉,不然Yarn启动失败(原因未知) -->
    <!-- <maxAMShare>0.5</maxAMShare> -->
    <!-- 该队列资源权重,默认值为1.0 -->
    <weight>1.0</weight>
    <!-- 队列内部的资源分配策略 -->
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <!-- 任务队列分配策略,可配置多层规则,从第一个规则开始匹配,直到匹配成功 -->
  <queuePlacementPolicy>
    <!-- 提交任务时需要指定队列,如未指定提交队列,则继续匹配下一个规则。false表示:如果指定队列不存在,也不允许自动创建该队列-->
    <rule name="specified" create="false"/>
    <!-- 提交到root.group.<username>队列。
    primaryGroup=false表示root.group不存在不允许自动创建
    nestedUserQueue=true表示root.group.<username>队列不存在允许自动创建该队列 -->
    <rule name="nestedUserQueue" create="true">
        <rule name="primaryGroup" create="false"/>
    </rule>
    <!-- 最后一个规则必须为reject或者default。Reject表示上面规则都不匹配的话,拒绝提交任务。而default表示把任务提交到default队列 -->
    <rule name="reject" />
  </queuePlacementPolicy>
</allocations>

测试运行

把上述两个配置分发到所有节点,重启集群并测试

# 此时查看Yarn的Web端,任务被分发到root.test队列
hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi -Dmapreduce.job.queuename=root.test 1 1

# 此时查看Yarn的Web端,任务被分发到root.cm队列,因为该队列在配置时指定为了parent
hadoop jar /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar pi 1 1

Yarn的Tool接口

解决问题:
这里有个命令行调用jar包的例子(jar就是之前的WordCount案例打包的)。命令行里带参指定了队列,但是运行失败。因为该参数被当成了jar主入口main方法的参数进行传递了。这个问题可以用Tool接口解决,它可以指定main方法具体传参是命令行的哪些字段。

hadoop jar  Hadoop-1.0-SNAPSHOT.jar -Dmapreduce.job.queuename=hive
 ltd.cmjava.mapreduce.wordcount.WordCountDriver /txt /output

Tool对象可以看作JavaAPI里对Driver的逻辑封装,以之前的WordCount案例为例,这里只需要修改Driver,并增加一个新的Tool类。Mapper和Reducer不变。

自定义的Driver

package ltd.cmjava.yarn.tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;

/**
 * @ClassName WordCountDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:12 2023/1/18
 * @Version 1.0
 **/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {

    private static Tool tool;

    public static void main(String[] args) throws Exception {
        // 创建配置文件
        Configuration conf = new Configuration();

        // 判断是否有tool接口
        switch (args[0]) {
            case "wordcount":
                tool = new WordCountTool();
                break;
            default:
                throw new RuntimeException(" No such tool: " + args[0]);
        }
        // 用Tool执行程序,指定main方法具体传参是命令行的哪些字段
        // Arrays.copyOfRange 将老数组的元素放到新数组里面
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));

        System.exit(run);
    }
}

自定义的Tool

package ltd.cmjava.yarn.tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

/**
 * @ClassName WordCountTool
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 16:06 2023/1/23
 * @Version 1.0
 **/
public class WordCountTool implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
}

运行测试

打包上传,命令行调用jar测试

hadoop jar  Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.yarn.tool.WordCountDriver wordcount /txt /output

# 带参命令也能运行
hadoop jar  Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.yarn.tool.WordCountDriver wordcount -Dmapreduce.job.queuename=hive /txt /output
0

评论区