WordCount
这边自行实现一个WordCount任务,和HDFS的Java调用一样,需要win的Hadoop依赖,此处略。但是自行定义的MR程序后面需要打包成Jar,所以pom文件里加上如下依赖:
<!--如果下方maven-assembly-plugin插件报红,可以再加上对应依赖-->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
</dependency>
---
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
定义Mapper
这个就是MapReduce任务中执行MapTask的类。
- 用户自定义的Mapper要继承自己的父类
- Mapper的输入数据是KV对的形式(KV的类型可自定义)
- Mapper中的业务逻辑写在map()方法中
- Mapper的输出数据是KV对的形式(KV的类型可自定义)
- map()方法(MapTask进程)对每一个<K,V>调用一次
package ltd.cmjava.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @ClassName WordCountMapper
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:11 2023/1/18
* @Version 1.0
**/
// 泛型中:LongWritable, Text对应的是Maper输入的KV类型,LongWritable是Key,Text是Value
// 而Text, IntWritable是Maper输出的KV类型,前者是Key后者是Value
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) {
// 获取一行
String line = value.toString();
// 按空格切割单词
String[] words = line.split(" ");
try {
// 输出KV,形式为{"word": 1}
// 所以出现同个单词时,会有多个相同的KV
for (String word : words) {
k.set(word);
context.write(k, v);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
定义Reducer
这个就是MapReduce任务中执行ReduceTask的类。
- 用户自定义的Reducer要继承自己的父类
- Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
- 减速机的业务逻辑写在减速器(方法中)
- ReduceTask进程对每一组相同k的k,v>组调用一次Reduce()方法
package ltd.cmjava.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @ClassName WordCountReducer
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:12 2023/1/18
* @Version 1.0
**/
// 类似于WordCountMapper,四个泛型参数依次是输入Key,输入Value,输出Key,输出Value
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) {
// 计算单词次数
// 相同的KV会进入一次reduce
// 所以values都是同一个key多个value的迭代器
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
try {
// 输出KV,此时格式就是{"word": 单词个数}
v.set(sum);
context.write(key,v);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
定义Driver
Driver可以视作MapReduce任务运行的入口。
package ltd.cmjava.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @ClassName WordCountDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:12 2023/1/18
* @Version 1.0
**/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {
public static void main(String[] args) {
try {
// 获取配置信息以及获取job对象
// job就是对MR任务的封装
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(WordCountDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出路径
// 命令行会传两个参数
// hadoop jar Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.mapreduce.wordcount.WordCountDriver /word /output
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
e.printStackTrace();
}
}
}
打包并运行
执行maven的package命令,将不带依赖的jar上传至服务器,这选择的是NameNode节点的hadoop100
运行jar
# /txt和/output是HDFS的目录
# /txt为输入,里面有一些txt文本,将会被MR计数
# /output是输出结果,可以看到处理后的计数结果
hadoop jar Hadoop-1.0-SNAPSHOT.jar
ltd.cmjava.mapreduce.wordcount.WordCountDriver /txt /output
序列化
除了Hadoop内置的基本数据类型,很多时候需要传输JavaBean,作为整个MR任务的Key或者Value,因此需要将JavaBean转为Hadoop的格式,即序列化
自定义序列化对象
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法
- 重写反序列化方法
- 注意反序列化的顺序和序列化的顺序完全一致
- 要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
package ltd.cmjava.mapreduce.serializable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @ClassName FlowBean
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:43 2023/1/18
* @Version 1.0
**/
// 继承Writable接口
public class FlowBean implements Writable,Comparable<FlowBean> {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量
// 提供无参构造
public FlowBean() {
}
// 提供三个参数的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
// 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
// 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
// 如果需要将自定义的bean放在key中传输
// 则还需要实现Comparable接口,因为MapReduce中的Shuffle过程要求对key必须能排序。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
// 需要重写toString,不然输出的文件里面会打印bean的地址
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
MR目标
统计每一个手机号耗费的总上行流量、总下行流量、总流量
输入数据格式
7 13560436666 120.196.100.99 1116 954 200
分别是:id,手机号码,网络ip,上行流量,下行流量,网络状态码
输出数据格式
13560436666 1116 954 2070
分别是:手机号码,上行流量,下行流量,总流量
定义Mapper
本质上就是根据空格切分一行数据,生成自定义的序列化对象FlowBean
package ltd.cmjava.mapreduce.serializable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @ClassName FlowMapper
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:43 2023/1/18
* @Version 1.0
**/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行数据,转成字符串
String line = value.toString();
//2 切割数据
String[] split = line.split("\t");
//3 抓取我们需要的数据:手机号,上行流量,下行流量
String phone = split[1];
String up = split[split.length - 3];
String down = split[split.length - 2];
// 封装outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
// 写出outK outV
context.write(outK, outV);
}
}
定义Reducer
对MapTask传过来的FlowBean的字段进行统计
package ltd.cmjava.mapreduce.serializable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @ClassName FlowReducer
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:44 2023/1/18
* @Version 1.0
**/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long totalUp = 0;
long totalDown = 0;
// 遍历values,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
totalUp += flowBean.getUpFlow();
totalDown += flowBean.getDownFlow();
}
// 封装outKV
outV.setUpFlow(totalUp);
outV.setDownFlow(totalDown);
outV.setSumFlow();
// 写出outK outV
context.write(key,outV);
}
}
定义Driver
这里因为不准备达成jar包,所以直接定义了个main入口直接在win环境运行了,其实和打成jar包再上传到服务器调用命令运行一样的效果:
package ltd.cmjava.mapreduce.serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @ClassName FlowDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:44 2023/1/18
* @Version 1.0
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(FlowDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));
// 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
MapReduce运行机制
一个MapReduce任务总是经由 Input --> InputFormat --> Mapper --> Shffle --> Reducer --> OutputFormat --> Output
其中Mapper和Reducer就是上述两个案例所实现的东西,即分发任务并行计算,到聚合计算结果。
而Shffle是Mapper的计算结果分发到Reducer的过程,其中涉及 排序、分区和合并,具体见下文。
InputFormat则是切片和生成KV的过程,输入到MR任务的文件需要先切片,再生成KV,交由MapTask处理
OutputFormat则是Reducer聚合好后,将输出的KV写出的过程,比如写出到HDFS、本地路径、数据库等
MapReducer的任务提交源码
// 以上述序列化这个案例为例,Driver会有一个API:job.waitForCompletion(true),这个就是整个Job提交的入口
// 那么以此为起点开始Debug
1. job.waitForCompletion(true),进入到job对应的Job.java的waitForCompletion方法
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
// submit()就是提交Job
submit();
}
if (verbose) {
// 就是开始执行任务
monitorAndPrintJob();
} else {
...
}
// 判断是否成功执行
return isSuccessful();
}
2. submit(),提交任务
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
// ensureState是对Job状态的检查,初始状态是DEFINE,不是这个状态在这个方法内会抛异常
ensureState(JobState.DEFINE);
// Hadoop1.x、2.x和3.x有很多不同API,该方法主要解决兼容性问题
setUseNewAPI();
// 连接环境,如果是本地运行则会连接本地环境,如果是服务器调命令运行jar,则会连接集群环境
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// 真正提交任务的地方
return submitter.submitJobInternal(Job.this, cluster);
}
});
// 连接完毕,Job状态变成了RUNNING
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
3. connect(),进入步骤2里面的connect方法看看怎么连接环境
synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
// 初始的Job,cluster必定为null
if (cluster == null) {
// 最重要的是PrivilegedExceptionAction内部重写方法run()
// run()返回了一个new Cluster(getConfiguration())
// 连接具体环境则是这个Cluster的new过程
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}
4. new Cluster(getConfiguration()
// new Cluster(getConfiguration()调用了如下有参构造
// 这个有参构造有调用了下面一个有参构造
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
// 里面有个initialize方法会判断本地环境还是集群环境
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf);
}
5. initialize(jobTrackAddr, conf),判断环境
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
...
// List<ClientProtocolProvider> providerList提供了两个环境:
// YarnClientProtocolProvider和LocalClientProtocolProvider
// 这个for遍历就是确认连接哪个环境的
for (ClientProtocolProvider provider : providerList) {
....
}
...
}
6. submitter.submitJobInternal(Job.this, cluster),回到步骤2,看看后续的提交任务
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// 检查路径,经常看到的输出路径已存在,或者路径为null就是这里检查的
checkSpecs(job);
...
// 创建缓存路径,到时候提交给MapTask的切片、切片信息等文件就存在这里,当然任务完成后这个缓存路径会删除的
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
...
// 该方法会拷贝jar包到集群,当然本地环境不需要
// 因为命令行执行jar,只是在一个节点,而集群其他节点也需要负责MR任务,所以要共享jar到各个节点
// 如果Debug进这个方法,里面调用了rUploader.uploadResources(job, jobSubmitDir);,这个就是拷贝jar的逻辑
copyAndConfigureFiles(job, submitJobDir);
...
// 切片的核心逻辑
int maps = writeSplits(job, submitJobDir);
...
// 向缓存路径写XML配置文件,保存了切片等各类信息
writeConf(conf, submitJobFile);
...
// 最后提交了任务
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
// 返回Job的状态
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
7. writeSplits(job, submitJobDir),查看步骤6内部如何切片的
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
// 这里用的是3.x的版本,进入到writeNewSplits
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
8. writeNewSplits(job, jobSubmitDir),看看切片逻辑
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
// 反射出一个InputFormat实例,其实就是默认的InputFormat,即TextInputFormat(FileInputFormat的实现类)
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// 创建切片,lmputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
// 将切片写到上述的缓存路径,所以缓存路径在任务运行过程中也是有切片文件的。
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
InputFormat
MapTask并行度决定机制
数据块: Block是HDFS物理上把数据分成一块一块(默认128m一块数据)。数据块是HDFS存储数据单位。
数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
常见的InputFormat
FileInputFormat
FileInputFormat是个抽象类,Hadoop默认使用它的子类TextInputFormat
切片流程
整个的切片核心过程在getSplit()方法中完成,正如上述Job提交源码解析中,InputFormat是一个反射出来的实例,然后这个实例会调用getSplit(),因此这是个核心方法,定义了一个InputFormat的切片逻辑
(1)程序先找到你数据存储的目录。
(2)开始遍历处理目录下的每一个文件
(3)遍历第一个文件,比如word.txt(300M)
1)获取文件大小fs.sizeOf(word.txt)
2)计算切片大小
computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))
即 blocksize = 128M
3)默认情况下,切片大小就是文件块大小(blocksize)
4)开始切,形成第1个切片: word.txt(0-128M);第2个切片word.txt(128-256M);第3个切片word.txt(256-300M)
每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片
5)将切片信息写到一个切片规划文件中,后面存入上述的缓存路径
6)lmputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
(4)提交切片规划文件到Yarn上,Yarn上的MrAppMaster就可以根据切片文件计算开启MapTask个数(一般一个切片对应一个MapTask)
(5)第一个文件处理完,就遍历下一个文件,所以切片是一个一个文件进行的
切片机制
(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于Block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
TextInputFormat
TextInputFormat是FileInputFormat的实现类,Hadoop默认使用它。它按行读取每条记录。
它生成的KV,k是存储该行在整个文件中的起始字节偏移量, LongWritable类型。V是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
CombineTextInputFormat
框架默认的TextInputFormat切片机制是对按文件切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask。这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
而CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
切片机制
以四个小文件为例,最大虚拟内存设置为4M,如下图:
生成切片过程包括:虚拟存储过程和切片过程二部分。
(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
1)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
2)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
3)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
如何切换InputFormat
正如上述Driver的实现,里面有个job对象,可以设置使用的InputFormat
这里用WordCountDriver举例
package ltd.cmjava.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @ClassName WordCountDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:12 2023/1/18
* @Version 1.0
**/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {
public static void main(String[] args) {
try {
// 获取配置信息以及获取job对象
// job就是对MR任务的封装
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(WordCountDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
// 设置输入和输出路径
// 命令行会传两个参数
// hadoop jar Hadoop-1.0-SNAPSHOT.jar ltd.cmjava.mapreduce.wordcount.WordCountDriver /word /output
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
e.printStackTrace();
}
}
}
其他的WordCountMapper和WordCountReducer没变,但此时运行程序就用上了指定的InputFormat
MapReduce工作流程
- 首先输入待处理的文件,如上图所示,这里是一个200M大的txt文件
- 进行InputFormat过程,Hadoop会对文件进行切片,正如上述所说的FileInputFormat的子类TextInputFormat会对其进行切片,即第一块0-128M,第二块128-200M。两块切片对应两个MapTask,对切片生成KV并传输至MapTask进行下一步操作
- 此处有两个MapTask,就拿一个举例来说(第二个流程一样的)。MapTask获得到KV会进入Mapper方法进行并行计算,即执行实现的map逻辑,处理完的KV会由outputCollector进行收集,发送给环形缓冲区
- 环形缓冲区是在内存里,默认100M大小。之所以叫环形,因为它会在内存某个位置开始,向左写入KV的元数据(索引、分区、偏移量等),向右写入KV的真实数据。右边KV数据写了80%空间后,会进行溢写(内存数据持久化到磁盘)。同时环形缓冲区开始从100%空间对应的位置反向写入数据,当然在前80%空间的数据还未溢写完毕时,只能写剩余的20%然后在80%溢写完毕前堵塞。
- 环形缓冲区溢写数据时,根据KV元数据确定这些KV真实数据所属分区,以分区为一组,对同分区的数据进行快排,然后再写入磁盘。如图中有两个分区,一次溢写分别对两个分区的KV进行快排,然后写到磁盘,但不是两个文件,是一个文件,只是前半部分属于分区1且数据有序,后半部分属于分区2且数据有序。
- 那么Mapper不断输出KV,环形缓冲区不断接受KV且不断溢写,最后磁盘上有多个溢写文件(每个文件有多个分区,单独分区内数据是有序的)。
- 这么多溢写文件需要进行Merge合并,毕竟单独一个溢写文件也只是包含两个分区的部分有序数据。所以把所有溢写文件合并为一个大文件(两个分区,单独分区数据有序)。当然这个MapTask在Merge后的文件内,分区1和2的数据也不完整,因为前面有切片的过程,另一个MapTask也有分区1和2的数据。因为是由有序的数据合并的,这里Merge合并的排序选择归并排序。
- Merge后可能有很多同样的key组成的KV,Combin就是合并这些相同key的KV。至此Shuffle过程结束了,即分区、快排–>溢写–>Merge–>Combine。后面就准备给Reducer了
- Reducer是主动向MapTask的结果拉取数据,即拉取Shuffle后的文件。当然,前文提到了两个分区,那么一个分区会对应一个Reducer。分区1的Reducer就会拉取分区1的数据,当然分区1数据在两个MapTask结果中,都拉过来,此时分区数据是完整的。
- 接着一次读取一组KV,即读取一个Key的数据,Value是个迭代器(就像上述的WordCount和序列化案例)。交给Reducer的业务逻辑进行处理。
- 然后进入OutputFormat过程,即把reduce处理完的数据写出。而分区2一样的,由另一个Reducer进行相同的步骤。至此,整个流程完毕!!
Shuffle
Shuffle机制
整个Shuffle过程和上述基本一样的,就是Combine后还可以有压缩的操作,进一步减少MapTask最终落盘的文件大小。
MapTask的Shuffle源码解析
1. context.write(k, v),常见自定义的Mapper一般都有这一句,里面就开始了Collection进入环形缓冲区了
// 进入了context的抽象类TaskInputOutputContextImpl
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
// 里面开始进行collect了
output.write(key, value);
}
2. output.write(key, value),可以看到有收集阶段、而且还带有分区信息。
public void write(K key, V value) throws IOException, InterruptedException {
// 如果没有指定自定义的partitioner,形参里的partitioner就是HashPartitioner
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
3. collector.collect(key, value,partitioner.getPartition(key, value, partitions)),开始写环形缓冲区了。当然Mapper会不停处理KV,所以会多次调用context.write(k, v)去写缓冲器。
public synchronized void collect(K key, V value, final int partition
) throws IOException {
...
...
...
try {
// 这里就是上述说的写环形缓冲区的右侧,即写KV的数据
int keystart = bufindex;
// 写入Key
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// 写入Value
// valstart这里说明了Value是接着Key写进内存的,紧挨着储存的
// KV紧挨着就是往一侧一直写下去,就是右侧嘛
final int valstart = bufindex;
valSerializer.serialize(value);
bb.write(b0, 0, 0);
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
// 写KV的元数据,即往环形缓冲区左侧写
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
4. run(mapperContext),缓冲区写满了会溢写,缓冲区没写满但是Mapper的KV都处理完了也会溢写。上述KV一遍又一遍被Mapper执行,然后进行写缓冲器,这都是上层调用的while循环在工作,所以一遍又一遍进入Mapper的map方法(如下代码所示)。map都处理完了,现在就开始清理环境。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
5. runNewMapper(job, splitMetaInfo, umbilical, reporter),步骤4的cleanup方法执行完就又回到上层调用。
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
...
...
...
try {
input.initialize(split, mapperContext);
// 步骤4的while就是这里调用的
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
// 关闭输出流,其实就是溢写缓冲区
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
6. output.close(mapperContext),进来看看
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
7. collector.flush(),此处开始出现溢写前的分区快排
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
...
...
...
if (kvindex != kvend) {
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
LOG.info("Spilling map output");
LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
"; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
"); kvend = " + kvend + "(" + (kvend * 4) +
"); length = " + (distanceTo(kvend, kvstart,
kvmeta.capacity()) + 1) + "/" + maxRec);
// 排序并溢写
sortAndSpill();
}
...
// 合并溢写文件
mergeParts();
}
8. sortAndSpill(),看看怎么快排和溢写
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
...
// 快排的逻辑就在这
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
// 然后就遍历一个又一个分区,获取环形缓冲区的数据,进行溢写
for (int i = 0; i < partitions; ++i) {
...
}
...
}
9. mergeParts(),回到步骤7,快排溢写完,需要合并溢写的小文件,MapTask最终只会存在一个大文件。即MapTask的缓存路径存在一个file.out(KV数据)和file.out.index(file.out每对KV在文件中的偏移量)
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
...
// 遍历所有溢写文件
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
...
}
至此MapTask的Shuffle部分完成了,待Reducer来拉取合并后的这个大文件
ReduceTask的Shuffle源码解析
1. reduce.run(this.localConf, Job.this),Reducer会拉取MapTask合并后的大文件
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
// copy对应Reducer拉取数据
// sort对应Reducer对数据进行排序(毕竟可能从不同的MapTask拉取到同个分区的数据)
// reduce就是常说的Reducer的具体业务了
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
...
// 这里开始会反射创建OutputFormat的实例
initialize(job, getJobID(), reporter, useNewApi);
...
// 初始化,为copy和sort做准备
shuffleConsumerPlugin.init(shuffleContext);
// copy和sort就发生在这里,当然还有merge,把拉过来的文件合成大文件,一整个发给后续的Reducer
rIter = shuffleConsumerPlugin.run();
...
// 完成排序阶段
sortPhase.complete();
...
// 这里就是开始进入到Reducer的业务逻辑了,就是自定义Reducer重写的reduce方法
// 这里还判断了些新旧API
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
}
2. initialize(job, getJobID(), reporter, useNewApi),进入看看做了什么
public void initialize(JobConf job, JobID id,
Reporter reporter,
boolean useNewApi) throws IOException,
ClassNotFoundException,
InterruptedException {
jobContext = new JobContextImpl(job, id, reporter);
...
if (useNewApi) {
if (LOG.isDebugEnabled()) {
LOG.debug("using new api for output committer");
}
// 如果Driver没有指定,就用默认的,即TextOutputFormat
outputFormat =
ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
committer = outputFormat.getOutputCommitter(taskContext);
} else {
committer = conf.getOutputCommitter();
}
// 获取Driver指定的输出路径
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
if ((committer instanceof FileOutputCommitter)) {
FileOutputFormat.setWorkOutputPath(conf,
((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
} else {
FileOutputFormat.setWorkOutputPath(conf, outputPath);
}
}
...
}
3. shuffleConsumerPlugin.init(shuffleContext),回到步骤1,看看init
public void init(ShuffleConsumerPlugin.Context context) {
this.context = context;
this.reduceId = context.getReduceId();
this.jobConf = context.getJobConf();
this.umbilical = context.getUmbilical();
this.reporter = context.getReporter();
this.metrics = ShuffleClientMetrics.create(context.getReduceId(),
this.jobConf);
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
// 开始合并拉去过来的文件
merger = createMergeManager(context);
}
4. createMergeManager(context),看看怎么拉取数据
protected MergeManager<K, V> createMergeManager(
ShuffleConsumerPlugin.Context context) {
// 本质上就是new一个对象并返回
// context.getLocalFS():一个LocalFileSystem对象,这里是本地运行Driver去Debug的
return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
}
5. new MergeManagerImpl(...),看看这个构造函数做了什么
public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
FileSystem localFS,
LocalDirAllocator localDirAllocator,
Reporter reporter,
CompressionCodec codec,
Class<? extends Reducer> combinerClass,
CombineOutputCollector<K,V> combineCollector,
Counters.Counter spilledRecordsCounter,
Counters.Counter reduceCombineInputCounter,
Counters.Counter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter,
Progress mergePhase, MapOutputFile mapOutputFile) {
...
// 这里就初始化了copy时写到内存和磁盘的相关对象
// inMemoryMerger获得了InMemoryMerger对象
// onDiskMerger获得了OnDiskMerger对象
// 两者都继承自MergeThread,有个merge方法重写
// 看两者的merge方法,写内存其实就是内部维护了一个set储存KV
// 而写磁盘,则是获取LocalFileSystem对象,写道磁盘上
this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start();
this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
this.mergePhase = mergePhase;
}
6. rIter = shuffleConsumerPlugin.run(),再回到步骤1,看看run做了什么
public RawKeyValueIterator run() throws IOException, InterruptedException {
...
// 创建一个EventFetcher,它回去拉取数据。
// 这个类是Thread的子类,所以拉取是异步的且主动的拿数据
// 具体往哪里拿?umbilical封装了systemJobDir、systemJobFile、localJobDir、localJobFile等,就是关闭MapTask输出文件的具体位置信息
final EventFetcher<K, V> eventFetcher =
new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
// 开始拉取
eventFetcher.start();
...
// 拉去完毕
eventFetcher.shutDown();
...
// copy阶段完毕,设置状态
copyPhase.complete();
// 设置接下来是sort,排序阶段
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
...
}
那么sortPhase.complete();是在上层调用进行的,可以看步骤1
7. runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass),返回步骤1,拉取排序完毕后就是正式进行reduce了
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(...) throws IOException,InterruptedException,
ClassNotFoundException {
...
try {
// 开始reduce
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
8. reducer.run(reducerContext),
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
// 很类似map方法的调用,是个迭代器,因为reduce方法也可能执行多次,因为又多个不同Key的KV
while (context.nextKey()) {
// 后面就进入到了自定义的Reducer去执行reduce方法了
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
自此,ReduceTask的Shuffle工作完成了!!
Partition分区
为什么有了切片还要分区?
切片是划分文件,为了给多个MapTask划分各自的数据处理区间。因此切片是为MapTask负责的,处理的是并行任务的数据划分问题。
分区是划分KV,为了给后续不同的Reducer去处理不同的数据。因此,分区是为ReduceTask负责的,处理的是不同类型数据给各自Reducer的业务逻辑处理的问题。
两者不在一个层面上,前者是架构上的数据划分,后者是业务上的数据划分。
默认分区
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
自定义Partition案例
在上述的序列化案例中,可以增加一个分区类
package ltd.cmjava.mapreduce.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @ClassName ProvincePartitioner
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:11 2023/1/19
* @Version 1.0
**/
// 泛型Text, FlowBean就是Mapper输出的KV的类型
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//获取手机号前三位数字
String phone = text.toString();
String prePhone = phone.substring(0, 3);
//定义一个分区号变量partition,根据prePhone设置分区号
int partition;
if("136".equals(prePhone)){
partition = 0;
}else if("137".equals(prePhone)){
partition = 1;
}else if("138".equals(prePhone)){
partition = 2;
}else if("139".equals(prePhone)){
partition = 3;
}else {
partition = 4;
}
//最后返回分区号partition
return partition;
}
}
在Driver里面设置上使用的分区类和ReduceTask数量
package ltd.cmjava.mapreduce.partition;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.io.IOException;
/**
* @ClassName FlowDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:44 2023/1/18
* @Version 1.0
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(FlowDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 指定自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应数量的ReduceTask
job.setNumReduceTasks(5);
// 设置程序的输入输出路径
String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\serializable";
String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\serializable\\result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path("D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\serializable\\result"));
// 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
其他的Mapper和Reducer不变,此时运行程序,可以看到最终得到的结果有5个,即自定义分区类中定义的5种分区。
注意事项:
(1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception
(3)如果ReduceTask的数量=1,则不管Maplask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000
(4)分区号必须从零开始,逐一累加。
排序
Hadoop默认排序是按照字典序排序,不管业务上是否需要都会进行。
常见出现排序的位置:
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
WritableComparable接口
WritableComparable是个接口,常常为了那些需要成为Key的JavaBean去实现。因为Shuffle根据key进行排序,因此,自定义的Bean作为Key时必须实现该接口
自定义WritableComparable案例
这里对上述的序列化案例输出的结果作为输入,进行本次案例
自定义序列化对象
该对象要作为MapTask输出KV中的Key,必须实现WritableComparable接口
package ltd.cmjava.mapreduce.writablecomparable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @ClassName FlowBean
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:43 2023/1/18
* @Version 1.0
**/
// 继承WritableComparable接口,这个结构已经继承了Writable, Comparable
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量
// 提供无参构造
public FlowBean() {
}
// 提供三个参数的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
// 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
// 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
// 如果需要将自定义的bean放在key中传输
// 则还需要实现Comparable接口,因为MapReduce中的Shuffle过程要求对key必须能排序。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
// 按照总流量比较,倒序排列
if(this.sumFlow > o.sumFlow){
return -1;
}else if(this.sumFlow < o.sumFlow){
return 1;
}else {
return 0;
}
}
// 需要重写toString,不然输出的文件里面会打印bean的地址
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
定义Mapper
package ltd.cmjava.mapreduce.writablecomparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @ClassName FlowMapper
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:43 2023/1/18
* @Version 1.0
**/
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行数据
String line = value.toString();
// 按照"\t",切割数据
String[] split = line.split("\t");
// 封装outK outV
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();
outV.set(split[0]);
// 写出outK outV
context.write(outK,outV);
}
}
定义Reducer
package ltd.cmjava.mapreduce.writablecomparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @ClassName FlowReducer
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:44 2023/1/18
* @Version 1.0
**/
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//遍历values集合,循环写出,避免总流量相同的情况
for (Text value : values) {
//调换KV位置,反向写出
context.write(value,key);
}
}
}
定义分区
这个就和上述分区类案例一样的
package ltd.cmjava.mapreduce.writablecomparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @ClassName ProvincePartitioner
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:36 2023/1/19
* @Version 1.0
**/
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
//获取手机号前三位
String phone = text.toString();
String prePhone = phone.substring(0, 3);
//定义一个分区号变量partition,根据prePhone设置分区号
int partition;
if("136".equals(prePhone)){
partition = 0;
}else if("137".equals(prePhone)){
partition = 1;
}else if("138".equals(prePhone)){
partition = 2;
}else if("139".equals(prePhone)){
partition = 3;
}else {
partition = 4;
}
//最后返回分区号partition
return partition;
}
}
定义Driver
package ltd.cmjava.mapreduce.writablecomparable;
import ltd.cmjava.mapreduce.writablecomparable.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.io.IOException;
/**
* @ClassName FlowDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:44 2023/1/18
* @Version 1.0
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(FlowDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 设置Map端输出KV类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);
// 设置程序的输入输出路径
String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\writablecomparable";
String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\writablecomparable\\result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
// 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
运行结果
因为设置了分区,因此有5个输出文件,和上述分区类案例一样的。
只是每个文件都按照了总流量从大到小排序了(截图略)
Combiner合并
Combiner就是上文进行Combine的组件,常见的有合并相同Key的KV,本质上是压缩数据量。
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combincr组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;Reducer是接收全局所有Mapper的输出结果
(4) Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
Combiner合并案例
这里对上述的WordCount案例进行改造,仅仅增加了Combiner类,并在Driver指定使用了该类,其他没变
package ltd.cmjava.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @ClassName WordCountCombiner
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:52 2023/1/19
* @Version 1.0
**/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
//封装outKV
outV.set(sum);
//写出outKV
context.write(key,outV);
}
}
Driver加上:
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordCountCombiner.class);
运行结果略,这个Combiner只是在Shuffle过程压缩数据,不会改变Reducer最终输出的结果,所以输出结果和WordCount案例的结果一样的
OutputFormat
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。如果没有额外指定,默认使用FileOutputFormat的子类TextOutputFormat,它会把KV写到磁盘上,正如上述的案例看到的。
自定义OutputFormat案例
很多时候自定义OutputFormat的目的是根据业务选择数据写到所需位置,如:Mysql,HBase等。
这边以上述的序列化案例为基础,实现个小小的需求:最终输出两个文件,13开头的手机号在一个文件,其他开头的在另一个文件。
所以这个案例只修改了Driver,新增了OutputFormat类和RecordWriter类
自定义OutputFormat
除了新建一个OutputFormat,里面还用到了一个RecordWriter,所以这个也要新建。而RecordWriter正是OutputFormat的具体逻辑
package ltd.cmjava.mapreduce.outputformat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @ClassName PhoneOutputFormat
* @Description TODO
* @Author 懂坚持的男人
* @Date 20:02 2023/1/20
* @Version 1.0
**/
public class PhoneOutputFormat extends FileOutputFormat<Text, FlowBean> {
@Override
public RecordWriter<Text, FlowBean> getRecordWriter(TaskAttemptContext job) {
//创建一个自定义的RecordWriter返回
PhoneRecordWriter phoneRecordWriter = new PhoneRecordWriter(job);
return phoneRecordWriter;
}
}
自定义RecordWriter
其中write方法是其具体逻辑编写的地方。close方法根据业务关闭资源,当然这里只需要关闭IO流。
package ltd.cmjava.mapreduce.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @ClassName PhoneRecordWriter
* @Description TODO
* @Author 懂坚持的男人
* @Date 20:03 2023/1/20
* @Version 1.0
**/
public class PhoneRecordWriter extends RecordWriter<Text, FlowBean> {
private FSDataOutputStream head_13;
private FSDataOutputStream head_other;
public PhoneRecordWriter(TaskAttemptContext job) {
// 因为Driver会设置outputPath,直接从job拿
// D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/outputformat/result
String outputPath = job.getConfiguration().get(FileOutputFormat.OUTDIR).substring(6);
try {
//获取文件系统对象
FileSystem fs = FileSystem.get(job.getConfiguration());
//用文件系统对象创建两个输出流对应不同的目录
head_13 = fs.create(new Path(outputPath+"/head_13.txt"));
head_other = fs.create(new Path(outputPath+"/head_other.txt"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, FlowBean value) throws IOException, InterruptedException {
String phone = key.toString();
//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
if (phone.startsWith("13")) {
head_13.writeBytes(phone + "\t" + value + "\n");
} else {
head_other.writeBytes(phone + "\t" + value + "\n");
}
}
@Override
public void close(TaskAttemptContext context) {
// 自定义Outputformat时,注意在RecordWirter中的close方法必须关闭流资源。
// 否则输出的文件内容中数据为空。
IOUtils.closeStream(head_13);
IOUtils.closeStream(head_other);
}
}
自定义Driver
指定使用的OutPutFormat
package ltd.cmjava.mapreduce.outputformat;
import ltd.cmjava.mapreduce.outputformat.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.io.IOException;
/**
* @ClassName FlowDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 10:44 2023/1/18
* @Version 1.0
**/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(FlowDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//设置自定义的outputformat
job.setOutputFormatClass(PhoneOutputFormat.class);
// 设置程序的输入输出路径
String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\outputformat";
String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\outputformat\\result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
// 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
测试结果
在对应位置可以看到这两个文件,而不是默认输出的part开头的文件
Join
类似于SQL数据库的Join,Hadoop也有对两组KV(来自两个文件)进行Join查询或者计算数据的需求。
这边举例一个需求:
Reducer端进行Join
自定义Bean
package ltd.cmjava.mapreduce.join.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @ClassName TableBean
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:52 2023/1/21
* @Version 1.0
**/
// 实现Writable,这个Hadoop序列化要求的
public class TableBean implements Writable {
private String id; //订单id
private String pid; //产品id
private int amount; //产品数量
private String pname; //产品名称
private String flag; //判断是order表还是pd表的标志字段
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return id + "\t" + pname + "\t" + amount;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
}
自定义Mapper
package ltd.cmjava.mapreduce.join.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* @ClassName TableMapper
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:53 2023/1/21
* @Version 1.0
**/
public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {
private String filename;
private Text outK = new Text();
private TableBean outV = new TableBean();
// setup只会被调用一次,而下方的map方法则会被多次调用
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取对应文件名称
InputSplit split = context.getInputSplit();
FileSplit fileSplit = (FileSplit) split;
filename = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行
String line = value.toString();
//判断是哪个文件,然后针对文件进行不同的操作
if(filename.contains("order")){ //订单表的处理
String[] split = line.split("\t");
//封装outK
outK.set(split[1]);
//封装outV
outV.setId(split[0]);
outV.setPid(split[1]);
outV.setAmount(Integer.parseInt(split[2]));
outV.setPname("");
outV.setFlag("order");
}else { //商品表的处理
String[] split = line.split("\t");
//封装outK
outK.set(split[0]);
//封装outV
outV.setId("");
outV.setPid(split[0]);
outV.setAmount(0);
outV.setPname(split[1]);
outV.setFlag("pd");
}
//写出KV
context.write(outK,outV);
}
}
自定义Reducer
因为Mapper是以id为作为Key,所以Reducer获得的Value(一个迭代器)里面就会有来自两个文件的TableBean
package ltd.cmjava.mapreduce.join.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
/**
* @ClassName TableReducer
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:54 2023/1/21
* @Version 1.0
**/
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
ArrayList<TableBean> orderBeans = new ArrayList<>();
TableBean pdBean = new TableBean();
for (TableBean value : values) {
//判断数据来自哪个表
if("order".equals(value.getFlag())){ //订单表
//创建一个临时TableBean对象接收value
TableBean tmpOrderBean = new TableBean();
try {
BeanUtils.copyProperties(tmpOrderBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
//将临时TableBean对象添加到集合orderBeans
orderBeans.add(tmpOrderBean);
}else { //商品表
try {
BeanUtils.copyProperties(pdBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出
for (TableBean orderBean : orderBeans) {
orderBean.setPname(pdBean.getPname());
//写出修改后的orderBean对象
context.write(orderBean,NullWritable.get());
}
}
}
自定义Driver
package ltd.cmjava.mapreduce.join.reducejoin;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @ClassName TableDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:54 2023/1/21
* @Version 1.0
**/
import java.io.File;
import java.io.IOException;
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 设置程序的输入输出路径
String inputPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\join\\reducerjoin";
String outPath="D:\\Develop\\workspace\\Hadoop\\src\\main\\resources\\mapreduce\\join\\reducerjoin\\result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Mapper端进行Join
Reducer端进行Join这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
自定义Mapper
本质上就是缓存了一个表的数据,缓存逻辑放在setup方法里(只会被执行一次)
package ltd.cmjava.mapreduce.join.mapperjoin;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName TableMapper
* @Description TODO
* @Author 懂坚持的男人
* @Date 15:53 2023/1/21
* @Version 1.0
**/
public class TableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> pdMap = new HashMap<>();
private Text text = new Text();
//任务开始前将pd数据缓存进pdMap
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//通过缓存文件得到小表数据pd.txt
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);
//获取文件系统对象,并开流
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(path);
//通过包装流转换为reader,方便按行读取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
//逐行读取,按行处理
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {
//切割一行
//01 小米
String[] split = line.split("\t");
pdMap.put(split[0], split[1]);
}
//关流
IOUtils.closeStream(reader);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取大表数据
//1001 01 1
String[] fields = value.toString().split("\t");
//通过大表每行数据的pid,去pdMap里面取出pname
String pname = pdMap.get(fields[1]);
//将大表每行数据的pid替换为pname
text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
//写出
context.write(text, NullWritable.get());
}
}
自定义Driver
package ltd.cmjava.mapreduce.join.mapperjoin;
import ltd.cmjava.mapreduce.join.reducejoin.TableBean;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
// Map端Join的逻辑不需要Reduce阶段
// job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/join/pd_cache.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 设置程序的输入输出路径
String inputPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/join/mapperjoin";
String outPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/join/mapperjoin/result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
使用场景:
Map Join适用于一张表十分小、一张表很大的场景。
优点:
在Reduce端处理过多的表,非常容易产生数据倾斜。在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
ETL
“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
数据清洗案例
需求
去除日志中字段个数小于等于11的日志。
自定义Mapper
package ltd.cmjava.mapreduce.etl;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @ClassName ETLMapper
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:09 2023/1/22
* @Version 1.0
**/
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取1行数据
String line = value.toString();
// 解析日志
boolean result = parseLog(line);
// 日志不合法退出
if (!result) {
return;
}
// 日志合法就直接写出
context.write(value, NullWritable.get());
}
// 封装解析日志的方法
private boolean parseLog(String line) {
// 截取
String[] fields = line.split(" ");
// 日志长度大于11的为合法
if (fields.length > 11) {
return true;
}else {
return false;
}
}
}
自定义Driver
package ltd.cmjava.mapreduce.etl;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.net.URI;
/**
* @ClassName ETLDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:09 2023/1/22
* @Version 1.0
**/
public class ETLDriver {
public static void main(String[] args) throws Exception {
// 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 加载jar包
job.setJarByClass(ETLDriver.class);
// 关联map
job.setMapperClass(ETLMapper.class);
// 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置reducetask个数为0
job.setNumReduceTasks(0);
// 设置程序的输入输出路径
String inputPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/etl";
String outPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/etl/result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
// 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
MapReduce的压缩
压缩的好处和坏处:
压缩的优点:以减少磁盘IO、减少磁盘存储空间。
压缩的缺点:增加CPU开销。
压缩原则:
(1)运算密集型的Job,少用压缩
(2)IO密集型的Job,多用压缩
MR支持的压缩编码:
压缩格式 | Hadoop自带? | 算法 | 文件扩展名 | 是否可切片 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 是,直接使用 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
压缩性能的比较:
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
压缩方式选择
压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。
Gzip压缩:
优点:压缩率比较高;
缺点:不支持Split;压缩/解压速度一般;
Bzip2压缩:
优点:压缩率高;支持Split;
缺点:压缩/解压速度慢。
Lzo压缩:
优点:压缩/解压速度比较快;支持Split;
缺点:压缩率一般;想支持切片需要额外创建索引。
Snappy压缩:
优点:压缩和解压缩速度快;
缺点:不支持Split;压缩率一般;
Mapper和Reducer输出端采用压缩
这里还是以WordCount案例为例
只要改动Driver,设置上map和reduce的压缩即可,其他不变
package ltd.cmjava.mapreduce.zip;
import ltd.cmjava.mapreduce.partition.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.io.IOException;
/**
* @ClassName WordCountDriver
* @Description TODO
* @Author 懂坚持的男人
* @Date 0:12 2023/1/18
* @Version 1.0
**/
// Driver可以视作MR程序运行的入口
public class WordCountDriver {
public static void main(String[] args) {
try {
// 获取配置信息以及获取job对象
// job就是对MR任务的封装
Configuration conf = new Configuration();
// 开启map端输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
// conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf);
// 设置Driver的类
job.setJarByClass(WordCountDriver.class);
// 设置Mapper和Reducer的类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置程序的输入输出路径
String inputPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/zip";
String outPath="D:/Develop/workspace/Hadoop/src/main/resources/mapreduce/zip/result";
FileUtil.delete(new File(outPath));
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
e.printStackTrace();
}
}
}
测试运行结果,可以看到输出的文件已经是压缩过的,即文件后缀.bz2(Reducer端使用BZip2Codec压缩)
评论区