标签搜索

目 录CONTENT

文章目录

MapReduce源码解析和基本操作

陈铭
2023-01-18 / 0 评论 / 2 点赞 / 296 阅读 / 15,635 字 / 正在检测是否收录...

WordCount

这边自行实现一个WordCount任务,和HDFS的Java调用一样,需要win的Hadoop依赖,此处略。但是自行定义的MR程序后面需要打包成Jar,所以pom文件里加上如下依赖:

<!--如果下方maven-assembly-plugin插件报红,可以再加上对应依赖-->
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
</dependency>
---
<build>
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
        </configuration>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
</build>

定义Mapper

这个就是MapReduce任务中执行MapTask的类。

  • 用户自定义的Mapper要继承自己的父类
  • Mapper的输入数据是KV对的形式(KV的类型可自定义)
  • Mapper中的业务逻辑写在map()方法中
  • Mapper的输出数据是KV对的形式(KV的类型可自定义)
  • map()方法(MapTask进程)对每一个<K,V>调用一次
package ltd.cmjava.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * @ClassName WordCountMapper
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:11 2023/1/18
 * @Version 1.0
 **/
// 泛型中:LongWritable, Text对应的是Maper输入的KV类型,LongWritable是Key,Text是Value
// 而Text, IntWritable是Maper输出的KV类型,前者是Key后者是Value
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // 获取一行
        String line = value.toString();
        // 按空格切割单词
        String[] words = line.split(" ");

        try {
            // 输出KV,形式为{"word": 1}
            // 所以出现同个单词时,会有多个相同的KV
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

定义Reducer

这个就是MapReduce任务中执行ReduceTask的类。

  • 用户自定义的Reducer要继承自己的父类
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  • 减速机的业务逻辑写在减速器(方法中)
  • ReduceTask进程对每一组相同k的k,v>组调用一次Reduce()方法
package ltd.cmjava.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @ClassName WordCountReducer
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:12 2023/1/18
 * @Version 1.0
 **/
// 类似于WordCountMapper,四个泛型参数依次是输入Key,输入Value,输出Key,输出Value
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)  {
        // 计算单词次数
        // 相同的KV会进入一次reduce
        // 所以values都是同一个key多个value的迭代器
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        try {
            // 输出KV,此时格式就是{"word": 单词个数}
            v.set(sum);
            context.write(key,v);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

定义Driver

Driver可以视作MapReduce任务运行的入口。

package ltd.cmjava.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @ClassName WordCountDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:12 2023/1/18
 * @Version 1.0
 **/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {

    public static void main(String[] args) {
        try {
            // 获取配置信息以及获取job对象
            // job就是对MR任务的封装
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // 设置Driver的类
            job.setJarByClass(WordCountDriver.class);

            // 设置Mapper和Reducer的类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);

            // 设置Mapper输出的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 设置最终输出kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 设置输入和输出路径
            // 命令行会传两个参数
            // hadoop jar  Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.mapreduce.wordcount.WordCountDriver /word /output
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);

        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

打包并运行

执行maven的package命令,将不带依赖的jar上传至服务器,这选择的是NameNode节点的hadoop100
image

运行jar

# /txt和/output是HDFS的目录
# /txt为输入,里面有一些txt文本,将会被MR计数
# /output是输出结果,可以看到处理后的计数结果
hadoop jar  Hadoop-1.0-SNAPSHOT.jar
 ltd.cmjava.mapreduce.wordcount.WordCountDriver /txt /output

序列化

除了Hadoop内置的基本数据类型,很多时候需要传输JavaBean,作为整个MR任务的Key或者Value,因此需要将JavaBean转为Hadoop的格式,即序列化

自定义序列化对象

  • 必须实现Writable接口
  • 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
  • 重写序列化方法
  • 重写反序列化方法
  • 注意反序列化的顺序和序列化的顺序完全一致
  • 要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
package ltd.cmjava.mapreduce.serializable;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @ClassName FlowBean
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:43 2023/1/18
 * @Version 1.0
 **/
// 继承Writable接口
public class FlowBean implements Writable,Comparable<FlowBean> {

    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量

    // 提供无参构造
    public FlowBean() {
    }

    // 提供三个参数的getter和setter方法
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // 实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    // 实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // 如果需要将自定义的bean放在key中传输
    // 则还需要实现Comparable接口,因为MapReduce中的Shuffle过程要求对key必须能排序。
    @Override
    public int compareTo(FlowBean o) {
        // 倒序排列,从大到小
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }


    // 需要重写toString,不然输出的文件里面会打印bean的地址
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

MR目标

统计每一个手机号耗费的总上行流量、总下行流量、总流量

输入数据格式

7 13560436666 120.196.100.99 1116 954 200
分别是:id,手机号码,网络ip,上行流量,下行流量,网络状态码

输出数据格式

13560436666 1116 954 2070
分别是:手机号码,上行流量,下行流量,总流量

定义Mapper

本质上就是根据空格切分一行数据,生成自定义的序列化对象FlowBean

package ltd.cmjava.mapreduce.serializable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * @ClassName FlowMapper
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:43 2023/1/18
 * @Version 1.0
 **/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获取一行数据,转成字符串
        String line = value.toString();

        //2 切割数据
        String[] split = line.split("\t");

        //3 抓取我们需要的数据:手机号,上行流量,下行流量
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];

        // 封装outK outV
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // 写出outK outV
        context.write(outK, outV);
    }
}

定义Reducer

对MapTask传过来的FlowBean的字段进行统计

package ltd.cmjava.mapreduce.serializable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * @ClassName FlowReducer
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:44 2023/1/18
 * @Version 1.0
 **/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        long totalUp = 0;
        long totalDown = 0;

        // 遍历values,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            totalUp += flowBean.getUpFlow();
            totalDown += flowBean.getDownFlow();
        }

        // 封装outKV
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        // 写出outK outV
        context.write(key,outV);
    }
}

