文章目录 Combiner ▶ 定义 ☠ Combiner案例 一、需求分析 二、代码实现 2.1 wordCountCombiner合并 2.2 Mapper阶段 2.3 Reducer阶段
文章目录
- Combiner
- ▶ 定义
- ☠ Combiner案例
- 一、需求分析
- 二、代码实现
- 2.1 wordCountCombiner合并
- 2.2 Mapper阶段
- 2.3 Reducer阶段
- 2.4 Driver阶段
Combiner
▶ 定义
Ⅰ. Combiner 是MR程序中的Mapper、Reducer之外的一种组件
Ⅱ. Combiner组件的父类就是Reducer
Ⅲ. Combiner 和 Reducer 的区别在于运行的位置:
- Combiner 是在每一个MapTask所在的节点运行
- Reducer 是接收全局所有Mapper的输出结果
Ⅳ. Combiner 的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量
Ⅴ. Combiner 能够应你用的前提是不能影响最终的业务逻辑(一般Combiner适用于汇总类型的业务),而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来
返回顶部
☠ Combiner案例
一、需求分析
统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
(1)数据输入
(2)期望输出数据
- 期望:Combine输入数据多,输出时经过合并,输出数据降低。
返回顶部
二、代码实现
首先我们运行最开始的wordCount案例,查看控制台打印的信息,发现Combine部分为空,并没有执行这一部分。 接下来我们编写一个wordCountCombiner类,相当于在Reducer之前,进行一个提前合并。
2.1 wordCountCombiner合并
package 第三章_MR框架原理.Combiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class wordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1.遍历求和
int sum = 0;
for (IntWritable value:values){
sum += value.get();
}
v.set(sum);
// 2.写出
context.write(key,v);
}
}
返回顶部
2.2 Mapper阶段
package 第三章_MR框架原理.Combiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Mapper 阶段
* KEYIN 输入数据的key类型
* VALUEIN 输入数据的value类型
* KEYOUT 输出数据的key类型
* VALUEOUT 输出数据的value类型
*/
public class wordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
// 创建对象
Text k = new Text();
IntWritable v = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.获取一行数据
// atguigu atguigu
String line = value.toString();
// 2.切分
String[] words = line.split(" ");
// 3.循环写出
for (String word:words){
// 设置键 atguigu
k.set(word);
// 设置词频为 1 , 也可以在上面创建对象时默认为1
v.set(1);
// 生成键值对 (atguigu,1)
context.write(k,v);
}
}
}
返回顶部
2.3 Reducer阶段
package 第三章_MR框架原理.Combiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer 阶段
* KEYIN ,VALUEIN Reducer阶段输入(Mapper阶段输出)数据的类型
* KEYOUT 最终输出数据的key类型
* VALUEOUT 最终输出数据的value类型
*/
public class wordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
IntWritable v = new IntWritable();
@Override
// Iterable<IntWritable> values 对key的value值进行迭代实现词频统计
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// atguigu,1
// atguigu,1
// 1.累加求和
int sum = 0;
for (IntWritable value:values){
// value是IntWritable类型数据,通过get转为int型,才好计算
sum += value.get();
}
// 2.写出结果
v.set(sum);
context.write(key,v);
}
}
返回顶部
2.4 Driver阶段
- 设置Combiner:job.setCombinerClass(wordCountCombiner.class);
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class wordCountDriver {
public static void main(String[] args) {
Configuration conf = new Configuration();
Job job = null;
try {
// 1.获取job对象
job = Job.getInstance(conf);
// 2.设置jar存储位置
job.setJarByClass(wordCountDriver.class);
// 3.关联map、reduce类
job.setMapperClass(wordCountMapper.class);
job.setReducerClass(wordCountReducer.class);
// 4.设置Mapper阶段输出数据的key、value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置Reducer阶段输出数据的key、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置Combiner
job.setCombinerClass(wordCountCombiner.class);
// 7.设置输入、出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Combiner\\hello.txt"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Combiner\\output1"));
// 打jar包
// FileInputFormat.setInputPaths(job,new Path(args[0]));
// FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 8 .提交job
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
返回顶部
通过设置Combine类,在Reducer阶段之前提前进行一次统计,再将结果传输到Reducer,以减少网络传输量。上面在案例分析的时候,还有一个方案二提到,在本案例中由于Combiner和Reducer的作用相同,所以可以在设置Combiner类的时候直接设置成Reducer类,最终达到的效果是一样的。
返回顶部