当前位置 : 主页 > 编程语言 > java >

【MapReduce】MR 框架原理 之 Combiner局部汇总

来源:互联网 收集:自由互联 发布时间:2022-08-15
文章目录 ​​Combiner​​ ​​▶ 定义​​ ​​☠ Combiner案例​​ ​​一、需求分析​​ ​​二、代码实现​​ ​​2.1 wordCountCombiner合并​​ ​​2.2 Mapper阶段​​ ​​2.3 Reducer阶段




文章目录

  • ​​Combiner​​
  • ​​▶ 定义​​
  • ​​☠ Combiner案例​​
  • ​​一、需求分析​​
  • ​​二、代码实现​​
  • ​​2.1 wordCountCombiner合并​​
  • ​​2.2 Mapper阶段​​
  • ​​2.3 Reducer阶段​​
  • ​​2.4 Driver阶段​​

Combiner

▶ 定义

Ⅰ. Combiner 是MR程序中的Mapper、Reducer之外的一种组件

Ⅱ. ​​Combiner组件的父类就是Reducer​​

Ⅲ. Combiner 和 Reducer 的区别在于运行的位置:

  • ​​Combiner 是在每一个MapTask所在的节点运行​​
  • ​​Reducer 是接收全局所有Mapper的输出结果​​

Ⅳ. ​​Combiner 的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量​​

Ⅴ. Combiner 能够应你用的前提是不能影响最终的业务逻辑(​​一般Combiner适用于汇总类型的业务​​),而且,​​Combiner的输出kv应该跟Reducer的输入kv类型要对应起来​​

​​返回顶部​​


☠ Combiner案例

一、需求分析

统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。

(1)数据输入

【MapReduce】MR 框架原理 之 Combiner局部汇总_数据

(2)期望输出数据

  • 期望:Combine输入数据多,输出时经过合并,输出数据降低。
  • 【MapReduce】MR 框架原理 之 Combiner局部汇总_mapreduce_02

​​返回顶部​​


二、代码实现

【MapReduce】MR 框架原理 之 Combiner局部汇总_hadoop_03


首先我们运行最开始的wordCount案例,查看控制台打印的信息,发现Combine部分为空,并没有执行这一部分。 接下来我们编写一个wordCountCombiner类,相当于在Reducer之前,进行一个提前合并。


2.1 wordCountCombiner合并

package 第三章_MR框架原理.Combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class wordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1.遍历求和
int sum = 0;
for (IntWritable value:values){
sum += value.get();
}
v.set(sum);
// 2.写出
context.write(key,v);
}
}

​​返回顶部​​


2.2 Mapper阶段

package 第三章_MR框架原理.Combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* Mapper 阶段
* KEYIN 输入数据的key类型
* VALUEIN 输入数据的value类型
* KEYOUT 输出数据的key类型
* VALUEOUT 输出数据的value类型
*/
public class wordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
// 创建对象
Text k = new Text();
IntWritable v = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.获取一行数据
// atguigu atguigu
String line = value.toString();

// 2.切分
String[] words = line.split(" ");

// 3.循环写出
for (String word:words){
// 设置键 atguigu
k.set(word);
// 设置词频为 1 , 也可以在上面创建对象时默认为1
v.set(1);
// 生成键值对 (atguigu,1)
context.write(k,v);
}

}
}

​​返回顶部​​


2.3 Reducer阶段

package 第三章_MR框架原理.Combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Reducer 阶段
* KEYIN ,VALUEIN Reducer阶段输入(Mapper阶段输出)数据的类型
* KEYOUT 最终输出数据的key类型
* VALUEOUT 最终输出数据的value类型
*/
public class wordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

IntWritable v = new IntWritable();

@Override
// Iterable<IntWritable> values 对key的value值进行迭代实现词频统计
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

// atguigu,1
// atguigu,1

// 1.累加求和
int sum = 0;
for (IntWritable value:values){
// value是IntWritable类型数据,通过get转为int型,才好计算
sum += value.get();
}
// 2.写出结果
v.set(sum);
context.write(key,v);
}
}

​​返回顶部​​


2.4 Driver阶段

  • 设置Combiner:​​job.setCombinerClass(wordCountCombiner.class);​​
package 第三章_MR框架原理.Combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordCountDriver {

public static void main(String[] args) {

Configuration conf = new Configuration();
Job job = null;
try {
// 1.获取job对象
job = Job.getInstance(conf);
// 2.设置jar存储位置
job.setJarByClass(wordCountDriver.class);
// 3.关联map、reduce类
job.setMapperClass(wordCountMapper.class);
job.setReducerClass(wordCountReducer.class);
// 4.设置Mapper阶段输出数据的key、value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置Reducer阶段输出数据的key、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置Combiner
job.setCombinerClass(wordCountCombiner.class);
// 7.设置输入、出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Combiner\\hello.txt"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Combiner\\output1"));
// 打jar包
// FileInputFormat.setInputPaths(job,new Path(args[0]));
// FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 8 .提交job
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}

​​返回顶部​​


通过设置Combine类,在Reducer阶段之前提前进行一次统计,再将结果传输到Reducer,以减少网络传输量。上面在案例分析的时候,还有一个方案二提到,在本案例中由于Combiner和Reducer的作用相同,所以可以在设置Combiner类的时候直接设置成Reducer类,最终达到的效果是一样的。

【MapReduce】MR 框架原理 之 Combiner局部汇总_mapreduce_04

​​返回顶部​​



上一篇:【CentOS】bash: rz: 未找到命令
下一篇:没有了
网友评论