定义Driver

这里因为不准备达成jar包,所以直接定义了个main入口直接在win环境运行了,其实和打成jar包再上传到服务器调用命令运行一样的效果:

package ltd.cmjava.mapreduce.serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @ClassName FlowDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:44 2023/1/18
 * @Version 1.0
 **/
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 设置Driver的类
        job.setJarByClass(FlowDriver.class);

        // 设置Mapper和Reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置程序的输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));

        // 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

MapReduce运行机制

一个MapReduce任务总是经由 Input --> InputFormat --> Mapper --> Shffle --> Reducer --> OutputFormat --> Output
image

其中MapperReducer就是上述两个案例所实现的东西,即分发任务并行计算,到聚合计算结果。

Shffle是Mapper的计算结果分发到Reducer的过程,其中涉及 排序、分区和合并,具体见下文。

InputFormat则是切片和生成KV的过程,输入到MR任务的文件需要先切片,再生成KV,交由MapTask处理

OutputFormat则是Reducer聚合好后,将输出的KV写出的过程,比如写出到HDFS、本地路径、数据库等

MapReducer的任务提交源码

// 以上述序列化这个案例为例,Driver会有一个API:job.waitForCompletion(true),这个就是整个Job提交的入口
// 那么以此为起点开始Debug

1. job.waitForCompletion(true),进入到job对应的Job.java的waitForCompletion方法
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
    // submit()就是提交Job
      submit();
    }
    if (verbose) {
    // 就是开始执行任务
      monitorAndPrintJob();
    } else {
      ...
    }
    // 判断是否成功执行
    return isSuccessful();
  }
  
2.  submit(),提交任务
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    // ensureState是对Job状态的检查,初始状态是DEFINE,不是这个状态在这个方法内会抛异常
    ensureState(JobState.DEFINE);
    // Hadoop1.x、2.x和3.x有很多不同API,该方法主要解决兼容性问题
    setUseNewAPI();
    // 连接环境,如果是本地运行则会连接本地环境,如果是服务器调命令运行jar,则会连接集群环境
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
      // 真正提交任务的地方
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    // 连接完毕,Job状态变成了RUNNING
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
   
3. connect(),进入步骤2里面的connect方法看看怎么连接环境
  synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    // 初始的Job,cluster必定为null
    if (cluster == null) {
    // 最重要的是PrivilegedExceptionAction内部重写方法run()
    // run()返回了一个new Cluster(getConfiguration())
    // 连接具体环境则是这个Cluster的new过程
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
  
4. new Cluster(getConfiguration()
  // new Cluster(getConfiguration()调用了如下有参构造
  // 这个有参构造有调用了下面一个有参构造
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }
  // 里面有个initialize方法会判断本地环境还是集群环境
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }
  
5. initialize(jobTrackAddr, conf),判断环境
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    ...
    // List<ClientProtocolProvider> providerList提供了两个环境:
    // YarnClientProtocolProvider和LocalClientProtocolProvider
    // 这个for遍历就是确认连接哪个环境的
    for (ClientProtocolProvider provider : providerList) {
       ....
    }
    ...
  }

6. submitter.submitJobInternal(Job.this, cluster),回到步骤2,看看后续的提交任务
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    // 检查路径,经常看到的输出路径已存在,或者路径为null就是这里检查的
    checkSpecs(job);
    ...
    // 创建缓存路径,到时候提交给MapTask的切片、切片信息等文件就存在这里,当然任务完成后这个缓存路径会删除的
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    ...
    // 该方法会拷贝jar包到集群,当然本地环境不需要
    // 因为命令行执行jar,只是在一个节点,而集群其他节点也需要负责MR任务,所以要共享jar到各个节点
    // 如果Debug进这个方法,里面调用了rUploader.uploadResources(job, jobSubmitDir);,这个就是拷贝jar的逻辑
    copyAndConfigureFiles(job, submitJobDir);
    ...
    // 切片的核心逻辑
    int maps = writeSplits(job, submitJobDir);
    ...
    // 向缓存路径写XML配置文件,保存了切片等各类信息
    writeConf(conf, submitJobFile);
    ...
    // 最后提交了任务
    status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      // 返回Job的状态
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
      
7. writeSplits(job, submitJobDir),查看步骤6内部如何切片的
  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
    // 这里用的是3.x的版本,进入到writeNewSplits
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

8. writeNewSplits(job, jobSubmitDir),看看切片逻辑
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // 反射出一个InputFormat实例,其实就是默认的InputFormat,即TextInputFormat(FileInputFormat的实现类)
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    // 创建切片,lmputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    // 将切片写到上述的缓存路径,所以缓存路径在任务运行过程中也是有切片文件的。
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

InputFormat

MapTask并行度决定机制

数据块: Block是HDFS物理上把数据分成一块一块(默认128m一块数据)。数据块是HDFS存储数据单位。
数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
image

常见的InputFormat

FileInputFormat

FileInputFormat是个抽象类,Hadoop默认使用它的子类TextInputFormat

切片流程

