WordCountTopN.java
package com.zhiyou100.sort.topN;

import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountTopN {

    public static class WordCountTopNMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable ONE = new IntWritable(1);
        private Text outKey = new Text();
        private String[] infos;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on whitespace and emit (word, 1) for each token
            infos = value.toString().split("\\s+");
            for (String word : infos) {
                outKey.set(word);
                context.write(outKey, ONE);
            }
        }
    }

    public static class WordCountTopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private int sum;
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable();
        // In-memory structure holding the top N entries.
        // TreeMap keeps its entries sorted by key (here: the word count).
        private TreeMap<Integer, String> topN = new TreeMap<Integer, String>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Put the result into topN.
            // If topN already contains the same count, concatenate the word onto the
            // existing entry's value; otherwise insert a new entry.
            if (topN.size() < 3) {
                if (topN.get(sum) != null) {
                    topN.put(sum, topN.get(sum) + "------" + key.toString());
                } else {
                    topN.put(sum, key.toString());
                }
            } else {
                // Once there are already N entries, insert the new one and then remove one,
                // so that topN always holds exactly N elements.
                if (topN.get(sum) != null) {
                    // Same count already present: this is a merge, nothing new was added,
                    // so nothing needs to be removed.
                    topN.put(sum, topN.get(sum) + "------" + key.toString());
                } else {
                    topN.put(sum, key.toString());
                    // The TreeMap re-sorts automatically after the put; remove the last
                    // (largest) key so that only N key/value pairs remain.
                    topN.remove(topN.lastKey());
                    // To keep the N largest counts instead, remove the first (smallest) key:
                    // topN.remove(topN.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (topN != null && !topN.isEmpty()) {
                // Iterate over all keys in the map (ascending order of count)
                Set<Integer> keys = topN.keySet();
                // Set<Integer> keys = topN.descendingKeySet(); // descending order instead
                for (Integer key : keys) {
                    outKey.set(topN.get(key));
                    outValue.set(key);
                    context.write(outKey, outValue);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountTopN.class);
        job.setJobName("word frequency topN");
        job.setMapperClass(WordCountTopNMap.class);
        job.setReducerClass(WordCountTopNReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path inputPath = new Path("/reversetext/reverse1.txt");
        Path outputPath = new Path("/result/WordCountTopN");
        // Delete any previous output so the job can be rerun
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
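To see the TreeMap-based bounded top-N trick on its own, here is a minimal standalone sketch (the class name TopNDemo and the sample words/counts are made up for illustration). It uses the "remove the smallest key" variant mentioned in the commented-out line of the reducer, so it keeps the N largest counts:

import java.util.Map;
import java.util.TreeMap;

public class TopNDemo {
    public static void main(String[] args) {
        int n = 3;
        // Hypothetical (word, count) pairs standing in for the reducer's per-key sums
        String[] words = {"hadoop", "spark", "hive", "flume", "kafka"};
        int[] counts  = {5, 9, 2, 9, 7};

        TreeMap<Integer, String> topN = new TreeMap<Integer, String>();
        for (int i = 0; i < words.length; i++) {
            int sum = counts[i];
            if (topN.containsKey(sum)) {
                // Same count already present: merge the words into one entry
                topN.put(sum, topN.get(sum) + "------" + words[i]);
            } else {
                topN.put(sum, words[i]);
                if (topN.size() > n) {
                    // Keep only the N largest counts by dropping the smallest key
                    topN.remove(topN.firstKey());
                }
            }
        }
        // Print from largest to smallest count
        for (Map.Entry<Integer, String> e : topN.descendingMap().entrySet()) {
            System.out.println(e.getValue() + "\t" + e.getKey());
        }
    }
}

Because the key is the count, words with the same frequency collapse into one concatenated entry, which is exactly why the reducer glues equal-count words together with the dash separator.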