SecondarySort.java
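/*
 * Secondary sort example. The composite key TwoFileds orders records first by a
 * string field and then by an int field. A custom Partitioner sends every record
 * with the same first field to the same reducer, and a grouping comparator makes
 * the framework call reduce() once per first field, so the values of each group
 * arrive already sorted by the second field.
 *
 * Purely illustrative (hypothetical) input for KeyValueTextInputFormat, which by
 * default splits each line at the first tab into key and value:
 *   hadoop<TAB>3
 *   hadoop<TAB>1
 *   spark<TAB>2
 * The reducer would then emit hadoop 1 and hadoop 3 (in that order), a separator
 * line, then spark 2 and another separator line.
 */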
package com.zhiyou100.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondarySort {
// Custom composite key type that wraps the first and second fields of the secondary sort
// Sort rule: order by the first field; when the first fields are equal, order by the second field
public static class TwoFileds implements WritableComparable<TwoFileds> {
// Getters and setters for the two fields are defined at the bottom of this class
private String firstField;
private int secondFiled;
// Serialization
// The write order here must match the read order in readFields
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(firstField);
out.writeInt(secondFiled);
}
// Deserialization
@Override
public void readFields(DataInput in) throws IOException {
this.firstField = in.readUTF();
this.secondFiled = in.readInt();
}
// Comparison method used for sorting
// Compare the first field; when the first fields are equal, fall back to the second field
@Override
public int compareTo(TwoFileds o) {
if (this.firstField.equals(o.firstField)) {
// Integer.compare avoids the overflow a plain subtraction could cause
return Integer.compare(this.secondFiled, o.secondFiled);
}else{
return this.firstField.compareTo(o.firstField);
}
}
public String getFirstField() {
return firstField;
}
public void setFirstField(String firstField) {
this.firstField = firstField;
}
public int getSecondFiled() {
return secondFiled;
}
public void setSecondFiled(int secondFiled) {
this.secondFiled = secondFiled;
}
}
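// Note: equals() and hashCode() are not overridden above. They are not strictly
// needed in this job because partitioning is done by the custom TwoFiledsPartitioner
// below rather than by the key's hashCode().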
// Custom partitioner: sends all keys with the same first field to the same reducer
public static class TwoFiledsPartitioner extends Partitioner<TwoFileds, NullWritable> {
// The returned int is the index of the reducer this key is assigned to
@Override
public int getPartition(TwoFileds key, NullWritable value, int numPartitions) {
// Mask off the sign bit so the hash is non-negative
int reducerNum = (key.firstField.hashCode() & Integer.MAX_VALUE) % numPartitions;
return reducerNum;
}
}
// Define the mapper
public static class SecondarySortMap extends Mapper<Text, Text, TwoFileds, NullWritable> {
private final NullWritable oValue = NullWritable.get();
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
TwoFileds twoFileds = new TwoFileds();
twoFileds.setFirstField(key.toString());
twoFileds.setSecondFiled(Integer.valueOf(value.toString()));
context.write(twoFileds, oValue);
}
}
// Define the reducer
public static class SecondarySortReduce extends Reducer<TwoFileds, NullWritable, Text, Text> {
private Text oKey = new Text();
private Text oValue = new Text();
@Override
protected void reduce(TwoFileds key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
oKey.set(key.firstField);
oValue.set(String.valueOf(key.secondFiled));
context.write(oKey, oValue);
}
// Emit a separator line after each group
oKey.set("-----------");
oValue.set("-----------");
context.write(oKey, oValue);
}
}
// Grouping comparator: keys whose first field is equal are handed to the same reduce() call
public static class GroupToReducerComparetor extends WritableComparator {
// The constructor must tell the parent class which key type this comparator compares
public GroupToReducerComparetor() {
super(TwoFileds.class, true);
}
// Override compare to group only by the first field
@Override
public int compare(WritableComparable a, WritableComparable b) {
TwoFileds ca = (TwoFileds) a;
TwoFileds cb = (TwoFileds) b;
return ca.getFirstField().compareTo(cb.getFirstField());
}
}
// Configure and launch the job
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
job.setJobName("SecondarySort");
job.setMapperClass(SecondarySortMap.class);
job.setReducerClass(SecondarySortReduce.class);
job.setMapOutputKeyClass(TwoFileds.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path inputPath = new Path("/test/secondaryorder");
Path outputPath = new Path("/SecondarySort");
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// Read each input line as a key/value pair and send it to the mapper
job.setInputFormatClass(KeyValueTextInputFormat.class);
// Set the custom partitioner
job.setPartitionerClass(TwoFiledsPartitioner.class);
// job.setNumReduceTasks(2);
// Set the grouping comparator
job.setGroupingComparatorClass(GroupToReducerComparetor.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
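A rough way to run the job (the jar name below is only a placeholder): package the class into a jar and submit it with hadoop jar secondary-sort.jar com.zhiyou100.sort.SecondarySort. The HDFS input path /test/secondaryorder and output path /SecondarySort are hard-coded in main(), and the output directory is deleted before each run.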