整个的切片核心过程在getSplit()方法中完成,正如上述Job提交源码解析中,InputFormat是一个反射出来的实例,然后这个实例会调用getSplit(),因此这是个核心方法,定义了一个InputFormat的切片逻辑
(1)程序先找到你数据存储的目录。
(2)开始遍历处理目录下的每一个文件
(3)遍历第一个文件,比如word.txt(300M)
  1)获取文件大小fs.sizeOf(word.txt)
  2)计算切片大小
   computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))
   即 blocksize = 128M
  3)默认情况下,切片大小就是文件块大小(blocksize)
  4)开始切,形成第1个切片: word.txt(0-128M);第2个切片word.txt(128-256M);第3个切片word.txt(256-300M)
   每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片
  5)将切片信息写到一个切片规划文件中,后面存入上述的缓存路径
  6)lmputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
(4)提交切片规划文件到Yarn上,Yarn上的MrAppMaster就可以根据切片文件计算开启MapTask个数(一般一个切片对应一个MapTask)
(5)第一个文件处理完,就遍历下一个文件,所以切片是一个一个文件进行的

切片机制

(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于Block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

TextInputFormat

TextInputFormat是FileInputFormat的实现类,Hadoop默认使用它。它按行读取每条记录。

它生成的KV,k是存储该行在整个文件中的起始字节偏移量, LongWritable类型。V是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

CombineTextInputFormat

框架默认的TextInputFormat切片机制是对按文件切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask。这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

而CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

切片机制

以四个小文件为例,最大虚拟内存设置为4M,如下图:
image

生成切片过程包括:虚拟存储过程和切片过程二部分。
(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
  1)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
  2)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
  3)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
  1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
  最终会形成3个切片,大小分别为:
  (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

如何切换InputFormat

正如上述Driver的实现,里面有个job对象,可以设置使用的InputFormat
这里用WordCountDriver举例

package ltd.cmjava.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @ClassName WordCountDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:12 2023/1/18
 * @Version 1.0
 **/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {

    public static void main(String[] args) {
        try {
            // 获取配置信息以及获取job对象
            // job就是对MR任务的封装
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // 设置Driver的类
            job.setJarByClass(WordCountDriver.class);

            // 设置Mapper和Reducer的类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);

            // 设置Mapper输出的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 设置最终输出kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 如果不设置InputFormat,它默认用的是TextInputFormat.class
            job.setInputFormatClass(CombineTextInputFormat.class);
            //虚拟存储切片最大值设置20m
            CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

            // 设置输入和输出路径
            // 命令行会传两个参数
            // hadoop jar  Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.mapreduce.wordcount.WordCountDriver /word /output
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);

        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

其他的WordCountMapper和WordCountReducer没变,但此时运行程序就用上了指定的InputFormat

MapReduce工作流程

image

image

  1. 首先输入待处理的文件,如上图所示,这里是一个200M大的txt文件
  2. 进行InputFormat过程,Hadoop会对文件进行切片,正如上述所说的FileInputFormat的子类TextInputFormat会对其进行切片,即第一块0-128M,第二块128-200M。两块切片对应两个MapTask,对切片生成KV并传输至MapTask进行下一步操作
  3. 此处有两个MapTask,就拿一个举例来说(第二个流程一样的)。MapTask获得到KV会进入Mapper方法进行并行计算,即执行实现的map逻辑,处理完的KV会由outputCollector进行收集,发送给环形缓冲区
  4. 环形缓冲区是在内存里,默认100M大小。之所以叫环形,因为它会在内存某个位置开始,向左写入KV的元数据(索引、分区、偏移量等),向右写入KV的真实数据。右边KV数据写了80%空间后,会进行溢写(内存数据持久化到磁盘)。同时环形缓冲区开始从100%空间对应的位置反向写入数据,当然在前80%空间的数据还未溢写完毕时,只能写剩余的20%然后在80%溢写完毕前堵塞。
  5. 环形缓冲区溢写数据时,根据KV元数据确定这些KV真实数据所属分区,以分区为一组,对同分区的数据进行快排,然后再写入磁盘。如图中有两个分区,一次溢写分别对两个分区的KV进行快排,然后写到磁盘,但不是两个文件,是一个文件,只是前半部分属于分区1且数据有序,后半部分属于分区2且数据有序。
  6. 那么Mapper不断输出KV,环形缓冲区不断接受KV且不断溢写,最后磁盘上有多个溢写文件(每个文件有多个分区,单独分区内数据是有序的)。
  7. 这么多溢写文件需要进行Merge合并,毕竟单独一个溢写文件也只是包含两个分区的部分有序数据。所以把所有溢写文件合并为一个大文件(两个分区,单独分区数据有序)。当然这个MapTask在Merge后的文件内,分区1和2的数据也不完整,因为前面有切片的过程,另一个MapTask也有分区1和2的数据。因为是由有序的数据合并的,这里Merge合并的排序选择归并排序。
  8. Merge后可能有很多同样的key组成的KV,Combin就是合并这些相同key的KV。至此Shuffle过程结束了,即分区、快排–>溢写–>Merge–>Combine。后面就准备给Reducer了
  9. Reducer是主动向MapTask的结果拉取数据,即拉取Shuffle后的文件。当然,前文提到了两个分区,那么一个分区会对应一个Reducer。分区1的Reducer就会拉取分区1的数据,当然分区1数据在两个MapTask结果中,都拉过来,此时分区数据是完整的。
  10. 接着一次读取一组KV,即读取一个Key的数据,Value是个迭代器(就像上述的WordCount和序列化案例)。交给Reducer的业务逻辑进行处理。
  11. 然后进入OutputFormat过程,即把reduce处理完的数据写出。而分区2一样的,由另一个Reducer进行相同的步骤。至此,整个流程完毕!!

Shuffle

Shuffle机制

image
整个Shuffle过程和上述基本一样的,就是Combine后还可以有压缩的操作,进一步减少MapTask最终落盘的文件大小。

MapTask的Shuffle源码解析

1. context.write(k, v),常见自定义的Mapper一般都有这一句,里面就开始了Collection进入环形缓冲区了
  // 进入了context的抽象类TaskInputOutputContextImpl
  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    // 里面开始进行collect了
    output.write(key, value);
  }
  
2. output.write(key, value),可以看到有收集阶段、而且还带有分区信息。
    public void write(K key, V value) throws IOException, InterruptedException {
      // 如果没有指定自定义的partitioner,形参里的partitioner就是HashPartitioner
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

3. collector.collect(key, value,partitioner.getPartition(key, value, partitions)),开始写环形缓冲区了。当然Mapper会不停处理KV,所以会多次调用context.write(k, v)去写缓冲器。
    public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      ...
      ...
      ...
      try {
        // 这里就是上述说的写环形缓冲区的右侧,即写KV的数据
        int keystart = bufindex;
        // 写入Key
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          // wrapped the key; must make contiguous
          bb.shiftBufferedKey();
          keystart = 0;
        }
        // 写入Value
        // valstart这里说明了Value是接着Key写进内存的,紧挨着储存的
        // KV紧挨着就是往一侧一直写下去,就是右侧嘛
        final int valstart = bufindex;
        valSerializer.serialize(value);
        bb.write(b0, 0, 0);

        int valend = bb.markRecord();

        mapOutputRecordCounter.increment(1);
        mapOutputByteCounter.increment(
            distanceTo(keystart, valend, bufvoid));

        // 写KV的元数据,即往环形缓冲区左侧写
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
        // advance kvindex
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
      } catch (MapBufferTooSmallException e) {
        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
        spillSingleRecord(key, value, partition);
        mapOutputRecordCounter.increment(1);
        return;
      }
    }
    
4. run(mapperContext),缓冲区写满了会溢写,缓冲区没写满但是Mapper的KV都处理完了也会溢写。上述KV一遍又一遍被Mapper执行,然后进行写缓冲器,这都是上层调用的while循环在工作,所以一遍又一遍进入Mapper的map方法(如下代码所示)。map都处理完了,现在就开始清理环境。
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
  
5. runNewMapper(job, splitMetaInfo, umbilical, reporter),步骤4的cleanup方法执行完就又回到上层调用。
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
     ...
     ...
     ...
    try {
      input.initialize(split, mapperContext);
      // 步骤4的while就是这里调用的
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      // 关闭输出流,其实就是溢写缓冲区
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

6. output.close(mapperContext),进来看看
    public void close(TaskAttemptContext context
                      ) throws IOException,InterruptedException {
      try {
        collector.flush();
      } catch (ClassNotFoundException cnf) {
        throw new IOException("can't find class ", cnf);
      }
      collector.close();
    }
    
7. collector.flush(),此处开始出现溢写前的分区快排
    public void flush() throws IOException, ClassNotFoundException,
           InterruptedException {
        ...
        ...
        ...
        if (kvindex != kvend) {
          kvend = (kvindex + NMETA) % kvmeta.capacity();
          bufend = bufmark;
          LOG.info("Spilling map output");
          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                   "; bufvoid = " + bufvoid);
          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
                   "); kvend = " + kvend + "(" + (kvend * 4) +
                   "); length = " + (distanceTo(kvend, kvstart,
                         kvmeta.capacity()) + 1) + "/" + maxRec);
          // 排序并溢写
          sortAndSpill();
        }
      ...
      // 合并溢写文件
      mergeParts();
    }
        
