TopN.java package com.huike.action02;import java.io.IOException;import java.util.TreeMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs
package com.huike.action02; import java.io.IOException; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class TopN extends Configured implements Tool { public static final int k = 3; public static class TopNMapper extends Mapper{ private TreeMap map = new TreeMap (); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] parameters = line.split("\\s+"); Integer clicks = Integer.parseInt(parameters[1]); map.put(clicks, value.toString()); if (map.size() > k) { map.remove(map.firstKey()); } } protected void cleanup(Context context) throws IOException, InterruptedException { for (String text : map.values()) { if (text.toString() != null && !text.toString().equals("")) { context.write(NullWritable.get(), new Text(text)); } } } } public static class TopNReducer extends Reducer { private TreeMap map = new TreeMap (); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (Text item : values) { String value[] = item.toString().split("\t"); Integer clicks = Integer.parseInt(value[1]); map.put(clicks, item.toString()); if (map.size() > k) { map.remove(map.firstKey()); } } for (String text : map.values()) { context.write(NullWritable.get(), new Text(text)); } } } public int run(String[] args) throws Exception { Configuration conf = getConf(); Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf, "TopN"); job.setMapperClass(TopNMapper.class); job.setReducerClass(TopNReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setJarByClass(TopN.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { String[] args0 = { "/test/action02/TopN.txt", "/test/action02/output/" }; int res = ToolRunner.run(new Configuration(), new TopN(), args0); System.out.println(res); } }