当前位置 : 主页 > 编程语言 > Java >

MapReduce 的二次排序-----------自定义封装类型,封装二次排序的第一个字段和第二个字段

来源:互联网 收集:自由互联 发布时间:2021-06-30
SecondarySort.java package com.zhiyou100.sort;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;……
SecondarySort.java
package com.zhiyou100.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {

	// 自定义封装类型,封装二次排序的第一个字段和第二个字段
	// 自定义排序规则:第一个字段不同按照第一个字段排序,第一个字段相同按照第二个字段排序
	
	public static class TwoFileds implements WritableComparable
 
   {

		// 添加 getter 和 setter 方法
		private String firstField;
		
		private int secondFiled;
		
		// 序列化
		// 序列化(写出)的顺序要和反序列化(读入)的顺序保持一致
		public void write(DataOutput out) throws IOException {
			
			out.writeUTF(firstField);
			out.writeInt(secondFiled);
		}

		// 反序列化
		public void readFields(DataInput in) throws IOException {
			
			this.firstField = in.readUTF();
			this.secondFiled = in.readInt();
		}

		// 比较方法
		// 先比较第一个字段,第一个字段相同的再用第二个字段的比较结果
		public int compareTo(TwoFileds o) {

			if (this.firstField.equals(o.firstField)) {
				
//				if (this.secondFiled > o.secondFiled) {
//					
//					return 1;
//				}else if(this.secondFiled < o.secondFiled){
//					
//					return -1;
//				}else{
//					
//					return 0;
//				}
				return this.secondFiled - o.secondFiled;
			}else{
				
				return this.firstField.compareTo(o.firstField);
			}
		}

		public String getFirstField() {
			return firstField;
		}

		public void setFirstField(String firstField) {
			this.firstField = firstField;
		}

		public int getSecondFiled() {
			return secondFiled;
		}

		public void setSecondFiled(int secondFiled) {
			this.secondFiled = secondFiled;
		}
	}
	
	// 自定义分区,用来将第一个字段相同的 key 值分区到同一个  reducer 节点上
	public static class TwoFiledsPartitioner extends Partitioner
  
    { // 返回值是 int 类型,这个数字是 reducer 的标号 @Override public int getPartition(TwoFileds key, NullWritable value, int numPartitions) { // 去符号取绝对值 int reducerNum = (key.firstField.hashCode()&Integer.MAX_VALUE) % numPartitions; return reducerNum; } } // 定义map public static class SecondarySortMap extends Mapper
   
     { private final NullWritable oValue = NullWritable.get(); @Override protected void map(Text key, Text value, Mapper
    
     .Context context) throws IOException, InterruptedException { TwoFileds twoFileds = new TwoFileds(); twoFileds.setFirstField(key.toString()); twoFileds.setSecondFiled(Integer.valueOf(value.toString())); context.write(twoFileds, oValue); } } // 定义 reducer public static class SecondarySortReduce extends Reducer
     
       { private Text oKey = new Text(); private Text oValue = new Text(); @Override protected void reduce(TwoFileds key, Iterable
      
        values, Reducer
       
        .Context context) throws IOException, InterruptedException { for(NullWritable value : values) { oKey.set(key.firstField); oValue.set(String.valueOf(key.secondFiled)); context.write(oKey, oValue); } oKey.set("-----------"); oValue.set(String.valueOf("-----------")); context.write(oKey, oValue); } } // 定义分组比较器, 让不同的 key 值的第一个字段相同的 kv 调用同一个 reducer 方法 public static class GroupToReducerComparetor extends WritableComparator{ // 构造方法里面要向父类传递比较器要比较的数据类型 public GroupToReducerComparetor() { super(TwoFileds.class, true); } // 重写 compare 方法,自定义排序规则 @Override public int compare(WritableComparable a, WritableComparable b) { TwoFileds ca = (TwoFileds)a; TwoFileds cb = (TwoFileds)b; return ca.getFirstField().compareTo(cb.getFirstField()); } } // 设置并启动 job public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SecondarySort.class); job.setJobName("二次排序"); job.setMapperClass(SecondarySortMap.class); job.setReducerClass(SecondarySortReduce.class); job.setMapOutputKeyClass(TwoFileds.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inputPath = new Path("/test/secondaryorder"); Path outputPath = new Path("/SecondarySort"); outputPath.getFileSystem(conf).delete(outputPath, true); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 把文件内容以 kv 的形式读取出来发送给 map job.setInputFormatClass(KeyValueTextInputFormat.class); // 设置 partition job.setPartitionerClass(TwoFiledsPartitioner.class); // job.setNumReduceTasks(2); // 设置分组比较器 job.setGroupingComparatorClass(GroupToReducerComparetor.class); System.exit(job.waitForCompletion(true) ? 0 :1); } }
       
      
     
    
   
  
 
网友评论