8. sortAndSpill(),看看怎么快排和溢写
    private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
        ...
        // 快排的逻辑就在这
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        // 然后就遍历一个又一个分区,获取环形缓冲区的数据,进行溢写
        for (int i = 0; i < partitions; ++i) {
           ...
        }
        ...
    }
    
9. mergeParts(),回到步骤7,快排溢写完,需要合并溢写的小文件,MapTask最终只会存在一个大文件。即MapTask的缓存路径存在一个file.out(KV数据)和file.out.index(file.out每对KV在文件中的偏移量)
    private void mergeParts() throws IOException, InterruptedException, 
                                     ClassNotFoundException {
      ...
      // 遍历所有溢写文件
      for(int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }
      ...
    }
    
至此MapTask的Shuffle部分完成了,待Reducer来拉取合并后的这个大文件

ReduceTask的Shuffle源码解析

1. reduce.run(this.localConf, Job.this),Reducer会拉取MapTask合并后的大文件
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    // copy对应Reducer拉取数据
    // sort对应Reducer对数据进行排序(毕竟可能从不同的MapTask拉取到同个分区的数据)
    // reduce就是常说的Reducer的具体业务了
    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    ...
    // 这里开始会反射创建OutputFormat的实例
    initialize(job, getJobID(), reporter, useNewApi);
    ...
    // 初始化,为copy和sort做准备
    shuffleConsumerPlugin.init(shuffleContext);
    // copy和sort就发生在这里,当然还有merge,把拉过来的文件合成大文件,一整个发给后续的Reducer
    rIter = shuffleConsumerPlugin.run();
    ...
    // 完成排序阶段
    sortPhase.complete();   
    ...
    // 这里就是开始进入到Reducer的业务逻辑了,就是自定义Reducer重写的reduce方法
    // 这里还判断了些新旧API
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
  }
  
