1、MapReduce是什么 HadoopMapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方
Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,
2、 MapReduce做什么MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。
(1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
(2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。
一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
3、第一个MapReduce程序:WordCountWordCount单词计数是最简单也是最能体现MapReduce思想的程序之一。
启动一个普通的maven工程。
1.先配置pom.xml 导入三个jar包<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version> </dependency>2.创建一个Mapper类(注意导的包)
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,类型:long * 但是在Hadoop中有更精简的序列化接口,所以不直接用long ,而是用Longwriterable * VALUEIN:默认情况下,是mr框架所读的一行文本的内容, 类型:String ,同上,用Text(import org.apache.hadoop.io.Text) * * KEYOUT: 是用户自定义逻辑处理完成之后输出数据的key,在此处是单词,类型 String 同上,用Text * VALUEOUT: 是用户自定义逻辑处理完成之后输出数据的value,在此处是单词次数,类型 Integer 同上,用 Intwriterable */ public class WordCountMap extends Mapper<LongWritable ,Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取每行文本 String line = value.toString(); //splite拆分每行 String[] words = line.split(" "); //分词 //去除每个单词 for (String woed: words ) { //将每行数据转化为text类型 Text wordText = new Text(woed); //将 1 转化为intwritab类型 IntWritable outValue = new IntWritable(1); //写出单词跟对应的1 context.write(wordText,outValue); } } }3.创建一个Reducer类(注意导的包)
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReduce extends Reducer <Text, IntWritable,Text,IntWritable>{ // key 输入单词的名字 // values 输入一串 1 // content 输出的工具 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable number:values ) { sum += number.get(); } context.write(key,new IntWritable()); } }4.创建driver类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //定义配置文件 Configuration conf = new Configuration(); //准备一个空任务 Job job = Job.getInstance(conf,"wc"); //设置任务的输入数据源 FileInputFormat.addInputPath(job,new Path("D:\\桌面下载\\123.txt")); //设置你的Mapper任务类 job.setMapperClass(WordCountMap.class); //设置mapper的key的输出数据类型 job.setMapOutputKeyClass(Text.class); //设置mapper的valuer的输出数据类型 job.setMapOutputValueClass(IntWritable.class); //设置你的Reducer任务类 job.setReducerClass(WordCountReduce.class); //设置Reduce任务类输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置任务的输出数据目标 FileOutputFormat.setOutputPath(job,new Path("D:\\桌面下载\\456.txt")); //启动任务并执行 job.waitForCompletion(true); } }
如果运行没有报错那么就去任务输出路径查看生成的文件夹。
如果报错可能是环境变量问题博文:https://blog.csdn.net/tmh1995/article/details/106551092
以上为在Windows系统上实现MapReduce的Word Count