MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求。为了满足复杂的需求需要Hadoop二次排序Secondary Sort。 过程 在map阶段,使用job.setInputForma
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求。为了满足复杂的需求需要Hadoop二次排序Secondary Sort。
过程
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是
流程
参考:http://zengzhaozheng.blog.51cto.com/8219051/1379271/
map:
每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,
就调用自定义二次排序函数对其进行排序。
reducer:
就是分组和reduce函数处理都是在shuffle完成之后才进行的。另外一点我们也非常容易看出,就是每处理完一个分组数据就会去调用一次的reduce函对这个分组来进行处理和输出。
此外,说明一下分组函数的返回值问题,当返回值为0时候才会被分到同一个组当中。另外一点我们也可以看出来,一个分组中每合并n个值就会有n-1分组函数返回0值,
也就是说有进行了n-1次比较。
实例
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 二次排序对象
*/
public class SortAssist {
public static class UnionKey implements WritableComparable<UnionKey> {
public String partitionerKey;
public String resortString;
public String getPartitionerKey() {
return partitionerKey;
}
public void setPartitionerKey(String partitionerKey) {
this.partitionerKey = partitionerKey;
}
public String getResortString() {
return resortString;
}
public void setResortString(String resortString) {
this.resortString = resortString;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(partitionerKey);
out.writeUTF(resortString);
}
@Override
public void readFields(DataInput in) throws IOException {
partitionerKey = in.readUTF();
resortString = in.readUTF();
}
@Override
public int hashCode() {
return partitionerKey.hashCode() * 157 + resortString.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (this == obj)
return true;
if (obj instanceof UnionKey) {
UnionKey r = (UnionKey) obj;
return r.partitionerKey.equals(partitionerKey) && r.resortString.equals(resortString);
} else {
return false;
}
}
//自定义比较器
@Override
public int compareTo(UnionKey o) {
//确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序
if (!partitionerKey.equals(o.partitionerKey)) {
return partitionerKey.compareTo(o.partitionerKey);
} else if (resortString != o.resortString) {
//相同区内按照组合键的第二个键的升序排序
return resortString.compareTo(o.resortString);
} else
return 0;
}
}
//自定义分区处理器
public static class SelfPartitioner extends Partitioner<UnionKey, Text> {
@Override
public int getPartition(UnionKey key, Text value, int numPartitions) {
return Math.abs(key.partitionerKey.hashCode() * 127) % numPartitions;
}
}
//自定义分区策略
public static class SelfGroupingComparator extends WritableComparator {
protected SelfGroupingComparator() {
super(UnionKey.class, true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable writableComparable, WritableComparable writableComparable2) {
UnionKey unionKey = (UnionKey) writableComparable;
UnionKey unionKey2 = (UnionKey) writableComparable2;
String key1 = unionKey.getPartitionerKey();
String key2 = unionKey2.getPartitionerKey();
return
job中调用
.setPartitionerClass(SortAssist.SelfPartitioner.class); //自定义分组器
job.setGroupingComparatorClass(SortAssist.SelfGroupingComparator.class); //自定义分区策略