2. initialize(job, getJobID(), reporter, useNewApi),进入看看做了什么
  public void initialize(JobConf job, JobID id, 
                         Reporter reporter,
                         boolean useNewApi) throws IOException, 
                                                   ClassNotFoundException,
                                                   InterruptedException {
    jobContext = new JobContextImpl(job, id, reporter);
    ...
    if (useNewApi) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("using new api for output committer");
      }
      // 如果Driver没有指定,就用默认的,即TextOutputFormat
      outputFormat =
        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
      committer = outputFormat.getOutputCommitter(taskContext);
    } else {
      committer = conf.getOutputCommitter();
    }
    // 获取Driver指定的输出路径
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null) {
      if ((committer instanceof FileOutputCommitter)) {
        FileOutputFormat.setWorkOutputPath(conf, 
          ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
      } else {
        FileOutputFormat.setWorkOutputPath(conf, outputPath);
      }
    }
    ...
  }
  
3. shuffleConsumerPlugin.init(shuffleContext),回到步骤1,看看init
  public void init(ShuffleConsumerPlugin.Context context) {
    this.context = context;

    this.reduceId = context.getReduceId();
    this.jobConf = context.getJobConf();
    this.umbilical = context.getUmbilical();
    this.reporter = context.getReporter();
    this.metrics = ShuffleClientMetrics.create(context.getReduceId(),
        this.jobConf);
    this.copyPhase = context.getCopyPhase();
    this.taskStatus = context.getStatus();
    this.reduceTask = context.getReduceTask();
    this.localMapFiles = context.getLocalMapFiles();
    
    scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
        this, copyPhase, context.getShuffledMapsCounter(),
        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
    // 开始合并拉去过来的文件
    merger = createMergeManager(context);
  }
  
4. createMergeManager(context),看看怎么拉取数据
  protected MergeManager<K, V> createMergeManager(
      ShuffleConsumerPlugin.Context context) {
    // 本质上就是new一个对象并返回
    // context.getLocalFS():一个LocalFileSystem对象,这里是本地运行Driver去Debug的
    return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
        context.getLocalDirAllocator(), reporter, context.getCodec(),
        context.getCombinerClass(), context.getCombineCollector(), 
        context.getSpilledRecordsCounter(),
        context.getReduceCombineInputCounter(),
        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
        context.getMapOutputFile());
  }

5. new MergeManagerImpl(...),看看这个构造函数做了什么
  public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, 
                      FileSystem localFS,
                      LocalDirAllocator localDirAllocator,  
                      Reporter reporter,
                      CompressionCodec codec,
                      Class<? extends Reducer> combinerClass,
                      CombineOutputCollector<K,V> combineCollector,
                      Counters.Counter spilledRecordsCounter,
                      Counters.Counter reduceCombineInputCounter,
                      Counters.Counter mergedMapOutputsCounter,
                      ExceptionReporter exceptionReporter,
                      Progress mergePhase, MapOutputFile mapOutputFile) {
	...
    // 这里就初始化了copy时写到内存和磁盘的相关对象
    // inMemoryMerger获得了InMemoryMerger对象
    // onDiskMerger获得了OnDiskMerger对象
    // 两者都继承自MergeThread,有个merge方法重写
    // 看两者的merge方法,写内存其实就是内部维护了一个set储存KV
    // 而写磁盘,则是获取LocalFileSystem对象,写道磁盘上
    this.inMemoryMerger = createInMemoryMerger();
    this.inMemoryMerger.start();
    
    this.onDiskMerger = new OnDiskMerger(this);
    this.onDiskMerger.start();
    
    this.mergePhase = mergePhase;
  }
  
6. rIter = shuffleConsumerPlugin.run(),再回到步骤1,看看run做了什么
  public RawKeyValueIterator run() throws IOException, InterruptedException {
    ...
    // 创建一个EventFetcher,它回去拉取数据。
    // 这个类是Thread的子类,所以拉取是异步的且主动的拿数据
    // 具体往哪里拿?umbilical封装了systemJobDir、systemJobFile、localJobDir、localJobFile等,就是关闭MapTask输出文件的具体位置信息
    final EventFetcher<K, V> eventFetcher =
        new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,
            maxEventsToFetch);
    // 开始拉取
    eventFetcher.start();
    ...
    // 拉去完毕
    eventFetcher.shutDown();
    ...
    // copy阶段完毕,设置状态
    copyPhase.complete();
    // 设置接下来是sort,排序阶段
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    reduceTask.statusUpdate(umbilical);
    ... 
  }
那么sortPhase.complete();是在上层调用进行的,可以看步骤1

7. runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass),返回步骤1,拉取排序完毕后就是正式进行reduce了
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(...) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    ...
    try {
    // 开始reduce
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }
  
8. reducer.run(reducerContext),
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      // 很类似map方法的调用,是个迭代器,因为reduce方法也可能执行多次,因为又多个不同Key的KV
      while (context.nextKey()) {
        // 后面就进入到了自定义的Reducer去执行reduce方法了
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
自此,ReduceTask的Shuffle工作完成了!!

Partition分区

为什么有了切片还要分区?
切片是划分文件,为了给多个MapTask划分各自的数据处理区间。因此切片是为MapTask负责的,处理的是并行任务的数据划分问题。

分区是划分KV,为了给后续不同的Reducer去处理不同的数据。因此,分区是为ReduceTask负责的,处理的是不同类型数据给各自Reducer的业务逻辑处理的问题。

两者不在一个层面上,前者是架构上的数据划分,后者是业务上的数据划分。

默认分区

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

自定义Partition案例

在上述的序列化案例中,可以增加一个分区类

package ltd.cmjava.mapreduce.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @ClassName ProvincePartitioner
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:11 2023/1/19
 * @Version 1.0
 **/
// 泛型Text, FlowBean就是Mapper输出的KV的类型
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        //获取手机号前三位数字
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;

        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分区号partition
        return partition;
    }
}

