当前位置 : 主页 > 编程语言 > c++ >

MapReduce 全排序

来源:互联网 收集:自由互联 发布时间:2021-06-30
MapReduce.java package com.zhiyou100.sort;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apac
TotalSort.java
package com.zhiyou100.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalSort {
	
	public static class TotalSortMap extends Mapper
 
   {

		private String[] infos;
		
		private IntWritable oKey = new IntWritable();
		
		private Text oValue = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Mapper
  
   .Context context) throws IOException, InterruptedException { infos = value.toString().split("\\s"); oKey.set(Integer.parseInt(infos[1])); oValue.set(infos[0]); context.write(oKey, oValue); } } public static class TotalSortReduce extends Reducer
   
     { @Override protected void reduce(IntWritable key, Iterable
    
      values, Reducer
     
      .Context context) throws IOException, InterruptedException { for(Text value : values) { context.write(value, key); } } } // 重写 key 的comparator 方法 public static class WritableDescComparetor extends IntWritable.Comparator{ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 抽样 InputSampler 中的 key value 是由 inputFormat 传送的,此处传送到 map 就是 sampler 的数据key 必须是我们想要抽样的key InputSampler.Sampler
      
        sampler = new InputSampler.RandomSampler
       
        (0.6, 5); // 设置分区文件(分区中间值的内容) FileSystem hdFileSystem = FileSystem.get(conf); // 文件路径 Path partitionFile = new Path("/partition"); // 设置后,全排序的 partitioner 程序就会读取这个分区文件来完成按照顺序进行分区 TotalOrderPartitioner.setPartitionFile(conf, partitionFile); // 设置 Job Job job = Job.getInstance(conf); job.setJarByClass(TotalSort.class); job.setJobName("对于多个 reducer 进行全排序"); // 使用 Mapper 是因为本程序中的 mapper 没有用到(无用功) job.setMapperClass(Mapper.class); job.setReducerClass(TotalSortReduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 把分区文件加入分布式缓存中 job.addCacheFile(partitionFile.toUri()); // 设置分区器 job.setPartitionerClass(TotalOrderPartitioner.class); // 设置 reducer 的个数 job.setNumReduceTasks(2); // 如果要倒序排序,方法之一就是指定 job 的 sortcomparetor 类型 job.setSortComparatorClass(WritableDescComparetor.class); Path inputPath = new Path("/output"); Path outputPath = new Path("/TotalSort"); // outputPath.getFileSystem(conf).delete(outputPath, true); hdFileSystem.delete(outputPath, true); // map 端的输入会把文本文件读取成 kv 对,按照分割符把一行分成两部分,前面 key 后面 value // 如果分隔符不存在则正行都是 key ,value 为空,默认分隔符是 \t // 手动指定分隔符参数:mapreduce.input.keyvaluelinerecordreader.key.value.separator // job.setInputFormatClass(KeyValueTextInputFormat.class); job.setInputFormatClass(SequenceFileInputFormat.class); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 将随机抽样写入分区文件 // 在 job 启动之前启动抽样程序将抽样排序取出的中值写入到分区文件中 InputSampler.writePartitionFile(job, sampler); // 启动 job System.exit(job.waitForCompletion(true) ? 0 :1); } }
       
      
     
    
   
  
 
上一篇:Maven_Conf.Txt
下一篇:Android清理缓存
网友评论