当前位置 : 主页 > 编程语言 > c++ >

MapReduce

来源:互联网 收集:自由互联 发布时间:2021-06-30
MapReduce package com.zhiyou100;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoo
MapReduce
package com.zhiyou100;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	// 定义 map
	public static class WordCountMap extends Mapper
 
   {

		private String[] infos;
		
		private Text oKey = new Text();
		
		private final IntWritable oValue = new IntWritable(1);
		
		/*
		 * context 它的 write 方法 可以传递给 reducer
		 * 
		 * LongWritable key, Text value 文本文件的每一个 kv 代表这文本文件里面的一行数据
		 * key: 该行第一个字符在 该文本中的 偏移量
		 * val:该文本文件的内容
		 */
		@Override
		protected void map(LongWritable key, Text value, Mapper
  
   .Context context) throws IOException, InterruptedException { // 解析一行数据,转换成一个单词组成的数组 infos = value.toString().split("\\s"); for (String i : infos) { // 把单词形成 kv 对发送给 reducer (单词, 1) oKey.set(i); context.write(oKey, oValue); } } } // 定义 reducer // reducer 的输入类型与 map 的输出类型相对应 public static class WordCountReducer extends Reducer
   
    { private int sum; private IntWritable oValue = new IntWritable(0); // 聚合 @Override protected void reduce(Text key, Iterable
    
      values, Reducer
     
      .Context context) throws IOException, InterruptedException { sum = 0; for(IntWritable value : values){ sum += value.get(); } // 输出 kv (单词, 单词的计数) oValue.set(sum); context.write(key, oValue); } } // 组装一个 job(一次数据处理的过程) 到 mr 引擎上执行 public static void main(String[] args) throws Exception { // 构建一个 configuration ,用来配置 hdfs 的位置和 mr 的参数 // 运行mapreduce程序前都要初始化Configuration,该类主要是读取mapreduce系统配置信息,这些信息包括hdfs还有mapreduce,也就是安装hadoop时候的配置文件 Configuration configuration = new Configuration(); // 创建 job 对象 Job job = Job.getInstance(configuration); job.setJarByClass(WordCount.class); job.setJobName("第一个作业:wordcount"); // 配置 mr 执行类 job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReducer.class); // 设置 输出 kv 类型 // map 的 输出 kv 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // reducer -- 整个 mrjob 的最终输出 kv 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 当 map 的输出 kv 类型和 mrjob 的最终输出 kv 类型一致的时候可以不用配置 MapOutputKey 的类型 // 两者不一致的时候都要设置 // 设置数据源(待处理) // 设置 mrjob 处理的数据文件位置, // path 可以指定一个文件也可以制定一个文件夹 // 指定文件就处理该文件,如果是文件夹就处理该文件夹下所有的子文件 Path inputPath = new Path("/README.TXT"); // 可以通过多次调用该方法给mrjob 设置多个处理文件的路径 FileInputFormat.addInputPath(job, inputPath); // 设置目标数据的存放位置(设置mrjob 的最终输出结果位置) // 这个路径是一个目录,不能是一个文件,而且当前 hdfs 上不能有这个目录 Path outputPath = new Path("/test/output4"); // 如果存在 path 路径,就删除 outputPath.getFileSystem(configuration).delete(outputPath,true); // 一个 mrjob 只能有一个 输出目录 FileOutputFormat.setOutputPath(job, outputPath); // 启动作业,分布式计算提交给 mr 引擎 // 是否打印处理过程中 输出的日志 boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }
     
    
   
  
 
网友评论