在Driver里面设置上使用的分区类和ReduceTask数量

package ltd.cmjava.mapreduce.partition;

import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @ClassName FlowDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:44 2023/1/18
 * @Version 1.0
 **/
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 设置Driver的类
        job.setJarByClass(FlowDriver.class);

        // 设置Mapper和Reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 指定自定义分区器
        job.setPartitionerClass(ProvincePartitioner.class);
        // 同时指定相应数量的ReduceTask
        job.setNumReduceTasks(5);

        // 设置程序的输入输出路径
        String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\serializable";
        String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\serializable\\result";
        FileUtil.delete(new File(outPath));

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path("D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\serializable\\result"));

        // 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

其他的Mapper和Reducer不变,此时运行程序,可以看到最终得到的结果有5个,即自定义分区类中定义的5种分区。
image

注意事项:
(1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception
(3)如果ReduceTask的数量=1,则不管Maplask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000
(4)分区号必须从零开始,逐一累加。

排序

Hadoop默认排序是按照字典序排序,不管业务上是否需要都会进行。
常见出现排序的位置:
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

WritableComparable接口

WritableComparable是个接口,常常为了那些需要成为Key的JavaBean去实现。因为Shuffle根据key进行排序,因此,自定义的Bean作为Key时必须实现该接口

自定义WritableComparable案例

这里对上述的序列化案例输出的结果作为输入,进行本次案例

自定义序列化对象

该对象要作为MapTask输出KV中的Key,必须实现WritableComparable接口

package ltd.cmjava.mapreduce.writablecomparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @ClassName FlowBean
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:43 2023/1/18
 * @Version 1.0
 **/
// 继承WritableComparable接口,这个结构已经继承了Writable, Comparable
public class FlowBean  implements WritableComparable<FlowBean> {

    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量

    // 提供无参构造
    public FlowBean() {
    }

    // 提供三个参数的getter和setter方法
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // 实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    // 实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // 如果需要将自定义的bean放在key中传输
    // 则还需要实现Comparable接口,因为MapReduce中的Shuffle过程要求对key必须能排序。
    @Override
    public int compareTo(FlowBean o) {
        // 倒序排列,从大到小
        // 按照总流量比较,倒序排列
        if(this.sumFlow > o.sumFlow){
            return -1;
        }else if(this.sumFlow < o.sumFlow){
            return 1;
        }else {
            return 0;
        }
    }


    // 需要重写toString,不然输出的文件里面会打印bean的地址
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
定义Mapper
package ltd.cmjava.mapreduce.writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @ClassName FlowMapper
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:43 2023/1/18
 * @Version 1.0
 **/
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获取一行数据
        String line = value.toString();

        // 按照"\t",切割数据
        String[] split = line.split("\t");

        // 封装outK outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        // 写出outK outV
        context.write(outK,outV);
    }
}
定义Reducer
package ltd.cmjava.mapreduce.writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @ClassName FlowReducer
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:44 2023/1/18
 * @Version 1.0
 **/
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //遍历values集合,循环写出,避免总流量相同的情况
        for (Text value : values) {
            //调换KV位置,反向写出
            context.write(value,key);
        }
    }
}
定义分区

这个就和上述分区类案例一样的

package ltd.cmjava.mapreduce.writablecomparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @ClassName ProvincePartitioner
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:36 2023/1/19
 * @Version 1.0
 **/
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        //获取手机号前三位
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;
        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分区号partition
        return partition;
    }
}
定义Driver
package ltd.cmjava.mapreduce.writablecomparable;

import ltd.cmjava.mapreduce.writablecomparable.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @ClassName FlowDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:44 2023/1/18
 * @Version 1.0
 **/
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 设置Driver的类
        job.setJarByClass(FlowDriver.class);

        // 设置Mapper和Reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 设置Map端输出KV类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置自定义分区器
        job.setPartitionerClass(ProvincePartitioner.class);
        // 设置对应的ReduceTask的个数
        job.setNumReduceTasks(5);

        // 设置程序的输入输出路径
        String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\writablecomparable";
        String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\writablecomparable\\result";
        FileUtil.delete(new File(outPath));

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        // 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
运行结果

因为设置了分区,因此有5个输出文件,和上述分区类案例一样的。
只是每个文件都按照了总流量从大到小排序了(截图略)

Combiner合并

Combiner就是上文进行Combine的组件,常见的有合并相同Key的KV,本质上是压缩数据量。
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combincr组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
  Combiner是在每一个MapTask所在的节点运行;Reducer是接收全局所有Mapper的输出结果
(4) Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

Combiner合并案例

这里对上述的WordCount案例进行改造,仅仅增加了Combiner类,并在Driver指定使用了该类,其他没变

package ltd.cmjava.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * @ClassName WordCountCombiner
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:52 2023/1/19
 * @Version 1.0
 **/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        //封装outKV
        outV.set(sum);

        //写出outKV
        context.write(key,outV);
    }
}

Driver加上:

// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordCountCombiner.class);

运行结果略,这个Combiner只是在Shuffle过程压缩数据,不会改变Reducer最终输出的结果,所以输出结果和WordCount案例的结果一样的

OutputFormat

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。如果没有额外指定,默认使用FileOutputFormat的子类TextOutputFormat,它会把KV写到磁盘上,正如上述的案例看到的。

