// SecondarySort.java
package com.zhiyou100.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce "secondary sort" job.
 *
 * <p>Each input line is read as a (key, value) pair. The pair is wrapped in a
 * composite {@link TwoFileds} key so that the shuffle sorts by the first field
 * and, within equal first fields, by the numeric second field. A custom
 * partitioner and grouping comparator make all records that share a first
 * field arrive in a single {@code reduce} call, already ordered by the second
 * field.
 */
public class SecondarySort {

    /** Input directory read by the job. */
    private static final String INPUT_PATH = "/test/secondaryorder";

    /** Output directory; it is deleted recursively before the job starts. */
    private static final String OUTPUT_PATH = "/SecondarySort";

    /** Marker written as both key and value after every group. */
    private static final String GROUP_SEPARATOR = "-----------";

    /**
     * Composite key holding the two fields of a record.
     *
     * <p>Ordering: by {@code firstField}; ties are broken by
     * {@code secondFiled} in ascending order.
     */
    public static class TwoFileds implements WritableComparable<TwoFileds> {

        private String firstField;
        private int secondFiled;

        /**
         * Serializes both fields. The write order must match the read order in
         * {@link #readFields(DataInput)}.
         *
         * @param out sink to write to
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(firstField);
            out.writeInt(secondFiled);
        }

        /**
         * Deserializes both fields in the order used by
         * {@link #write(DataOutput)}.
         *
         * @param in source to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.firstField = in.readUTF();
            this.secondFiled = in.readInt();
        }

        /**
         * Compares by first field, then by second field.
         *
         * @param o the key to compare against
         * @return negative, zero or positive as this key sorts before, equal
         *         to, or after {@code o}
         */
        @Override
        public int compareTo(TwoFileds o) {
            int byFirst = this.firstField.compareTo(o.firstField);
            if (byFirst != 0) {
                return byFirst;
            }
            // Integer.compare instead of subtraction: the difference of two
            // ints can overflow and flip the sign of the result.
            return Integer.compare(this.secondFiled, o.secondFiled);
        }

        /** Two keys are equal when both fields are equal (consistent with compareTo). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TwoFileds)) {
                return false;
            }
            TwoFileds other = (TwoFileds) obj;
            boolean sameFirst = (firstField == null)
                    ? other.firstField == null
                    : firstField.equals(other.firstField);
            return sameFirst && secondFiled == other.secondFiled;
        }

        @Override
        public int hashCode() {
            int firstHash = (firstField == null) ? 0 : firstField.hashCode();
            return 31 * firstHash + secondFiled;
        }

        @Override
        public String toString() {
            return firstField + "\t" + secondFiled;
        }

        public String getFirstField() {
            return firstField;
        }

        public void setFirstField(String firstField) {
            this.firstField = firstField;
        }

        public int getSecondFiled() {
            return secondFiled;
        }

        public void setSecondFiled(int secondFiled) {
            this.secondFiled = secondFiled;
        }
    }

    /**
     * Partitions on the first field only, so every key that shares a first
     * field is sent to the same reducer.
     */
    public static class TwoFiledsPartitioner extends Partitioner<TwoFileds, NullWritable> {

        /**
         * @param key composite key of the record
         * @param value unused
         * @param numPartitions number of reducers
         * @return index of the reducer, in {@code [0, numPartitions)}
         */
        @Override
        public int getPartition(TwoFileds key, NullWritable value, int numPartitions) {
            // Mask off the sign bit so the modulo result is never negative.
            int nonNegativeHash = key.getFirstField().hashCode() & Integer.MAX_VALUE;
            return nonNegativeHash % numPartitions;
        }
    }

    /**
     * Wraps every input (key, value) pair into a {@link TwoFileds} key and
     * emits it with an empty value.
     */
    public static class SecondarySortMap extends Mapper<Text, Text, TwoFileds, NullWritable> {

        private final NullWritable oValue = NullWritable.get();

        // Reused for every record; context.write serializes the key, so the
        // instance may be overwritten on the next call.
        private final TwoFileds outKey = new TwoFileds();

        /**
         * @param key first field of the record
         * @param value second field of the record; must parse as an int
         * @throws NumberFormatException if {@code value} is not an integer
         */
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            outKey.setFirstField(key.toString());
            outKey.setSecondFiled(Integer.parseInt(value.toString().trim()));
            context.write(outKey, oValue);
        }
    }

    /**
     * Writes every record of a group as (first field, second field) and then a
     * separator line.
     */
    public static class SecondarySortReduce extends Reducer<TwoFileds, NullWritable, Text, Text> {

        private final Text oKey = new Text();
        private final Text oValue = new Text();

        @Override
        protected void reduce(TwoFileds key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : values) {
                // The key must be read inside the loop: Hadoop refills the same
                // key object with the current record's key as the values
                // iterator advances, so secondFiled changes per iteration.
                oKey.set(key.getFirstField());
                oValue.set(String.valueOf(key.getSecondFiled()));
                context.write(oKey, oValue);
            }
            oKey.set(GROUP_SEPARATOR);
            oValue.set(GROUP_SEPARATOR);
            context.write(oKey, oValue);
        }
    }

    /**
     * Grouping comparator: keys with the same first field are treated as one
     * group and handled by a single {@code reduce} call.
     */
    public static class GroupToReducerComparetor extends WritableComparator {

        /** Registers the key type and asks the parent to create key instances. */
        public GroupToReducerComparetor() {
            super(TwoFileds.class, true);
        }

        /** Compares two keys by first field only. */
        @Override
        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
        public int compare(WritableComparable a, WritableComparable b) {
            TwoFileds left = (TwoFileds) a;
            TwoFileds right = (TwoFileds) b;
            return left.getFirstField().compareTo(right.getFirstField());
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on
     * failure.
     *
     * @param args unused
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        job.setJobName("二次排序");

        job.setMapperClass(SecondarySortMap.class);
        job.setReducerClass(SecondarySortReduce.class);

        job.setMapOutputKeyClass(TwoFileds.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path(INPUT_PATH);
        Path outputPath = new Path(OUTPUT_PATH);
        // Remove any previous output so the job does not fail on an existing directory.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Read each input line as a (key, value) pair and hand it to the mapper.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Route keys with the same first field to the same reducer.
        job.setPartitionerClass(TwoFiledsPartitioner.class);

        // Group keys with the same first field into one reduce call.
        job.setGroupingComparatorClass(GroupToReducerComparetor.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}