Hands-On Practice
1. Requirement: output the statistics to different files according to the province that each phone number belongs to (partitioning). (1) Input data
(2) Expected output: phone numbers starting with 136, 137, 138, and 139 each go into their own separate file (four files in total), and numbers with any other prefix go into one additional file.
Data:
1 13736234637 192.198.0.1 www.zouxxyy.com 6233 4352 200
2 13736234632 192.198.0.1 www.zouxxyy.com 1237 4353 200
3 13636234641 192.198.0.1 www.zouxxyy.com 6436 3353 200
4 13936234657 192.198.0.1 www.zouxxyy.com 3233 4352 200
5 13936234657 192.198.0.1 www.zouxxyy.com 1235 5353 200
6 13836234636 192.198.0.1 www.zouxxyy.com 1233 6352 200
7 13936234637 192.198.0.1 www.zouxxyy.com 1234 2353 200
8 13736234637 192.198.0.1 www.zouxxyy.com 5233 4353 200
9 13636234641 192.198.0.1 www.zouxxyy.com 1233 4351 200
10 13436234637 192.198.0.1 www.zouxxyy.com 2234 4353 200
11 13736234632 192.198.0.1 www.zouxxyy.com 1233 6353 200
12 13836234636 192.198.0.1 www.zouxxyy.com 6253 4353 200
13 13736234637 192.198.0.1 www.zouxxyy.com 1243 4355 200
14 13636234641 192.198.0.1 www.zouxxyy.com 1238 4353 200
15 13736234632 192.198.0.1 www.zouxxyy.com 6256 4454 200
16 13636234641 192.198.0.1 www.zouxxyy.com 1233 4353 200
17 13936234657 192.198.0.1 www.zouxxyy.com 3263 4653 200
18 13836234636 192.198.0.1 www.zouxxyy.com 1233 4353 200
19 13836234637 192.198.0.1 www.zouxxyy.com 3233 4753 200
20 13736234632 192.198.0.1 www.zouxxyy.com 1224 4353 200
21 13636234641 192.198.0.1 www.zouxxyy.com 4273 4353 200
22 13936234657 192.198.0.1 www.zouxxyy.com 2233 4323 200
23 13836234636 192.198.0.1 www.zouxxyy.com 1234 4353 200

3. Add a custom partitioner class:

package com.atguigu.mapreduce.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // 1 Get the first three digits of the phone number
        String preNum = key.toString().substring(0, 3);

        int partition = 4;

        // 2 Decide which province (partition) the number belongs to
        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}

4. In the driver, register the custom partitioner and set the corresponding number of ReduceTasks:

package com.atguigu.mapreduce.flowsum;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // The input and output paths must be adjusted to the actual paths on your machine
        args = new String[]{"e:/output1", "e:/output2"};

        // 1 Get the configuration information and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the local path of the jar that contains this program
        job.setJarByClass(FlowsumDriver.class);

        // 3 Set the Mapper/Reducer classes used by this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4 Set the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 8 Register the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);

        // 9 Set the matching number of reduce tasks
        job.setNumReduceTasks(5);

        // 6 Set the directory of the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job configuration and the jar containing the job's classes to YARN
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
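For contrast, when no partitioner is registered Hadoop falls back to its default hash-based partitioning, which spreads keys over the reduce tasks by hash code. Below is a minimal sketch of that default behaviour, written with the same Text/FlowBean types as this example; the class name is made up for illustration and is not part of the original code.

package com.atguigu.mapreduce.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustration only: roughly what the built-in hash partitioning does
public class HashStylePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Mask off the sign bit so the result is non-negative, then map it onto the reduce tasks
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Note that setNumReduceTasks(5) matches the five partition numbers (0 to 4) that ProvincePartitioner can return: if the number of reduce tasks is greater than 1 but smaller than the number of partitions, the job fails with an illegal-partition error; with a single reduce task everything ends up in one file; and extra reduce tasks only produce empty output files.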
WritableComparable Sorting

Classification of sorting (overview figures omitted).
Custom sorting with WritableComparable. (1) How it works: when a bean object is used as the key, it must implement the WritableComparable interface and override compareTo; the framework then sorts the keys with this method.
@Override
public int compareTo(FlowBean bean) {

    int result;

    // Sort by total flow, in descending order
    if (sumFlow > bean.getSumFlow()) {
        result = -1;
    } else if (sumFlow < bean.getSumFlow()) {
        result = 1;
    } else {
        result = 0;
    }

    return result;
}

Requirement: based on the results produced by the previous case, sort the records again by total flow. (1) Input data: the original data and the data produced by the first pass.
(2) Expected output:

13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
(1) Define the FlowBean class and implement the WritableComparable interface:

package com.atguigu.mapreduce.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Deserialization calls the no-arg constructor via reflection, so it must exist
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: fields must be read in exactly the same order they were written
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean bean) {

        int result;

        // Sort by total flow, in descending order
        if (sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }

        return result;
    }
}

(2) Write the Mapper class:

package com.atguigu.mapreduce.sort;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean bean = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Read one line
        String line = value.toString();

        // 2 Split the line
        String[] fields = line.split("\t");

        // 3 Fill the objects
        String phoneNbr = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        bean.set(upFlow, downFlow);
        v.set(phoneNbr);

        // 4 Write out
        context.write(bean, v);
    }
}

(3) Write the Reducer class:

package com.atguigu.mapreduce.sort;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // Loop over all values so that records with the same total flow are not lost
        for (Text text : values) {
            context.write(text, key);
        }
    }
}

(4) Write the Driver class:

package com.atguigu.mapreduce.sort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSortDriver {

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

        // The input and output paths must be adjusted to the actual paths on your machine
        args = new String[]{"e:/output1", "e:/output2"};

        // 1 Get the configuration information and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the local path of the jar that contains this program
        job.setJarByClass(FlowCountSortDriver.class);

        // 3 Set the Mapper/Reducer classes used by this job
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 4 Set the key/value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Set the directory of the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job configuration and the jar containing the job's classes to YARN
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Combiner Merging
Steps to implement a custom Combiner: (a) define a Combiner class that extends Reducer and override the reduce method, as shown below.
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 1 Sum up the counts
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }

        // 2 Write out
        context.write(key, new IntWritable(count));
    }
}
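One caveat: a combiner is only safe when pre-aggregating partial results does not change the final answer, as with the summation above. Averaging is the classic counter-example; a small worked illustration with made-up numbers:

MapTask 1 values: 3, 5, 7  -> local average 5
MapTask 2 values: 2, 6     -> local average 4
Average of the local averages: (5 + 4) / 2 = 4.5
True average of all values: (3 + 5 + 7 + 2 + 6) / 5 = 4.6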
Combiner in Practice

1. Requirement: during the word count, locally aggregate the output of each MapTask in order to reduce the amount of data transferred over the network, i.e. use the Combiner feature. (1) Input data
(2) Expected result: the Combine phase receives many input records and, after merging, emits far fewer output records.
Implementation, option 1: 1) add a WordcountCombiner class that extends Reducer.
package com.atguigu.mr.combiner;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 1 Sum up the counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2 Write out
        context.write(key, v);
    }
}

2) In the WordcountDriver class, register the Combiner:

// Enable the combiner and specify which class implements the combiner logic
job.setCombinerClass(WordcountCombiner.class);
Implementation, option 2: 1) use WordcountReducer itself as the Combiner by registering it in the WordcountDriver class.
// Enable the combiner and specify which class implements the combiner logic
job.setCombinerClass(WordcountReducer.class);

GroupingComparator Grouping (Auxiliary Sorting)
Group the data in the Reduce phase by one or more key fields. Steps for grouping: (1) define a class that extends WritableComparator; (2) override the compare() method;
@Override
public int compare(WritableComparable a, WritableComparable b) {
    // comparison business logic goes here
    return result;
}

(3) write a constructor that passes the class of the objects being compared to the parent class:

protected OrderGroupingComparator() {
    super(OrderBean.class, true);
}

Requirement: find the most expensive item in every order (the input table figure is omitted).

2. Requirement analysis: (1) Use "order id + price" as the key, so that in the Map stage all order records are sorted by order id in ascending order and, for equal ids, by price in descending order before being sent to Reduce. (2) On the Reduce side, use a GroupingComparator to gather all key-value pairs with the same order id into one group; the first record of each group is then the most expensive item of that order.
(1) Define the OrderBean class holding the order information:

package com.atguigu.mapreduce.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id; // order id
    private double price; // price

    public OrderBean() {
        super();
    }

    public OrderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    // Secondary sort: by order id ascending, then by price descending
    @Override
    public int compareTo(OrderBean o) {

        int result;

        if (order_id > o.getOrder_id()) {
            result = 1;
        } else if (order_id < o.getOrder_id()) {
            result = -1;
        } else {
            // Same order id: the higher price sorts first
            result = price > o.getPrice() ? -1 : 1;
        }

        return result;
    }
}

(2) Write the Mapper class:

package com.atguigu.mapreduce.order;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Read one line
        String line = value.toString();

        // 2 Split the line
        String[] fields = line.split("\t");

        // 3 Fill the object
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4 Write out
        context.write(k, NullWritable.get());
    }
}

(3) Write the GroupingComparator class:

package com.atguigu.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        // Only the order id decides group membership
        int result;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            result = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            result = -1;
        } else {
            result = 0;
        }

        return result;
    }
}

(4) Write the Reducer class:

package com.atguigu.mapreduce.order;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        // The first key of each group is the most expensive item of that order
        context.write(key, NullWritable.get());
    }
}

(5) Write the Driver class:

package com.atguigu.mapreduce.order;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

    public static void main(String[] args) throws Exception {

        // The input and output paths must be adjusted to the actual paths on your machine
        args = new String[]{"e:/input/inputorder", "e:/output1"};

        // 1 Get the configuration information
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar load path
        job.setJarByClass(OrderDriver.class);

        // 3 Load the map/reduce classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4 Set the key and value types of the map output
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 Set the key and value types of the final output
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 Set the grouping comparator used on the reduce side
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

WritableComparator is a class used for comparison and sorting in the MapReduce programming model.
MapReduce sorts the data twice: once on the map side, where records are partitioned and sorted inside the circular buffer, and once on the reduce side, where the fetched map outputs are grouped. What is discussed here is the latter, the reduce-side grouping.
// Define the comparator that controls which keys are grouped together for a single call to Reducer#reduce
job.setGroupingComparatorClass(MyComparator.class);

The line above is the configuration we make when defining the job; it specifies how keys are grouped.
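For context, the sort order used during the shuffle and the grouping used on the reduce side are configured through two separate hooks on the Job object; a minimal sketch, where both comparator class names are placeholders rather than classes from this example:

// Controls the key ordering applied during the map-side/shuffle sort
job.setSortComparatorClass(MySortComparator.class);          // hypothetical class
// Controls which keys count as equal when building one reduce() value group
job.setGroupingComparatorClass(MyGroupingComparator.class);  // hypothetical class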
setGroupingComparatorClass expects a RawComparator implementation, and WritableComparator already implements RawComparator, so we can define our own MyComparator simply by extending WritableComparator.
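A minimal sketch of such a MyComparator, assuming the key type is the OrderBean used above and grouping is done by order id only; the class itself is illustrative and not part of the original code:

package com.atguigu.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Illustrative grouping comparator: keys with the same order id end up in the same reduce() call
public class MyComparator extends WritableComparator {

    protected MyComparator() {
        // Pass the key class to the parent and ask it to instantiate keys for comparison
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        // Only the order id decides group membership; price differences are ignored here
        return Integer.compare(aBean.getOrder_id(), bBean.getOrder_id());
    }
}

Registering it with job.setGroupingComparatorClass(MyComparator.class) then reproduces the grouping behaviour shown by the earlier OrderGroupingComparator.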