自定义OutputFormat案例

很多时候自定义OutputFormat的目的是根据业务选择数据写到所需位置,如:Mysql,HBase等。

这边以上述的序列化案例为基础,实现个小小的需求:最终输出两个文件,13开头的手机号在一个文件,其他开头的在另一个文件。

所以这个案例只修改了Driver,新增了OutputFormat类和RecordWriter类

自定义OutputFormat

除了新建一个OutputFormat,里面还用到了一个RecordWriter,所以这个也要新建。而RecordWriter正是OutputFormat的具体逻辑

package ltd.cmjava.mapreduce.outputformat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @ClassName PhoneOutputFormat
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 20:02 2023/1/20
 * @Version 1.0
 **/
public class PhoneOutputFormat extends FileOutputFormat<Text, FlowBean> {
    @Override
    public RecordWriter<Text, FlowBean> getRecordWriter(TaskAttemptContext job) {
        //创建一个自定义的RecordWriter返回
        PhoneRecordWriter phoneRecordWriter = new PhoneRecordWriter(job);
        return phoneRecordWriter;
    }
}

自定义RecordWriter

其中write方法是其具体逻辑编写的地方。close方法根据业务关闭资源,当然这里只需要关闭IO流。

package ltd.cmjava.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @ClassName PhoneRecordWriter
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 20:03 2023/1/20
 * @Version 1.0
 **/
public class PhoneRecordWriter extends RecordWriter<Text, FlowBean> {

    private FSDataOutputStream head_13;
    private FSDataOutputStream head_other;

    public PhoneRecordWriter(TaskAttemptContext job) {
        // 因为Driver会设置outputPath,直接从job拿
        // D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/outputformat/result
        String outputPath = job.getConfiguration().get(FileOutputFormat.OUTDIR).substring(6);

        try {
            //获取文件系统对象
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //用文件系统对象创建两个输出流对应不同的目录
            head_13 = fs.create(new Path(outputPath+"/head_13.txt"));
            head_other = fs.create(new Path(outputPath+"/head_other.txt"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, FlowBean value) throws IOException, InterruptedException {
        String phone = key.toString();
        //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
        if (phone.startsWith("13")) {
            head_13.writeBytes(phone + "\t" + value + "\n");
        } else {
            head_other.writeBytes(phone + "\t" + value + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) {
        // 自定义Outputformat时,注意在RecordWirter中的close方法必须关闭流资源。
        // 否则输出的文件内容中数据为空。
        IOUtils.closeStream(head_13);
        IOUtils.closeStream(head_other);
    }
}

自定义Driver

指定使用的OutPutFormat

package ltd.cmjava.mapreduce.outputformat;

import ltd.cmjava.mapreduce.outputformat.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @ClassName FlowDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 10:44 2023/1/18
 * @Version 1.0
 **/
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 设置Driver的类
        job.setJarByClass(FlowDriver.class);

        // 设置Mapper和Reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(PhoneOutputFormat.class);

        // 设置程序的输入输出路径
        String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\outputformat";
        String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\outputformat\\result";
        FileUtil.delete(new File(outPath));

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        // 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

测试结果

在对应位置可以看到这两个文件,而不是默认输出的part开头的文件
image

Join

类似于SQL数据库的Join,Hadoop也有对两组KV(来自两个文件)进行Join查询或者计算数据的需求。
这边举例一个需求:
image

Reducer端进行Join

自定义Bean

package ltd.cmjava.mapreduce.join.reducejoin;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @ClassName TableBean
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:52 2023/1/21
 * @Version 1.0
 **/
// 实现Writable,这个Hadoop序列化要求的
public class TableBean implements Writable {

    private String id; //订单id
    private String pid; //产品id
    private int amount; //产品数量
    private String pname; //产品名称
    private String flag; //判断是order表还是pd表的标志字段

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }
}

自定义Mapper

package ltd.cmjava.mapreduce.join.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
/**
 * @ClassName TableMapper
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:53 2023/1/21
 * @Version 1.0
 **/
public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    // setup只会被调用一次,而下方的map方法则会被多次调用
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取对应文件名称
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split;
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //获取一行
        String line = value.toString();

        //判断是哪个文件,然后针对文件进行不同的操作
        if(filename.contains("order")){  //订单表的处理
            String[] split = line.split("\t");
            //封装outK
            outK.set(split[1]);
            //封装outV
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        }else {                             //商品表的处理
            String[] split = line.split("\t");
            //封装outK
            outK.set(split[0]);
            //封装outV
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        //写出KV
        context.write(outK,outV);
    }
}

自定义Reducer

因为Mapper是以id为作为Key,所以Reducer获得的Value(一个迭代器)里面就会有来自两个文件的TableBean

package ltd.cmjava.mapreduce.join.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @ClassName TableReducer
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:54 2023/1/21
 * @Version 1.0
 **/
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {

        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {

            //判断数据来自哪个表
            if("order".equals(value.getFlag())){   //订单表

                //创建一个临时TableBean对象接收value
                TableBean tmpOrderBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tmpOrderBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                //将临时TableBean对象添加到集合orderBeans
                orderBeans.add(tmpOrderBean);
            }else {                                    //商品表
                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        //遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出
        for (TableBean orderBean : orderBeans) {

            orderBean.setPname(pdBean.getPname());

            //写出修改后的orderBean对象
            context.write(orderBean,NullWritable.get());
        }
    }
}

自定义Driver

package ltd.cmjava.mapreduce.join.reducejoin;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @ClassName TableDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:54 2023/1/21
 * @Version 1.0
 **/

import java.io.File;
import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置程序的输入输出路径
        String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\join\\reducerjoin";
        String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\join\\reducerjoin\\result";
        FileUtil.delete(new File(outPath));

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Mapper端进行Join

Reducer端进行Join这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

自定义Mapper

本质上就是缓存了一个表的数据,缓存逻辑放在setup方法里(只会被执行一次)

package ltd.cmjava.mapreduce.join.mapperjoin;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;


/**
 * @ClassName TableMapper
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 15:53 2023/1/21
 * @Version 1.0
 **/
public class TableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pdMap = new HashMap<>();
    private Text text = new Text();

    //任务开始前将pd数据缓存进pdMap
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        //通过缓存文件得到小表数据pd.txt
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        //获取文件系统对象,并开流
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        //通过包装流转换为reader,方便按行读取
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        //逐行读取,按行处理
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切割一行
            //01	小米
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }

        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //读取大表数据
        //1001	01	1
        String[] fields = value.toString().split("\t");

        //通过大表每行数据的pid,去pdMap里面取出pname
        String pname = pdMap.get(fields[1]);

        //将大表每行数据的pid替换为pname
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        //写出
        context.write(text, NullWritable.get());
    }
}

自定义Driver

package ltd.cmjava.mapreduce.join.mapperjoin;

import ltd.cmjava.mapreduce.join.reducejoin.TableBean;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        // Map端Join的逻辑不需要Reduce阶段
        // job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/join/pd_cache.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 设置程序的输入输出路径
        String inputPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/join/mapperjoin";
        String outPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/join/mapperjoin/result";
        FileUtil.delete(new File(outPath));

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}


