GroupTopN.java package com.zhiyou100.sort.topN;import java.io.IOException;import java.util.Arrays;import java.util.HashMap;import java.util.Map;import java.util.TreeMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs
package com.zhiyou100.sort.topN; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class GroupTopN { public static class GroupTopNMap extends Mapper{ private String[] infos; private IntWritable ONE = new IntWritable(1); private Text outKey = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { infos = value.toString().split("\\s"); if (infos != null && infos.length > 0 && infos[1].equals("login")) { outKey.set(infos[0] + "_" + infos[2]); context.write(outKey, ONE); } } } // 自定义 partition (分区) 让key 的第一个字段(用户名)相同的分到同一个 reducer中 public static class GroupTopPartition extends Partitioner { private String[] infos; @Override public int getPartition(Text key, IntWritable value, int numPartitions) { infos = key.toString().split("_"); return (infos[0].hashCode() & Integer.MAX_VALUE) % numPartitions; } } public static class GroupTopNReduce extends Reducer { private TreeMap topN; private Map ipLoginTimes; private Text outKey = new Text(); private IntWritable outValue = new IntWritable(); // 求用户在每个 ip 上登录的次数,同时也求 topN @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { ipLoginTimes = new HashMap (); topN = new TreeMap (); for(IntWritable value : values) { if (ipLoginTimes.containsKey(key.toString())) { ipLoginTimes.put(key.toString(), ipLoginTimes.get(key.toString()) + value.get()); }else{ ipLoginTimes.put(key.toString(), value.get()); } } // 放入 topN for(String userIp : ipLoginTimes.keySet()) { if (topN.size() < 3) { topN.put(ipLoginTimes.get(userIp), userIp); }else{ topN.put(ipLoginTimes.get(userIp), userIp); topN.remove(topN.firstKey()); } } // 输出 topN for(int times : topN.descendingKeySet()) { outKey.set(topN.get(times)); outValue.set(times); context.write(outKey, outValue); } } } // 自定义分组 public static class ReduceGroupComparetor extends Text.Comparator { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { byte[] cb1 = Arrays.copyOfRange(b1, 2, b1.length); byte[] cb2 = Arrays.copyOfRange(b2, 2, b2.length); String str1 = new String(cb1); String str2 = new String(cb2); return str1.split("_")[0].compareTo(str2.split("_")[0]); } // @Override // public int compare(WritableComparable a, WritableComparable b) { // // Text ca = (Text) a; // Text cb = (Text) b; // // return ca.toString().split("_")[0].compareTo(cb.toString().split("_")[0]); // } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(GroupTopN.class); job.setJobName("TopN"); job.setMapperClass(GroupTopNMap.class); job.setReducerClass(GroupTopNReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path inputPath = new Path("/test/user-logs-large.txt"); Path outputPath = new Path("/hhhhhhhhhhhhhhh"); outputPath.getFileSystem(conf).delete(outputPath, true); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 把文件内容以 kv 的形式读取出来发送给 map // job.setInputFormatClass(KeyValueTextInputFormat.class); // 设置 partition job.setPartitionerClass(GroupTopPartition.class); // job.setNumReduceTasks(2); // 设置分组比较器 job.setGroupingComparatorClass(ReduceGroupComparetor.class); System.exit(job.waitForCompletion(true) ? 0 :1); } }