MapReduce组件 前置内容 为什么要学习MapReduce 1、MapReduce是人类有史以来第一代分布式计算引擎2、后期的绝大多数的分布式计算引擎都借鉴了MapReduce的思想3、学习了MapReduce可以为以后学习
MapReduce组件
前置内容
为什么要学习MapReduce
1、MapReduce是人类有史以来第一代分布式计算引擎2、后期的绝大多数的分布式计算引擎都借鉴了MapReduce的思想3、学习了MapReduce可以为以后学习其他的分布式计算引擎打好基础4、目前还有很多的框架底层代码就是MapReduce: Hive、Sqoop、Oozie
MapReduce要学习到什么程度?
1、MapReduce的学习是一半理论,一半代码2、指导思想是:重理论,轻代码3、上课的代码要求能看懂,自己敲一遍即可,不要求盲写
MapReduce的概述
分布式计算历代引擎
第一代:MapReduce(MR) 离线分析;第二代:Tez 离线分析;第三代:Spark 离线分析 + 实时分析;第四代:Flink 离线分析 + 实时分析;第五代:Doris、Kylin、ClickHouse、ES 等
MapReduce的思想
1、MapReduce最基本的思想就是分而治之。2、MapReduce有两个阶段:Map阶段负责任务的拆分,Reduce阶段负责任务的合并。3、MapReduce将一个大的任务拆分成多个小任务,拆分之后放在不同的主机上运行,运行之后再将这些结果合并。4、MapReduce整个处理过程就是将原始数据转成一个个键值对,然后不断地对这些键值对进行迭代处理,直到得到最理想的键值对为止,最后的键值对就是我们想要的结果。
入门-WordCount案例
对文件中的单词数量进行统计
//================================= WorCountMapper.java =================================
package pack01_wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map stage of the WordCount example.
 *
 * <p>Step 1: extend {@link Mapper}. Type parameters, in order: K1 (byte offset of the line),
 * V1 (text of the line), K2 (a word), V2 (the constant count 1).
 */
public class WorCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /** Reused output key; context.write serializes it immediately, so reuse is safe. */
    private final Text word = new Text();
    /** V2 is always the constant 1. */
    private final LongWritable one = new LongWritable(1);

    /**
     * Step 2: turns one (K1, V1) line into a (K2, V2) = (word, 1) pair per word.
     *
     * @param key     K1, the byte offset of the line, produced by the input format
     * @param value   V1, the text of one line
     * @param context MapReduce context used to pass pairs on to the next stage
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on runs of whitespace. split(" ") returns "" for leading or repeated
        // spaces, and that empty string would then be counted as a word.
        for (String token : value.toString().trim().split("\\s+")) {
            if (token.isEmpty()) {
                continue; // only happens for a blank line
            }
            word.set(token);
            context.write(word, one);
        }
    }
}

//================================= WordCountReducer.java =================================
package pack01_wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Reduce stage of the WordCount example.
 *
 * <p>Step 1: extend {@link Reducer}. Type parameters, in order: K2, V2, K3, V3.
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /** Reused output value; context.write serializes it immediately. */
    private final LongWritable total = new LongWritable();

    /**
     * Step 2: turns (K2, [V2, ...]) into (K3, V3). K3 is the word itself and V3 is the sum
     * of all of its V2 counts.
     *
     * @param key     the grouped K2 (a word), emitted unchanged as K3
     * @param values  every V2 emitted for this word
     * @param context MapReduce context used to emit (K3, V3)
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        // Equivalent explicit-iterator form:
        //   Iterator<LongWritable> it = values.iterator();
        //   while (it.hasNext()) { count += it.next().get(); }
        for (LongWritable value : values) {
            count += value.get();
        }
        total.set(count);
        context.write(key, total);
    }
}

//================================= WordCountDriver.java =================================
package pack01_wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Builds the WordCount job, wires every processing stage, and submits it (to YARN when
 * launched on a cluster with {@code hadoop jar}).
 *
 * <p>Usage: {@code WordCountDriver [inputPath outputPath]}. When fewer than two arguments
 * are given, the local Windows demo paths below are used. The output directory must not
 * exist yet, or the job fails.
 */
public class WordCountDriver {

    private static final String DEFAULT_INPUT = "file:///D:\\input\\wordcount";
    private static final String DEFAULT_OUTPUT = "file:///D:\\output\\wordcount";

    public static void main(String[] args) throws Exception {
        // Paths may be passed on the command line (e.g. hdfs://node1:8020/mapreduce/input/wordcount);
        // otherwise fall back to the local demo paths.
        String inputPath = args.length >= 2 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length >= 2 ? args[1] : DEFAULT_OUTPUT;

        // 1: create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "wordcount_basic");

        // 2: the class whose jar carries the job's code
        job.setJarByClass(WordCountDriver.class);

        // 3: input. FileInputFormat decides how source data is read; its default subclass
        // TextInputFormat reads line by line. Equivalent explicit form:
        //   job.setInputFormatClass(TextInputFormat.class);
        //   TextInputFormat.addInputPath(job, new Path(inputPath));
        FileInputFormat.addInputPath(job, new Path(inputPath));

        // 4: mapper and the K2 / V2 types
        job.setMapperClass(WorCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 5: partition, sort, combine, group — defaults for now

        // 6: reducer and the K3 / V3 types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 7: output. FileOutputFormat decides how results are written; its default subclass
        // TextOutputFormat writes one line per pair. Equivalent explicit form:
        //   job.setOutputFormatClass(TextOutputFormat.class);
        //   TextOutputFormat.setOutputPath(job, new Path(outputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 8: submit, wait for completion, and report the result through the exit code
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
- 测试注意:测试之前一定要准备好数据,目标目录不能存在,否则报错
- 本地测试
直接右键执行, 注意该测试只是模拟,不是真正的Yarn集群执行
1、代码打jar包2、将jar包上传到Linux服务器3、执行以下命令 hadoop jar module3_mapreduce-1.0-SNAPSHOT.jar pack01_wordcount.WordCountDriver
MapReduce的分区
1、为了增加MR数据聚合的并行度,有时候需要增加Reduce的个数。2、增加了Reduce之后,就要面临一个问题:哪些键值对归哪个Reduce来聚合?你需要定义出一套规则,这套规则就是分区。3、分区就是对每一个K2和V2键值对打标记,标记相同的键值对就会跑到同一个Reduce。4、如果你不定义分区,系统也有默认的分区机制。5、MR的默认分区(HashPartitioner)是按照键K2的hash值对Reduce个数取模进行分区,所以相同的K2一定会进入同一个Reduce。
#需求:将wordcount案例中的数据按照单词长度进行分区,长度>=5的单词和长度小于5的单词进行分区1、定义类继承Partitioner类2、重写getPartition方法,在该方法中对每一个K2和V2打标记,标记从0开始,0标记的键值对会被0编号的Reduce拉取进行聚合,1标记的键值对会被1编号的Reduce进行聚合3、设置job你的自定义分区类 job.setPartitionerClass(MyPartitioner.class);4、在主类中要设置Reduce的个数为 job.setNumReduceTasks(2);
//================================= WorCountMapper.java =================================
package pack04_wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map stage: emits (word, 1) for every word of a line.
 *
 * <p>Type parameters, in order: K1 (line offset), V1 (line text), K2 (word), V2 (count 1).
 */
public class WorCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /** Reused output key; context.write serializes it immediately. */
    private final Text word = new Text();
    /** V2 is always the constant 1. */
    private final LongWritable one = new LongWritable(1);

    /**
     * Turns one (K1, V1) line into (word, 1) pairs.
     *
     * @param key     K1, byte offset of the line
     * @param value   V1, text of the line
     * @param context MapReduce context used to emit (K2, V2)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on runs of whitespace so repeated/leading spaces do not produce "" words.
        for (String token : value.toString().trim().split("\\s+")) {
            if (token.isEmpty()) {
                continue; // blank line
            }
            word.set(token);
            context.write(word, one);
        }
    }
}

//================================= MyPartitioner.java =================================
package pack04_wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner: labels words of length >= 5 with 0 and shorter words with 1.
 *
 * <p>Pairs labelled n are pulled and aggregated by reducer number n. Labels must lie in
 * [0, numReduceTasks), so the driver sets {@code job.setNumReduceTasks(2)}.
 */
public class MyPartitioner extends Partitioner<Text, LongWritable> {

    /**
     * @param text         K2, the word
     * @param longWritable V2, its count (not used for the decision)
     * @param i            the number of reduce tasks
     * @return 0 for words with length >= 5, otherwise 1
     */
    @Override
    public int getPartition(Text text, LongWritable longWritable, int i) {
        if (text.toString().length() >= 5) {
            return 0;
        }
        return 1;
    }
}

//================================= WordCountReducer.java =================================
package pack04_wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce stage: sums the counts of one word.
 *
 * <p>Type parameters, in order: K2, V2, K3, V3.
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /** Reused output value; context.write serializes it immediately. */
    private final LongWritable total = new LongWritable();

    /**
     * Turns (K2, [V2, ...]) into (K3, V3): the word and the sum of its counts.
     *
     * @param key     the grouped K2, emitted unchanged as K3
     * @param values  every V2 emitted for this word
     * @param context MapReduce context used to emit (K3, V3)
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        total.set(count);
        context.write(key, total);
    }
}

//================================= WordCountDriver.java =================================
package pack04_wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the partitioned WordCount.
 *
 * <p>It registers {@link MyPartitioner} and starts two reducers, so the output directory
 * holds part-r-00000 (words of length >= 5) and part-r-00001 (shorter words).
 *
 * <p>Usage: {@code WordCountDriver <inputPath> <outputPath>}. The output path must not exist.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <inputPath> <outputPath>");
            System.exit(2);
        }

        // 1: create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "wordcount_partition");

        // 2: the class whose jar carries the job's code
        job.setJarByClass(WordCountDriver.class);

        // 3: input
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4: mapper and the K2 / V2 types
        job.setMapperClass(WorCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 5: custom partitioner; one reducer per partition label (0 and 1)
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);

        // 6: reducer and the K3 / V3 types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 7: output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8: submit, wait, and report the result through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
public class HashPartitioner<K, V> extends Partitioner<K, V> { public HashPartitioner() { } //根据每一个K2的hash进行分区,分区的效果是:每一个Reduce可以均衡的聚合数据 public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & 2147483647) % numReduceTasks; }}//----------------------------------------------------------------------public class HashPartitioner<K, V> extends Partitioner<K, V> { public HashPartitioner() { } //根据每一个K2的hash进行分区,分区的效果是:每一个Reduce可以均衡的聚合数据 public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode()+随机数 & 2147483647) % numReduceTasks; }}
MapReduce的自定义类案例
根据疫情数据,统计美国每个州的确诊病例数和死亡病例数。数据格式(时间,县名,州名,县编码,确诊人数,死亡人数),例如:2021-01-28,Autauga,Alabama,01001,5554,69。等价SQL:select 州名,sum(确诊人数),sum(死亡人数) from t_covid group by 州名; #最后结果(示例):Alabama 192898 345 / Arkansas 25109 875
1、将州名作为K2,将确诊人数和死亡人数作为V2。2、可以将V2封装成一个Java类;如果一个自定义类出现在MapReduce中,必须保证该类能够被序列化和反序列化(Hadoop会先通过反射创建对象再调用readFields,所以该类还必须提供无参构造方法) --方式1:实现Writable #应用场景:JavaBean类对象不作为K2,不需要能够被排序 public class CovidBean implements Writable { //实现序列化 @Override public void write(DataOutput out) throws IOException { } //实现反序列化 @Override public void readFields(DataInput in) throws IOException { }} --方式2:实现WritableComparable #应用场景:JavaBean类对象作为K2,需要能够被排序 public class CovidBean implements WritableComparable<CovidBean> { //定义类对象排序的比较规则 @Override public int compareTo(CovidBean o) { return 0; } //实现序列化 @Override public void write(DataOutput out) throws IOException { } //实现反序列化 @Override public void readFields(DataInput in) throws IOException { }}
package pack07_covid_bean;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * V2/V3 value holding a confirmed-case count and a death count.
 *
 * <p>Any custom type that travels through MapReduce must be serializable. This class is
 * never used as K2, so it does not need to be sortable; implementing {@link Writable}
 * (write + readFields) is enough.
 */
public class CovidBean implements Writable {
    private int cases;  // confirmed cases
    private int deaths; // deaths

    public CovidBean(int cases, int deaths) {
        this.cases = cases;
        this.deaths = deaths;
    }

    /** Required: Hadoop creates instances reflectively and then calls readFields. */
    public CovidBean() {
    }

    public int getCases() {
        return cases;
    }

    public void setCases(int cases) {
        this.cases = cases;
    }

    public int getDeaths() {
        return deaths;
    }

    public void setDeaths(int deaths) {
        this.deaths = deaths;
    }

    /** Serializes the fields; the order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    /** Deserializes the fields in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }

    /** Text form written by TextOutputFormat: "cases<TAB>deaths". */
    @Override
    public String toString() {
        return cases + "\t" + deaths;
    }
}
//---------------------------------------
package pack07_covid_bean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Emits (state, CovidBean(cases, deaths)) for each CSV row.
 *
 * <p>Expected row layout: date,county,state,fips,cases,deaths. Rows with the wrong number
 * of fields or non-numeric counts (e.g. a header line) are skipped and counted in the
 * "CovidMapper / SKIPPED_LINES" job counter, so one bad row does not fail the whole task.
 */
public class CovidMapper extends Mapper<LongWritable, Text, Text, CovidBean> {

    // Reused output objects; context.write serializes them immediately.
    private final Text outKey = new Text();
    private final CovidBean outValue = new CovidBean();

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, Text, CovidBean>.Context context)
            throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        if (array.length != 6) {
            context.getCounter("CovidMapper", "SKIPPED_LINES").increment(1);
            return;
        }
        int cases;
        int deaths;
        try {
            // trim() tolerates "a, b" style spacing after the commas
            cases = Integer.parseInt(array[4].trim());
            deaths = Integer.parseInt(array[5].trim());
        } catch (NumberFormatException e) {
            context.getCounter("CovidMapper", "SKIPPED_LINES").increment(1);
            return;
        }
        // 1: K2 is the state; 2: V2 carries the two counts
        outKey.set(array[2].trim());
        outValue.setCases(cases);
        outValue.setDeaths(deaths);
        // 3: write K2 and V2 to the context
        context.write(outKey, outValue);
    }
}
//--------------------------------------------
package pack07_covid_bean;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Sums cases and deaths per state.
 *
 * <p>Example input group: Alabama -> [CovidBean(18919, 234), CovidBean(383883, 119)].
 */
public class CovidReducer extends Reducer<Text, CovidBean, Text, CovidBean> {
    @Override
    protected void reduce(Text key, Iterable<CovidBean> values,
                          Reducer<Text, CovidBean, Text, CovidBean>.Context context)
            throws IOException, InterruptedException {
        int casesCount = 0;
        int deathsCount = 0;
        for (CovidBean value : values) {
            // addExact fails loudly on int overflow instead of silently wrapping around
            casesCount = Math.addExact(casesCount, value.getCases());
            deathsCount = Math.addExact(deathsCount, value.getDeaths());
        }
        // K3 is K2 (the state); V3 holds the totals
        context.write(key, new CovidBean(casesCount, deathsCount));
    }
}
//-------------------------------------------
package pack07_covid_bean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import pack05_wordcount.WorCountMapper;
import pack05_wordcount.WordCountDriver;
import pack05_wordcount.WordCountReducer;

import java.io.IOException;
import java.net.URI;

/**
 * Per-state COVID totals job.
 *
 * <p>Usage: {@code CovidDriver <inputPath> <outputPath>}. An existing output directory is
 * deleted first. That directory is looked up on the filesystem the output path belongs to:
 * file:///..., hdfs://..., or the cluster default for scheme-less paths.
 */
public class CovidDriver {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: CovidDriver <inputPath> <outputPath>");
            System.exit(2);
        }

        // 1: create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "covid_bean_demo");

        // 2: the class whose jar carries the job's code
        job.setJarByClass(CovidDriver.class);

        // 3: input
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4: mapper and the K2 / V2 types
        job.setMapperClass(CovidMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CovidBean.class);

        // 5: partition, sort, combine, group — defaults
        //   job.setPartitionerClass(MyPartitioner.class);
        //   job.setNumReduceTasks(2);

        // 6: reducer and the K3 / V3 types
        job.setReducerClass(CovidReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CovidBean.class);

        // 7: output (remove a previous run's output first, since an existing dir fails the job)
        Path outputDir = new Path(args[1]);
        deleteIfExists(outputDir, configuration);
        FileOutputFormat.setOutputPath(job, outputDir);

        // 8: submit, wait, and report the result through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Recursively deletes {@code dir} on the filesystem its own URI resolves to. */
    private static void deleteIfExists(Path dir, Configuration configuration) throws IOException {
        FileSystem fileSystem = dir.getFileSystem(configuration);
        if (fileSystem.exists(dir)) {
            fileSystem.delete(dir, true);
        }
    }
}
MapReduce的排序
#数据Alabama 452734 7340Alaska 53524 253Arizona 745976 12861#要求基于以上数据对确诊病例数进行降序排序,如果确诊病例数相同 ,则按照死亡病例数升序排序select * from A order by cases desc , deaths asc;
1、MR的排序只能按照K2排序,哪个字段要参与排序,则哪个字段就应该包含在K2中2、如果你自定义类作为K2,则必须指定排序规则,实现WritableComparable接口,重写compareTo方法,其他的地方不需要再做任何的设置
package pack08_covid_sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * K2 used to sort per-state totals. Its {@link #compareTo} defines the order MapReduce sorts by.
 */
public class CovidSortBean implements WritableComparable<CovidSortBean> {
    private String state; // state name
    private int cases;    // confirmed cases
    private int deaths;   // deaths

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public int getCases() {
        return cases;
    }

    public void setCases(int cases) {
        this.cases = cases;
    }

    public int getDeaths() {
        return deaths;
    }

    public void setDeaths(int deaths) {
        this.deaths = deaths;
    }

    @Override
    public String toString() {
        return state + "\t" + cases + "\t" + deaths;
    }

    /**
     * Sort order equivalent to {@code order by cases desc, deaths asc, state asc}.
     *
     * <p>Contract: negative if this sorts before {@code o}, positive if after, 0 if equal.
     * The final tie-break on state matters. Keys that compare as 0 are merged into one reduce
     * group, so without it two states with identical numbers would collapse into a single output row.
     */
    @Override
    public int compareTo(CovidSortBean o) {
        int byCasesDesc = Integer.compare(o.cases, this.cases);
        if (byCasesDesc != 0) {
            return byCasesDesc;
        }
        int byDeathsAsc = Integer.compare(this.deaths, o.deaths);
        if (byDeathsAsc != 0) {
            return byDeathsAsc;
        }
        return this.state.compareTo(o.state);
    }

    /** Consistent with {@link #compareTo}: equal iff state, cases and deaths all match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CovidSortBean)) {
            return false;
        }
        CovidSortBean other = (CovidSortBean) obj;
        return cases == other.cases && deaths == other.deaths && Objects.equals(state, other.state);
    }

    /** Value-based hash: the default HashPartitioner must send equal keys to the same reducer. */
    @Override
    public int hashCode() {
        return Objects.hash(state, cases, deaths);
    }

    /** Serializes the fields; the order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    /** Deserializes the fields in the order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }
}
//----------------------------------------
package pack08_covid_sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Turns each "state<TAB>cases<TAB>deaths" line (the output format of the per-state totals job)
 * into a {@link CovidSortBean} key, so the shuffle sorts the rows.
 *
 * <p>Malformed lines are skipped and counted in "CovidSortMapper / SKIPPED_LINES".
 */
public class CovidSortMapper extends Mapper<LongWritable, Text, CovidSortBean, NullWritable> {

    /** Reused output key; context.write serializes it immediately. */
    private final CovidSortBean k2 = new CovidSortBean();

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, CovidSortBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String[] array = value.toString().split("\t");
        if (array.length != 3) {
            context.getCounter("CovidSortMapper", "SKIPPED_LINES").increment(1);
            return;
        }
        try {
            k2.setCases(Integer.parseInt(array[1].trim()));
            k2.setDeaths(Integer.parseInt(array[2].trim()));
        } catch (NumberFormatException e) {
            context.getCounter("CovidSortMapper", "SKIPPED_LINES").increment(1);
            return;
        }
        k2.setState(array[0]);
        // V2 carries nothing; all data lives in the key
        context.write(k2, NullWritable.get());
    }
}
//----------------------------------
package pack08_covid_sort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Writes the already-sorted keys back out unchanged.
 */
public class CovidSortReducer extends Reducer<CovidSortBean, NullWritable, CovidSortBean, NullWritable> {
    @Override
    protected void reduce(CovidSortBean key, Iterable<NullWritable> values,
                          Reducer<CovidSortBean, NullWritable, CovidSortBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Emit once per record rather than once per group, so duplicate rows are kept
        // (like SQL "select * ... order by"). Hadoop refills `key` with the current record's
        // key as the values are iterated.
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}
//----------------------------------
package pack08_covid_sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

/**
 * Sort job: orders per-state totals by cases desc, then deaths asc.
 *
 * <p>Usage: {@code CovidSortDriver <inputPath> <outputPath>}. An existing output directory
 * is deleted first, on the filesystem the output path belongs to.
 */
public class CovidSortDriver {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: CovidSortDriver <inputPath> <outputPath>");
            System.exit(2);
        }

        // 1: create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "covid_sort_demo");

        // 2: the class whose jar carries the job's code
        job.setJarByClass(CovidSortDriver.class);

        // 3: input
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4: mapper and the K2 / V2 types
        job.setMapperClass(CovidSortMapper.class);
        job.setMapOutputKeyClass(CovidSortBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5: sorting comes from CovidSortBean.compareTo; partition/combine/group use defaults

        // 6: reducer and the K3 / V3 types
        job.setReducerClass(CovidSortReducer.class);
        job.setOutputKeyClass(CovidSortBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 7: output (remove a previous run's output first)
        Path outputDir = new Path(args[1]);
        deleteIfExists(outputDir, configuration);
        FileOutputFormat.setOutputPath(job, outputDir);

        // 8: submit, wait, and report the result through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Recursively deletes {@code dir} on the filesystem its own URI resolves to. */
    private static void deleteIfExists(Path dir, Configuration configuration) throws IOException {
        FileSystem fileSystem = dir.getFileSystem(configuration);
        if (fileSystem.exists(dir)) {
            fileSystem.delete(dir, true);
        }
    }
}
MapReduce的串联
当我们在使用MapReduce进行大数据分析时,很多时候使用一个MR并不能完成分析任务,需要使用多个MR进行串联。此时我们可以使用MR提供的Job控制器(JobControl + ControlledJob)来实现多个MR的依赖串联执行:后一个作业通过addDependingJob声明它依赖前一个作业,只有前一个作业成功后它才会被提交运行。
package pack09_mapreduce_series;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import pack07_covid_bean.CovidBean;
import pack07_covid_bean.CovidDriver;
import pack07_covid_bean.CovidMapper;
import pack07_covid_bean.CovidReducer;
import pack08_covid_sort.CovidSortBean;
import pack08_covid_sort.CovidSortDriver;
import pack08_covid_sort.CovidSortMapper;
import pack08_covid_sort.CovidSortReducer;

import java.io.IOException;
import java.net.URI;

/**
 * Chains two jobs with JobControl. Job 1 computes per-state totals (pack07). Job 2 sorts
 * job 1's output (pack08). Job 2 reads job 1's output directory and is declared dependent
 * on job 1, so it starts only after job 1 succeeds.
 *
 * <p>Usage: {@code MapReduceSeriesJob <input> <job1Output> <job2Output>}. The exit code is 0
 * only if every job succeeded.
 */
public class MapReduceSeriesJob {

    /** How often the main thread checks whether JobControl has finished. */
    private static final long POLL_INTERVAL_MS = 500;

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: MapReduceSeriesJob <input> <job1Output> <job2Output>");
            System.exit(2);
        }
        Configuration configuration = new Configuration();

        // ---------- job 1: per-state totals ----------
        Job job1 = Job.getInstance(configuration, "covid_bean_demo");
        job1.setJarByClass(CovidDriver.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        job1.setMapperClass(CovidMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(CovidBean.class);
        job1.setReducerClass(CovidReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(CovidBean.class);
        Path job1Output = new Path(args[1]);
        deleteIfExists(job1Output, configuration);
        FileOutputFormat.setOutputPath(job1, job1Output);

        // wrap the plain job as a controlled job
        ControlledJob cj1 = new ControlledJob(configuration);
        cj1.setJob(job1);

        // ---------- job 2: sort job 1's output ----------
        Job job2 = Job.getInstance(configuration, "covid_sort_demo");
        job2.setJarByClass(CovidSortDriver.class);
        FileInputFormat.addInputPath(job2, job1Output);
        job2.setMapperClass(CovidSortMapper.class);
        job2.setMapOutputKeyClass(CovidSortBean.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setReducerClass(CovidSortReducer.class);
        job2.setOutputKeyClass(CovidSortBean.class);
        job2.setOutputValueClass(NullWritable.class);
        Path job2Output = new Path(args[2]);
        deleteIfExists(job2Output, configuration);
        FileOutputFormat.setOutputPath(job2, job2Output);

        ControlledJob cj2 = new ControlledJob(configuration);
        cj2.setJob(job2);

        // job 2 depends on job 1
        cj2.addDependingJob(cj1);

        // ---------- controller that submits both jobs ----------
        JobControl jc = new JobControl("myctrl");
        jc.addJob(cj1);
        jc.addJob(cj2);

        // JobControl is a Runnable and drives the jobs on its own thread
        Thread t = new Thread(jc);
        t.start();

        // Poll with a sleep instead of spinning a CPU core in a tight loop
        while (!jc.allFinished()) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        System.out.println(jc.getSuccessfulJobList());
        boolean anyFailed = !jc.getFailedJobList().isEmpty();
        if (anyFailed) {
            System.err.println("Failed jobs: " + jc.getFailedJobList());
        }
        jc.stop();
        System.exit(anyFailed ? 1 : 0);
    }

    /** Recursively deletes {@code dir} on the filesystem its own URI resolves to. */
    private static void deleteIfExists(Path dir, Configuration configuration) throws IOException {
        FileSystem fileSystem = dir.getFileSystem(configuration);
        if (fileSystem.exists(dir)) {
            fileSystem.delete(dir, true);
        }
    }
}
# Run the chained job directly. No class name is given, so this relies on the jar's
# manifest declaring MapReduceSeriesJob as Main-Class — confirm against the build config.
hadoop jar module3_mapreduce-1.0-SNAPSHOT.jar /mapreduce/input/covid19 /mapreduce/output/covid19_bean /mapreduce/output/covid19_bean_sort

# 编写Shell脚本 — wrapper script (save as its own file; the shebang must be its first line):
#!/bin/bash
HADOOP_PATH=/export/server/hadoop-3.3.0/bin/hadoop
"${HADOOP_PATH}" jar /root/mapreduce/module3_mapreduce-1.0-SNAPSHOT.jar /mapreduce/input/covid19 /mapreduce/output/covid19_bean /mapreduce/output/covid19_bean_sort
MapReduce的规约(Combiner)
1、规约是MapReduce的一种优化手段,可有可无,有了就属于锦上添花,有或者没有,都不会改变最终的结果。2、规约并不是所有MapReduce任务都能使用,前提是不能影响最终结果(例如求和、求最大值可以,求平均值则不能直接规约)。3、规约主要是对每一个Map端的数据做提前的聚合,减少Map端和Reduce端传输的数据量,提高计算效率。4、规约可以理解为将Reduce端代码在Map端提前执行。5、如果你的规约代码和Reducer代码一致,则规约代码可以不用写,直接使用Reducer代码即可(前提是该Reducer的输入和输出键值类型相同,都等于K2、V2的类型): job.setCombinerClass(WordCountReducer.class);
1、 自定义一个combiner继承Reducer,重写reduce方法,逻辑和Reducer一样2、 在job中设置: job.setCombinerClass(CustomCombiner.class)
MapReduce的分组
1、分组是对Map端传输过来的数据进行去重聚合# K2 V2 hello 1 hello 1 --分组--> hello [1,1,1] --reduce方法--> hello 3 hello 1 world 1 2、分区和分组区别? 分区是决定K2和V2去往哪一个Reduce进行处理 分组是在同一个Reduce内部进行聚合 3、一般默认的分组就能完成分析操作,但是有时候在特定场景下,默认的分组不能满足我们的需求,则需要我们自定义分组
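找出美国每个州state的确诊案例数最多的县county是哪一个。该问题也是俗称的TopN问题。注意是"每个州"各取一条,所以需要按州开窗,而不是对全表 order by ... limit 1(那样只会得到全国最多的一个县):select * from (select *, row_number() over(partition by state order by cases desc) as rn from t_covid) t where rn = 1;找出美国每个州state的确诊案例数最多的前三个县county是哪些。该问题也是俗称的TopN问题:select * from (select *, row_number() over(partition by state order by cases desc) as rn from t_covid) t where rn <= 3;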
#如何自定义分组1、写类继承 WritableComparator,重写Compare方法。2、job.setGroupingComparatorClass(xxxx.class);
//-----------------------------
package pack11_mapreduce_grouping;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * Composite K2 (state, cases). It sorts by state, then by cases descending, so within each
 * state the county with the most cases arrives first at the reducer.
 */
public class GroupingBean implements WritableComparable<GroupingBean> {
    private String state; // state
    private int cases;    // confirmed cases

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public int getCases() {
        return cases;
    }

    public void setCases(int cases) {
        this.cases = cases;
    }

    /** Order: state ascending, then cases descending within the same state. */
    @Override
    public int compareTo(GroupingBean o) {
        int result = this.state.compareTo(o.state);
        if (result == 0) {
            return Integer.compare(o.cases, this.cases);
        }
        return result;
    }

    /** Consistent with {@link #compareTo}: equal iff state and cases both match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof GroupingBean)) {
            return false;
        }
        GroupingBean other = (GroupingBean) obj;
        return cases == other.cases && Objects.equals(state, other.state);
    }

    /**
     * Hashes on state only. The default HashPartitioner uses hashCode(), and the grouping
     * comparator groups by state, so every county of one state must reach the same reducer.
     */
    @Override
    public int hashCode() {
        return Objects.hashCode(state);
    }

    /** Serializes the fields; the order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(cases);
    }

    /** Deserializes the fields in the order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.cases = in.readInt();
    }
}
//-----------------------------
package pack11_mapreduce_grouping;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Emits (GroupingBean(state, cases), original CSV line) per row.
 *
 * <p>Expected row layout: date,county,state,fips,cases,deaths. Malformed rows are skipped
 * and counted in "GroupingMapper / SKIPPED_LINES".
 */
public class GroupingMapper extends Mapper<LongWritable, Text, GroupingBean, Text> {

    /** Reused output key; context.write serializes it immediately. */
    private final GroupingBean k2 = new GroupingBean();

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, GroupingBean, Text>.Context context)
            throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        if (array.length != 6) {
            context.getCounter("GroupingMapper", "SKIPPED_LINES").increment(1);
            return;
        }
        int cases;
        try {
            cases = Integer.parseInt(array[4].trim());
        } catch (NumberFormatException e) {
            context.getCounter("GroupingMapper", "SKIPPED_LINES").increment(1);
            return;
        }
        // 1: K2 = (state, cases)
        k2.setState(array[2]);
        k2.setCases(cases);
        // 2-3: V2 is the whole input line (V1)
        context.write(k2, value);
    }
}
//------------------------------------
package pack11_mapreduce_grouping;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping rule: two GroupingBeans belong to the same reduce group iff their states match.
 * Cases are ignored here, even though they take part in sorting.
 */
public class MyGroupingComparator extends WritableComparator {

    /**
     * Registers the key type with the parent. {@code true} lets the parent create
     * GroupingBean instances reflectively, so it can deserialize keys before comparing them.
     */
    public MyGroupingComparator() {
        super(GroupingBean.class, true);
    }

    /** Returns 0 (same group) exactly when both keys have the same state. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        GroupingBean g1 = (GroupingBean) a;
        GroupingBean g2 = (GroupingBean) b;
        return g1.getState().compareTo(g2.getState());
    }
}
//------------------------------------
package pack11_mapreduce_grouping;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * TopN per state. Values in a group arrive ordered by cases descending, so the first N
 * lines are the N counties with the most cases.
 *
 * <p>N is read from the job configuration key {@link #TOP_N_KEY} and defaults to 1.
 */
public class GroupingReducer extends Reducer<GroupingBean, Text, Text, NullWritable> {

    /** Configuration key holding N (how many counties to emit per state). */
    public static final String TOP_N_KEY = "grouping.topN";

    private int topN = 1;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt(TOP_N_KEY, 1);
    }

    @Override
    protected void reduce(GroupingBean key, Iterable<Text> values,
                          Reducer<GroupingBean, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        int written = 0;
        for (Text value : values) {
            if (written >= topN) {
                break;
            }
            context.write(value, NullWritable.get());
            written++;
        }
    }
}
//------------------------------------------
package pack11_mapreduce_grouping;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import pack08_covid_sort.CovidSortBean;
import pack08_covid_sort.CovidSortDriver;
import pack08_covid_sort.CovidSortMapper;
import pack08_covid_sort.CovidSortReducer;
import pack10_mapreduce_combiner.WorCountMapper;
import pack10_mapreduce_combiner.WordCountDriver;
import pack10_mapreduce_combiner.WordCountReducer;

import java.io.IOException;
import java.net.URI;

/**
 * TopN-per-state job built on a custom grouping comparator.
 *
 * <p>Usage: {@code GroupingDriver <inputPath> <outputPath> [topN]}. topN defaults to 1.
 * An existing output directory is deleted first, on the filesystem the path belongs to.
 */
public class GroupingDriver {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: GroupingDriver <inputPath> <outputPath> [topN]");
            System.exit(2);
        }

        // 1: create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "grouping_demo");
        if (args.length > 2) {
            // Job copies the Configuration, so set N on the job's own copy
            job.getConfiguration().setInt(GroupingReducer.TOP_N_KEY, Integer.parseInt(args[2]));
        }

        // 2: the class whose jar carries the job's code
        job.setJarByClass(GroupingDriver.class);

        // 3: input
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4: mapper and the K2 / V2 types
        job.setMapperClass(GroupingMapper.class);
        job.setMapOutputKeyClass(GroupingBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5: sorting comes from GroupingBean.compareTo; grouping is by state only
        job.setGroupingComparatorClass(MyGroupingComparator.class);

        // 6: reducer and the K3 / V3 types
        job.setReducerClass(GroupingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 7: output (remove a previous run's output first)
        Path outputDir = new Path(args[1]);
        deleteIfExists(outputDir, configuration);
        FileOutputFormat.setOutputPath(job, outputDir);

        // 8: submit, wait, and report the result through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Recursively deletes {@code dir} on the filesystem its own URI resolves to. */
    private static void deleteIfExists(Path dir, Configuration configuration) throws IOException {
        FileSystem fileSystem = dir.getFileSystem(configuration);
        if (fileSystem.exists(dir)) {
            fileSystem.delete(dir, true);
        }
    }
}
MapReduce的Join操作
Reduce端join
1、Reduce Join是在Reduce端完成Join操作。2、Reduce端Join时,Map阶段输出的K2就是Join字段,这样Join字段相同的记录会被送到同一个Reduce中完成拼接。3、Reduce端Join存在数据倾斜的风险,如果存在该问题,则可以使用MapJoin来解决。4、Reduce端Join的代码必须放在集群运行,不能在本地运行
//------------------------------------
package pack12_reduce_join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Map side of the reduce-side join: tags each record with its source table and keys it by the
 * join field.
 *
 * <ul>
 *   <li>"itheima_order_goods.txt" lines: K2 = field 1, V2 = "o_" + field 0 + TAB + field 2</li>
 *   <li>"itheima_goods.txt" lines: K2 = field 0, V2 = "g_" + field 0 + TAB + field 2</li>
 * </ul>
 * Lines from any other file produce no output. Fields are separated by '|'.
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Name of the file behind this task's split; a FileSplit covers exactly one file. */
    private String fileName;

    /** Looks up the source file name once per task instead of once per record. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        fileName = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] array = value.toString().split("\\|");
        if ("itheima_order_goods.txt".equals(fileName)) {
            // Order record: field 1 is the join key (presumably the goods id — confirm against data).
            String k2 = array[1];
            String v2 = "o_" + array[0] + "\t" + array[2];
            context.write(new Text(k2), new Text(v2));
        } else if ("itheima_goods.txt".equals(fileName)) {
            // Goods record: field 0 is the join key.
            String k2 = array[0];
            String v2 = "g_" + array[0] + "\t" + array[2];
            context.write(new Text(k2), new Text(v2));
        }
    }
}
//-------------------------------------------
package pack12_reduce_join;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

/**
 * Reduce side of the join: for each join key, pairs every order record with the goods record.
 *
 * <p>Output line: order payload + TAB + goods payload (tag prefixes stripped). Orders whose key has
 * no goods record are still emitted, with an empty goods part; goods without orders emit nothing.
 */
public class ReduceJoinReducer extends Reducer<Text, Text, Text, NullWritable> {

    /** Order payloads of the current key; reused across reduce() calls and cleared at the start of each. */
    ArrayList<String> orderList = new ArrayList<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values,
                          Reducer<Text, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        orderList.clear();
        String goods_value = "";

        // No secondary sort is configured, so the goods record may arrive before or after the
        // orders; buffer the orders until the whole value list has been seen.
        for (Text value : values) {
            String record = value.toString();
            if (record.startsWith("o_")) {
                orderList.add(record.substring(2));
            } else if (record.startsWith("g_")) {
                goods_value = record.substring(2);
            }
        }

        for (String order : orderList) {
            context.write(new Text(order + "\t" + goods_value), NullWritable.get());
        }
    }
}
//---------------------------------------
package pack12_reduce_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Configures and submits the reduce-side join job.
 *
 * <p>Usage: {@code ReduceJoinDriver <input dir> <output path>}. The input directory must hold both
 * "itheima_order_goods.txt" and "itheima_goods.txt". An existing output directory is deleted first.
 */
public class ReduceJoinDriver {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: ReduceJoinDriver <input path> <output path>");
            System.exit(2);
        }

        // 1: create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "reduce_join_demo");

        // 2: the class used to locate the job jar
        job.setJarByClass(ReduceJoinDriver.class);

        // 3: input path (both tables)
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4: mapper and its output types (K2, V2)
        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5: no custom partitioner / sort / combiner / grouping

        // 6: reducer and final output types (K3, V3)
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 7: output path; delete it first if it already exists.
        // The FileSystem is resolved from the output path (scheme, or fs.defaultFS), so the delete
        // hits the same filesystem the job writes to — a hard-coded "file:///" filesystem breaks
        // when the job runs on the cluster against HDFS.
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = outputPath.getFileSystem(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // 8: submit, wait, and map success to the process exit code
        boolean resultFlag = job.waitForCompletion(true);
        System.exit(resultFlag ? 0 : 1);
    }
}
Map端Join
1、Map端Join就是在Map阶段完成Join操作。2、Map端Join的前提是小表Join大表,小表的大小默认是20M。3、Map端Join需要将小表存放在分布式缓存中,然后读取到每一个MapTask本地内存的Map集合中。4、Map端Join一般不会有数据倾斜问题,因为Map的数量是由数据量大小自动决定的。5、Map端Join的代码不需要Reduce阶段。
package pack13_map_join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

/**
 * Map-side join: loads the small goods table into memory once per task, then joins every order
 * line of the big table against it. No reducer is needed.
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** Small table: field 0 of each goods line -> field 2, e.g. {100101 -> 四川果冻橙6个约180g/个}. */
    HashMap<String, String> goodsMap = new HashMap<>();

    /**
     * Runs once before any map() call and fills {@link #goodsMap} from the small table.
     *
     * <p>The file is opened by its bare name from the task's working directory; this relies on the
     * distributed-cache copy added by MapJoinDriver being available there under that name.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // NOTE(review): assumes the goods file is UTF-8 (it holds Chinese product names); the
        // platform default charset could garble it on nodes with a different default encoding.
        // try-with-resources closes the reader, which was previously leaked.
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
                new FileInputStream("itheima_goods.txt"), StandardCharsets.UTF_8))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                String[] array = line.split("\\|");
                goodsMap.put(array[0], array[2]);
            }
        }
    }

    /**
     * Joins one order line ('|'-separated) with the in-memory goods table.
     * Output: field 0 + TAB + field 2 + TAB + goods value looked up by field 1.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] array = value.toString().split("\\|");
        String k2 = array[1];
        String v2 = array[0] + "\t" + array[2];

        String mapValue = goodsMap.get(k2);
        // A lookup miss used to write the literal text "null"; emit an empty goods column instead,
        // the same way ReduceJoinReducer handles a key without a goods record.
        if (mapValue == null) {
            mapValue = "";
        }
        context.write(new Text(v2 + "\t" + mapValue), NullWritable.get());
    }
}
//----------------------------------------
package pack13_map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Configures and submits the map-side join job.
 *
 * <p>Usage: {@code MapJoinDriver [input path] [output path]}. Both arguments are optional and
 * default to the cluster paths below. An existing output directory is deleted first.
 */
public class MapJoinDriver {

    /** Big table (default input). */
    private static final String DEFAULT_INPUT = "hdfs://node1:8020/mapreduce/input/map_join/big_file";
    /** Default output directory. */
    private static final String DEFAULT_OUTPUT = "hdfs://node1:8020/output/map_join";
    /** Small table, shipped to every map task through the distributed cache. */
    private static final String SMALL_FILE =
            "hdfs://node1:8020/mapreduce/input/map_join/small_file/itheima_goods.txt";

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // 1: create the job
        Job job = Job.getInstance(configuration, "map_join");

        // 2.1: the class used to locate the job jar
        job.setJarByClass(MapJoinDriver.class);

        // 2.2: input path (big table); overridable by args[0]
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        FileInputFormat.addInputPath(job, new Path(input));

        // 2.3: mapper and its output types
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 2.4: map-only job. Without this the default single identity reducer runs, adding a full
        // shuffle/sort and defeating the purpose of a map-side join.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 2.5: ship the small table to every map task via the distributed cache
        job.addCacheFile(new URI(SMALL_FILE));

        // 2.6: output path (must not exist when the job starts, so delete it if present);
        // overridable by args[1]. Resolve the FileSystem from the path with the job's configuration.
        Path outputPath = new Path(args.length > 1 ? args[1] : DEFAULT_OUTPUT);
        FileSystem fileSystem = outputPath.getFileSystem(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // 3: submit and wait for completion
        boolean bl = job.waitForCompletion(true);

        // 4: map success to the process exit code
        System.exit(bl ? 0 : 1);
    }
}
MapReduce执行过程
1、MapReduce在运行的过程中要经过多次IO操作,数据要多次落硬盘。2、后期几乎所有大数据计算框架都是基于内存处理的:MR = 文件 → 内存 → 硬盘 → 内存 → 文件;Spark = 文件 → 内存 → 内存 → 内存 → 文件。