当前位置 : 主页 > 网络编程 > PHP >

Hadoop-SecondarySort

来源:互联网 收集:自由互联 发布时间:2023-10-08
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求。为了满足复杂的需求需要Hadoop二次排序Secondary Sort。 过程 在map阶段,使用job.setInputForma


MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求。为了满足复杂的需求需要Hadoop二次排序Secondary Sort。

过程

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是

流程

参考:​​http://zengzhaozheng.blog.51cto.com/8219051/1379271/​​


map:

每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,

就调用自定义二次排序函数对其进行排序。

reducer:

就是分组和reduce函数处理都是在shuffle完成之后才进行的。另外一点我们也非常容易看出,就是每处理完一个分组数据就会去调用一次的reduce函对这个分组来进行处理和输出。

此外,说明一下分组函数的返回值问题,当返回值为0时候才会被分到同一个组当中。另外一点我们也可以看出来,一个分组中每合并n个值就会有n-1分组函数返回0值,

也就是说有进行了n-1次比较。

实例

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* 二次排序对象
*/
public class SortAssist {

public static class UnionKey implements WritableComparable<UnionKey> {

public String partitionerKey;

public String resortString;

public String getPartitionerKey() {
return partitionerKey;
}

public void setPartitionerKey(String partitionerKey) {
this.partitionerKey = partitionerKey;
}

public String getResortString() {
return resortString;
}

public void setResortString(String resortString) {
this.resortString = resortString;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(partitionerKey);
out.writeUTF(resortString);
}

@Override
public void readFields(DataInput in) throws IOException {
partitionerKey = in.readUTF();
resortString = in.readUTF();
}

@Override
public int hashCode() {
return partitionerKey.hashCode() * 157 + resortString.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (this == obj)
return true;
if (obj instanceof UnionKey) {
UnionKey r = (UnionKey) obj;
return r.partitionerKey.equals(partitionerKey) && r.resortString.equals(resortString);
} else {
return false;
}
}
//自定义比较器
@Override
public int compareTo(UnionKey o) {
//确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序
if (!partitionerKey.equals(o.partitionerKey)) {
return partitionerKey.compareTo(o.partitionerKey);
} else if (resortString != o.resortString) {
//相同区内按照组合键的第二个键的升序排序
return resortString.compareTo(o.resortString);
} else
return 0;
}
}

//自定义分区处理器
public static class SelfPartitioner extends Partitioner<UnionKey, Text> {

@Override
public int getPartition(UnionKey key, Text value, int numPartitions) {
return Math.abs(key.partitionerKey.hashCode() * 127) % numPartitions;
}

}

//自定义分区策略
public static class SelfGroupingComparator extends WritableComparator {
protected SelfGroupingComparator() {
super(UnionKey.class, true);
}

@SuppressWarnings("rawtypes")
public int compare(WritableComparable writableComparable, WritableComparable writableComparable2) {
UnionKey unionKey = (UnionKey) writableComparable;
UnionKey unionKey2 = (UnionKey) writableComparable2;
String key1 = unionKey.getPartitionerKey();
String key2 = unionKey2.getPartitionerKey();
return

job中调用

.setPartitionerClass(SortAssist.SelfPartitioner.class); //自定义分组器
job.setGroupingComparatorClass(SortAssist.SelfGroupingComparator.class); //自定义分区策略


上一篇:7.3 RelativeLayout布局详解
下一篇:没有了
网友评论