使用场景:
Map Join适用于一张表十分小、一张表很大的场景。
优点:
在Reduce端处理过多的表,非常容易产生数据倾斜。在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

ETL

“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

数据清洗案例

需求
去除日志中字段个数小于等于11的日志。

自定义Mapper

package ltd.cmjava.mapreduce.etl;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @ClassName ETLMapper
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:09 2023/1/22
 * @Version 1.0
 **/
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获取1行数据
        String line = value.toString();

        // 解析日志
        boolean result = parseLog(line);

        // 日志不合法退出
        if (!result) {
            return;
        }

        // 日志合法就直接写出
        context.write(value, NullWritable.get());
    }

    // 封装解析日志的方法
    private boolean parseLog(String line) {

        // 截取
        String[] fields = line.split(" ");

        // 日志长度大于11的为合法
        if (fields.length > 11) {
            return true;
        }else {
            return false;
        }
    }
}

自定义Driver

package ltd.cmjava.mapreduce.etl;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.net.URI;

/**
 * @ClassName ETLDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:09 2023/1/22
 * @Version 1.0
 **/
public class ETLDriver {
    public static void main(String[] args) throws Exception {
        // 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 加载jar包
        job.setJarByClass(ETLDriver.class);
        // 关联map
        job.setMapperClass(ETLMapper.class);

        // 设置最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置reducetask个数为0
        job.setNumReduceTasks(0);

        // 设置程序的输入输出路径
        String inputPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/etl";
        String outPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/etl/result";
        FileUtil.delete(new File(outPath));

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        // 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

MapReduce的压缩

压缩的好处和坏处:
压缩的优点:以减少磁盘IO、减少磁盘存储空间。
压缩的缺点:增加CPU开销。

压缩原则:
(1)运算密集型的Job,少用压缩
(2)IO密集型的Job,多用压缩

MR支持的压缩编码:

压缩格式 Hadoop自带? 算法 文件扩展名 是否可切片 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 是,直接使用 Snappy .snappy 和文本处理一样,不需要修改

压缩性能的比较:

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

压缩方式选择

压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。

Gzip压缩:
优点:压缩率比较高;
缺点:不支持Split;压缩/解压速度一般;

Bzip2压缩:
优点:压缩率高;支持Split;
缺点:压缩/解压速度慢。

Lzo压缩:
优点:压缩/解压速度比较快;支持Split;
缺点:压缩率一般;想支持切片需要额外创建索引。

Snappy压缩:
优点:压缩和解压缩速度快;
缺点:不支持Split;压缩率一般;

Mapper和Reducer输出端采用压缩

这里还是以WordCount案例为例
只要改动Driver,设置上map和reduce的压缩即可,其他不变

package ltd.cmjava.mapreduce.zip;

import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @ClassName WordCountDriver
 * @Description TODO
 * @Author 懂坚持的男人
 * @Date 0:12 2023/1/18
 * @Version 1.0
 **/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {

    public static void main(String[] args) {
        try {
            // 获取配置信息以及获取job对象
            // job就是对MR任务的封装
            Configuration conf = new Configuration();

            // 开启map端输出压缩
            conf.setBoolean("mapreduce.map.output.compress", true);
            // 设置map端输出压缩方式
            conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
            // conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);

            Job job = Job.getInstance(conf);

            // 设置Driver的类
            job.setJarByClass(WordCountDriver.class);

            // 设置Mapper和Reducer的类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);

            // 设置Mapper输出的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 设置最终输出kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 设置程序的输入输出路径
            String inputPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/zip";
            String outPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/zip/result";
            FileUtil.delete(new File(outPath));

            FileInputFormat.setInputPaths(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outPath));

            // 设置reduce端输出压缩开启
            FileOutputFormat.setCompressOutput(job, true);
            // 设置压缩的方式
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
            // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);


            // 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);

        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

测试运行结果,可以看到输出的文件已经是压缩过的,即文件后缀.bz2(Reducer端使用BZip2Codec压缩)

image

2

评论区