当前位置 : 主页 > 编程语言 > java >

通过 MapReduce 求用户在每个 IP 上登录的次数,同时求出每个用户登录次数最多的 Top N 个 IP

来源:互联网 收集:自由互联 发布时间:2021-06-30
GroupTopN.java package com.zhiyou100.sort.topN;import java.io.IOException;import java.util.Arrays;import java.util.HashMap;import java.util.Map;import java.util.TreeMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs
GroupTopN.java
package com.zhiyou100.sort.topN;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupTopN {

	public static class GroupTopNMap extends Mapper
 
   {
		
		private String[] infos;
		private IntWritable ONE = new IntWritable(1);
		private Text outKey = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Mapper
  
   .Context context) throws IOException, InterruptedException { infos = value.toString().split("\\s"); if (infos != null && infos.length > 0 && infos[1].equals("login")) { outKey.set(infos[0] + "_" + infos[2]); context.write(outKey, ONE); } } } // 自定义 partition (分区) 让key 的第一个字段(用户名)相同的分到同一个 reducer中 public static class GroupTopPartition extends Partitioner
   
     { private String[] infos; @Override public int getPartition(Text key, IntWritable value, int numPartitions) { infos = key.toString().split("_"); return (infos[0].hashCode() & Integer.MAX_VALUE) % numPartitions; } } public static class GroupTopNReduce extends Reducer
    
      { private TreeMap
     
       topN; private Map
      
        ipLoginTimes; private Text outKey = new Text(); private IntWritable outValue = new IntWritable(); // 求用户在每个 ip 上登录的次数,同时也求 topN @Override protected void reduce(Text key, Iterable
       
         values, Reducer
        
         .Context context) throws IOException, InterruptedException { ipLoginTimes = new HashMap
         
          (); topN = new TreeMap
          
           (); for(IntWritable value : values) { if (ipLoginTimes.containsKey(key.toString())) { ipLoginTimes.put(key.toString(), ipLoginTimes.get(key.toString()) + value.get()); }else{ ipLoginTimes.put(key.toString(), value.get()); } } // 放入 topN for(String userIp : ipLoginTimes.keySet()) { if (topN.size() < 3) { topN.put(ipLoginTimes.get(userIp), userIp); }else{ topN.put(ipLoginTimes.get(userIp), userIp); topN.remove(topN.firstKey()); } } // 输出 topN for(int times : topN.descendingKeySet()) { outKey.set(topN.get(times)); outValue.set(times); context.write(outKey, outValue); } } } // 自定义分组 public static class ReduceGroupComparetor extends Text.Comparator { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { byte[] cb1 = Arrays.copyOfRange(b1, 2, b1.length); byte[] cb2 = Arrays.copyOfRange(b2, 2, b2.length); String str1 = new String(cb1); String str2 = new String(cb2); return str1.split("_")[0].compareTo(str2.split("_")[0]); } // @Override // public int compare(WritableComparable a, WritableComparable b) { // // Text ca = (Text) a; // Text cb = (Text) b; // // return ca.toString().split("_")[0].compareTo(cb.toString().split("_")[0]); // } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(GroupTopN.class); job.setJobName("TopN"); job.setMapperClass(GroupTopNMap.class); job.setReducerClass(GroupTopNReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path inputPath = new Path("/test/user-logs-large.txt"); Path outputPath = new Path("/hhhhhhhhhhhhhhh"); outputPath.getFileSystem(conf).delete(outputPath, true); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 把文件内容以 kv 的形式读取出来发送给 map // job.setInputFormatClass(KeyValueTextInputFormat.class); // 设置 partition 
job.setPartitionerClass(GroupTopPartition.class); // job.setNumReduceTasks(2); // 设置分组比较器 job.setGroupingComparatorClass(ReduceGroupComparetor.class); System.exit(job.waitForCompletion(true) ? 0 :1); } }
          
         
        
       
      
     
    
   
  
 
网友评论