Implementing secondary sort with a composite key via Hadoop's built-in sorting (ITeye)


March 14, 2019, 09:03:58 | Author: 笑槐 | Tags: frequency, implementation, source type | Views: 2275

Problem description: the data schema is (task id, source type, release date, word, frequency).

Frequencies have already been aggregated by (task id, source type, release date, word). We now need to group by (task id, source type, release date), sort each group by frequency in descending order, and extract the top 200 records per group.

 

The implementation follows the org.apache.hadoop.examples.SecondarySort example shipped with Hadoop.

Composite key: WordFreq = (TagHead, word, count), where TagHead represents the group, i.e. (task id, source type, release date).

1. Override compareTo in WordFreq so that each group is sorted by frequency in descending order

@Override
public int compareTo(WordFreq other) {
    // Group ascending first, then count descending (operands swapped),
    // then word ascending as the tie-breaker
    return ComparisonChain.start()
            .compare(this.getGroup(), other.getGroup())
            .compare(other.count, this.count)
            .compare(this.tag.getWord(), other.tag.getWord())
            .result();
}
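The effect of swapping the operands on the count comparison can be checked with plain-JDK comparators. The sketch below is a simplified stand-in for the article's WordFreq (its `group`, `word`, and `count` fields are illustrative, and Guava's ComparisonChain is replaced by equivalent `Integer.compare` calls):

```java
import java.util.*;

public class CompareToDemo {
    // Simplified stand-in for the article's WordFreq composite key
    static class WordFreq implements Comparable<WordFreq> {
        final String group; // stands in for TagHead (taskId_sourceType_date)
        final String word;
        final int count;

        WordFreq(String group, String word, int count) {
            this.group = group;
            this.word = word;
            this.count = count;
        }

        @Override
        public int compareTo(WordFreq other) {
            // Group ascending, count descending (operands swapped), word ascending
            int c = this.group.compareTo(other.group);
            if (c != 0) return c;
            c = Integer.compare(other.count, this.count);
            if (c != 0) return c;
            return this.word.compareTo(other.word);
        }
    }

    public static void main(String[] args) {
        List<WordFreq> list = new ArrayList<>(Arrays.asList(
                new WordFreq("t1_news_2019-03-14", "beta", 5),
                new WordFreq("t1_news_2019-03-14", "alpha", 9),
                new WordFreq("t0_blog_2019-03-14", "gamma", 1)));
        Collections.sort(list);
        for (WordFreq wf : list) {
            // Groups stay together; within t1_news, alpha (9) precedes beta (5)
            System.out.println(wf.group + " " + wf.word + " " + wf.count);
        }
    }
}
```

Within a group, the record with the higher count sorts first, which is exactly the order the reducer will see.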

2. Override the following three methods in TagHead

@Override
public int compareTo(TagHead other) {
    return ComparisonChain.start()
            .compare(this.tagsid, other.tagsid)
            .compare(this.sourceType, other.sourceType)
            .compare(this.releaseDateDay, other.releaseDateDay)
            .result();
}

 

@Override
public boolean equals(Object o) {
    if (o instanceof TagHead) {
        TagHead other = (TagHead) o;
        return this.tagsid.equals(other.tagsid)
                && this.sourceType.equals(other.sourceType)
                && this.releaseDateDay.equals(other.releaseDateDay);
    }
    return false;
}

 

@Override
public int hashCode() {
    // Note: assumes tagsid and sourceType are numeric strings and
    // releaseDateDay is formatted as yyyy-MM-dd
    int hash = (this.tagsid != null ? Integer.parseInt(this.tagsid) : 0);
    hash += (this.sourceType != null ? Integer.parseInt(this.sourceType) * 13 : 0);
    hash += (this.releaseDateDay != null
            ? Integer.parseInt(this.releaseDateDay.replace("-", "")) * 7
            : 0);
    return hash;
}
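The hashCode above throws NumberFormatException the moment tagsid or sourceType contains a non-numeric character, and an NPE-free null check is easy to get wrong. If the fields are not guaranteed numeric, java.util.Objects.hash is a safer alternative that still satisfies the equals/hashCode contract. This is a sketch of that alternative, not the article's code (the class name TagHeadHash is ours):

```java
import java.util.Objects;

public class TagHeadHash {
    final String tagsid;
    final String sourceType;
    final String releaseDateDay;

    TagHeadHash(String tagsid, String sourceType, String releaseDateDay) {
        this.tagsid = tagsid;
        this.sourceType = sourceType;
        this.releaseDateDay = releaseDateDay;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TagHeadHash)) return false;
        TagHeadHash other = (TagHeadHash) o;
        // Objects.equals handles null fields without an explicit check
        return Objects.equals(tagsid, other.tagsid)
                && Objects.equals(sourceType, other.sourceType)
                && Objects.equals(releaseDateDay, other.releaseDateDay);
    }

    @Override
    public int hashCode() {
        // Works for null and non-numeric strings alike
        return Objects.hash(tagsid, sourceType, releaseDateDay);
    }
}
```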

 

3. Simple value type: SimpleWordFreq = (word, count)

4. Mapper and Reducer

public class SubSortingWordFreqMapper extends
        Mapper<LongWritable, Text, WordFreq, SimpleWordFreq> {
    ...
}

 

public static class SubSortingWordFreqReducer extends
        Reducer<WordFreq, SimpleWordFreq, Text, NullWritable> {
    @Override
    protected void reduce(WordFreq key, Iterable<SimpleWordFreq> values,
            Context context) throws IOException, InterruptedException {
        for (SimpleWordFreq value : values) {
            ...
        }
    }
}
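The article elides the loop body, but because the composite-key sort already delivers each group's values in descending-frequency order, the reducer only has to emit the first 200 values it sees and skip the rest. That trimming logic is framework-independent and can be sketched as a plain helper (the name takeFirst is ours; N = 200 matches the requirement stated above):

```java
import java.util.*;

public class TopN {
    // Take the first n elements of an already-sorted iterable,
    // mirroring what the reduce() loop does with a counter
    static <T> List<T> takeFirst(Iterable<T> values, int n) {
        List<T> out = new ArrayList<>();
        for (T value : values) {
            if (out.size() >= n) {
                break; // per-group quota reached; ignore the remainder
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Counts arrive pre-sorted descending thanks to the composite key
        List<Integer> sortedDesc = Arrays.asList(9, 7, 5, 3, 1);
        System.out.println(takeFirst(sortedDesc, 3)); // prints [9, 7, 5]
    }
}
```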

5. Custom Partitioner that hashes the natural key, i.e. the group

public class TagCloudPartitioner extends Partitioner<WordFreq, SimpleWordFreq> {
    private static Logger log = LoggerFactory
            .getLogger(TagCloudPartitioner.class);

    @Override
    public int getPartition(WordFreq key, SimpleWordFreq value, int numPartitions) {
        int hashCode = key.getGroup().hashCode();
        log.debug(key.getGroup().getHead("_") + ";hashCode=" + hashCode);
        // Mask the sign bit so a negative hashCode cannot yield
        // a negative (invalid) partition number
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }
}
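One subtlety in getPartition: Java's % operator keeps the sign of its left operand, so a raw hashCode % numPartitions returns a negative partition index whenever hashCode is negative, and the job fails. Masking off the sign bit, the same idiom Hadoop's own HashPartitioner uses, keeps the result in range; the small demo below isolates just that arithmetic:

```java
public class PartitionDemo {
    // Sign-safe partition index, same idiom as Hadoop's HashPartitioner
    static int partition(int hashCode, int numPartitions) {
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(-7 % 5);           // prints -2: invalid as a partition
        System.out.println(partition(-7, 5)); // prints 1: always in [0, numPartitions)
    }
}
```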

 

6. Custom grouping comparator

public class TagCloudHeadGroupingComparator extends WritableComparator {

    protected TagCloudHeadGroupingComparator() {
        super(WordFreq.class, true);
    }

    @Override
    public int compare(WritableComparable tp1, WritableComparable tp2) {
        // Compare only the group part so that all records of one group
        // reach a single reduce() call
        WordFreq wordFreq = (WordFreq) tp1;
        WordFreq wordFreq2 = (WordFreq) tp2;
        return wordFreq.compareGroup(wordFreq2);
    }
}

7. Special settings when submitting the job

job.setPartitionerClass(TagCloudPartitioner.class);
job.setGroupingComparatorClass(TagCloudHeadGroupingComparator.class);
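For context, a complete driver would wire all the classes above into one job. The sketch below assumes the class names used in this article and the standard org.apache.hadoop.mapreduce API; the driver class name, paths, and final output key/value classes are illustrative, not from the article:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubSortingDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "secondary sort top-200");
        job.setJarByClass(SubSortingDriver.class);

        job.setMapperClass(SubSortingWordFreqMapper.class);
        job.setReducerClass(SubSortingWordFreqReducer.class);

        // Intermediate key/value: the composite key plus the simple value
        job.setMapOutputKeyClass(WordFreq.class);
        job.setMapOutputValueClass(SimpleWordFreq.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // The two settings that make the secondary sort work
        job.setPartitionerClass(TagCloudPartitioner.class);
        job.setGroupingComparatorClass(TagCloudHeadGroupingComparator.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```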
