MapReduce advanced application - full sorting and secondary sorting
2022-04-22 14:19:00 【Lantern dream in southern China】
Preface
Although little MapReduce code is written in day-to-day development nowadays, MapReduce is still one of the three pillars of big-data Hadoop, and many of its internal ideas are the foundation of later frameworks. In this post, Nanguo focuses on reviewing the sorting-related knowledge points in MR. There are already plenty of introductions to this topic online, so I had not planned to write this post; but having recently found the time to read most of "Hadoop: The Definitive Guide", I decided to write it from the perspective of interview review.
Without further ado, on to the good stuff~
Sort
By default, MapReduce sorts the data set by the key of the input records.

But sometimes we need to sort the data in more complex ways, driven by the actual application scenario.
For example: total sort and auxiliary sort (also called secondary sort).
Total sort
Ways to make MapReduce generate a globally sorted file:
-
The simplest way is to use a single partition. This works when handling small files, but is extremely inefficient on large ones: all the data is sent to one Reduce task for sorting, which fails to exploit the cluster's computing resources, and with a large data volume an OOM problem is very likely.
-
First create a series of sorted files, then concatenate them, finally producing one globally sorted file. The main idea is to use a partitioner that describes the global ordering of the output. The crux of the scheme is the partitioning method. By default, partitioning is by hash value (the default partitioner is HashPartitioner; it computes the hashCode of the map output key and takes the remainder modulo the number of Reduce tasks, so keys with the same remainder are sent to the same Reduce). Users can also customize the partitioner (define a class extending Partitioner and override the getPartition method).
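The default HashPartitioner behaviour described above boils down to one line of arithmetic. As a standalone plain-Java sketch (the class name is illustrative; the formula matches HashPartitioner's `(hashCode & Integer.MAX_VALUE) % numReduceTasks`):

```java
// Plain-Java sketch of the default HashPartitioner arithmetic.
// The bitmask keeps the result non-negative even when hashCode() is negative.
public class HashPartitionSketch {
    public static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // The same key always lands on the same reducer.
        System.out.println(HashPartitionSketch.getPartition("apple", 4));
    }
}
```

Note that hash partitioning balances load but gives no global order across partitions, which is why total sorting needs a range-based partitioner instead.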
Here is a simple example:
// A custom Partitioner: route each key to a fixed partition
public static class Partition extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        if (key.toString().equals("apple")) {
            return 0;
        }
        if (key.toString().equals("xiaomi")) {
            return 1;
        }
        if (key.toString().equals("huawei")) {
            return 2;
        }
        return 3;
    }
}
And here is a partitioner that additionally preserves a global order across partitions:
// A custom partitioner for total ordering: map each key to an index based on
// its leading letter(s), then split the index range evenly across reducers.
class GlobalSortPartitioner extends Partitioner<Text, LongWritable> implements Configurable {
    private Configuration configuration = null;
    private int indexRange = 0;

    @Override
    public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
        int index = 0;
        if (indexRange == 26) {
            // An index range of 26 means we only need the first letter
            index = text.toString().charAt(0) - 'a';
        } else if (indexRange == 26 * 26) {
            // Here the index is based on the first two letters
            char[] chars = text.toString().toCharArray();
            if (chars.length == 1) {
                index = (chars[0] - 'a') * 26;
            } else {
                index = (chars[0] - 'a') * 26 + (chars[1] - 'a');
            }
        }
        if (indexRange < numPartitions) {
            // More partitions than indexes: the index itself is a valid,
            // order-preserving partition number
            return index;
        }
        int perReducerCount = indexRange / numPartitions;
        for (int i = 0; i < numPartitions; i++) {
            int min = i * perReducerCount;
            int max = (i + 1) * perReducerCount - 1;
            if (index >= min && index <= max) {
                return i;
            }
        }
        // Crude fallback: any leftover indexes go to the last reducer
        return numPartitions - 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.configuration = conf;
        indexRange = configuration.getInt("key.indexRange", 26 * 26);
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }
}
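The index and bucket arithmetic above can be checked without running a Hadoop job. Here is the same range math as a standalone plain-Java sketch (class and method names are illustrative):

```java
// Standalone rewrite (no Hadoop types) of the range math used by the
// global-sort partitioner, so partition boundaries can be tested directly.
public class RangeMathSketch {
    // indexRange = 26: bucket by first letter; 26*26: by first two letters.
    public static int index(String key, int indexRange) {
        char[] chars = key.toCharArray();
        if (indexRange == 26) {
            return chars[0] - 'a';
        }
        if (chars.length == 1) {
            return (chars[0] - 'a') * 26;
        }
        return (chars[0] - 'a') * 26 + (chars[1] - 'a');
    }

    public static int partition(String key, int indexRange, int numPartitions) {
        int idx = index(key, indexRange);
        int perReducerCount = indexRange / numPartitions;
        for (int i = 0; i < numPartitions; i++) {
            if (idx >= i * perReducerCount && idx <= (i + 1) * perReducerCount - 1) {
                return i;
            }
        }
        return numPartitions - 1; // leftover indexes go to the last reducer
    }

    public static void main(String[] args) {
        // With indexRange=26 and 4 reducers, each reducer covers 6 letters
        // and the leftover letters ('y', 'z') fall into the last one.
        System.out.println(partition("apple", 26, 4)); // 0
        System.out.println(partition("zebra", 26, 4)); // 3
    }
}
```

Because partition i holds a contiguous, increasing range of indexes, concatenating the reducer outputs in partition order yields a globally sorted result.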
- Use TotalOrderPartitioner for a total sort
Hadoop also ships a built-in partitioner called TotalOrderPartitioner that solves the total-sort problem. What it does is in fact very similar to the second partitioner shown above: it sends keys to the corresponding partitions according to cut points between keys. The demo below shows it in use:
public class TotalSort {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // User for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("mapred.jar", "D:\\MyDemo\\MapReduce\\Sort\\out\\artifacts\\TotalSort\\TotalSort.jar");
        FileSystem fs = FileSystem.get(conf);
        /* RandomSampler parameters:
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         * @param maxSplitsSampled The maximum number of splits to examine.
         */
        InputSampler.RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 10, 10);
        // Set the partition file; TotalOrderPartitioner requires one
        Path partitionFile = new Path("_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalSort.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // splits each input line on \t by default
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(4); // the partition file is split into as many key ranges as there are reduce tasks
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        FileInputFormat.addInputPath(job, new Path("/test/sort"));
        Path path = new Path("/test/wc/output");
        if (fs.exists(path)) {
            // Delete the output directory if it already exists
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        // Write the randomly sampled keys to the partition file
        InputSampler.writePartitionFile(job, sampler);
        boolean b = job.waitForCompletion(true);
        if (b) {
            System.out.println("OK");
        }
    }
}
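The sampler's job is to pick the (numPartitions - 1) cut points that go into the partition file. The idea can be illustrated with a toy reservoir sampler in plain Java (this mirrors the concept, not Hadoop's exact InputSampler implementation; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Toy illustration of what random sampling buys a total sort:
// draw a small uniform sample of the keys, sort it, and take evenly
// spaced elements as the cut points for the partition file.
public class SamplerSketch {
    public static List<String> samplePartitionKeys(List<String> keys, int numSamples,
                                                   int numPartitions, long seed) {
        Random rnd = new Random(seed);
        List<String> reservoir = new ArrayList<>();
        // Classic reservoir sampling: each key ends up in the sample
        // with equal probability.
        for (int i = 0; i < keys.size(); i++) {
            if (reservoir.size() < numSamples) {
                reservoir.add(keys.get(i));
            } else {
                int j = rnd.nextInt(i + 1);
                if (j < numSamples) {
                    reservoir.set(j, keys.get(i));
                }
            }
        }
        reservoir.sort(null);
        // Evenly spaced split points from the sorted sample become the cut points.
        List<String> cuts = new ArrayList<>();
        for (int p = 1; p < numPartitions; p++) {
            cuts.add(reservoir.get(p * reservoir.size() / numPartitions));
        }
        return cuts;
    }
}
```

Because the cut points reflect the actual key distribution, each reducer receives a roughly equal share of the data, avoiding the skew a fixed alphabet split can cause.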
Auxiliary sort (secondary sort)
Secondary sorting is a high-frequency question in Hadoop interviews, especially for MapReduce. When the data itself has two dimensions, we need to sort by Value as well as by Key.
The principle of secondary sorting
-
The start of the Map phase
In the Map phase, the InputFormat defined via job.setInputFormatClass() divides the input data set into splits and provides a RecordReader implementation. In this case we use TextInputFormat, which takes the byte offset of a line within the file as the Key and the text of the line as the Value; this is why the custom Mapper's input type is <LongWritable, Text>. The <LongWritable, Text> key-value pairs are then passed one by one to the custom Mapper's map() method.
Note: many descriptions treat the key as a line number, but that is not accurate; as "Hadoop: The Definitive Guide" points out, it is the byte offset.
-
The end of the Map phase
At the end of the Map phase, the Mapper's output is first partitioned by the class set with job.setPartitionerClass(), and each partition is mapped to one Reducer. Within each partition, keys are sorted by the comparator set with job.setSortComparatorClass(); this is where the secondary sort itself happens. If no comparator is set with job.setSortComparatorClass(), the Key's own compareTo() method is used.
-
Reduce phase
In the Reduce phase, once all the map outputs mapped to this Reducer have arrived, the comparator set with job.setSortComparatorClass() is again used to sort all the data. A Value iterator is then built for each Key. Grouping happens here: the grouping comparator is set with job.setGroupingComparatorClass(). Any two Keys that this comparator considers equal belong to the same group; their Values go into one Value iterator, and that iterator's Key is the first Key of the group. Finally the custom Reducer's reduce() method is called with each Key and its Value iterator; note that the input and output types must match those declared in the custom Reducer.
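The sort-then-group behaviour described above can be mimicked in plain Java, with no Hadoop involved: sort (left, right) pairs on both fields, then group consecutive pairs sharing the same left field. Within each group the right values come out already sorted, which is exactly what reduce() observes. (A toy sketch of the concept, not Hadoop's implementation; names are illustrative.)

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java mimic of the secondary-sort pipeline: sort composite keys by
// both fields, then group consecutive entries that share the first field.
public class SortGroupSketch {
    public static Map<Integer, List<Integer>> sortAndGroup(List<int[]> pairs) {
        // "setSortComparatorClass" step: order by left field, then right field.
        pairs.sort(Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]));
        // "setGroupingComparatorClass" step: adjacent pairs with the same
        // left field fall into one group, right values already in order.
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (int[] p : pairs) {
            groups.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        return groups;
    }
}
```

Running this on { (3,9), (1,5), (3,2), (1,1) } yields group 1 → [1, 5] and group 3 → [2, 9]: each "reducer call" sees its values pre-sorted.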
Implementation steps of the secondary sort:
- Customize the key. Assume the data set is two-dimensional. Here we build an IntPair type to represent the composite key.
package com.xjh.sort_twice;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Custom key for sorting.
 * In MR, all keys need to be compared and sorted, and twice over: first by the partitioner,
 * then by size within the partition. In this case we also compare twice:
 * first sort by the first field, then sort entries with the same first field by the second field.
 * To do this we build a composite class IntPair with two fields: the partitioner sorts on the
 * first field, and the in-partition comparison sorts on the second.
 * @author xjh
 */
// The custom IntPair class implements WritableComparable
public class IntPair implements WritableComparable<IntPair> {
    int left;
    int right;

    public void set(int left, int right) {
        this.left = left;
        this.right = right;
    }

    public int getLeft() {
        return left;
    }

    public int getRight() {
        return right;
    }

    // Deserialization: read binary from the stream and convert it to an IntPair
    @Override
    public void readFields(DataInput in) throws IOException {
        this.left = in.readInt();
        this.right = in.readInt();
    }

    // Serialization: convert the IntPair to binary output
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(left);
        out.writeInt(right);
    }

    /*
     * Why override equals()?
     * Object's default equals() compares references: two objects are equal only if they point
     * to the same memory address. To compare by the values held in the object, we must
     * override equals().
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == null)
            return false;
        if (this == obj)
            return true;
        if (obj instanceof IntPair) {
            IntPair r = (IntPair) obj;
            return r.left == left && r.right == right;
        } else {
            return false;
        }
    }

    /*
     * Why must hashCode() be overridden along with equals()?
     * The contract requires that if two objects are equal according to equals(), calling
     * hashCode() on each must produce the same integer result. So we override hashCode()
     * to keep it consistent with equals().
     */
    @Override
    public int hashCode() {
        return left * 157 + right;
    }

    // Key comparison
    @Override
    public int compareTo(IntPair o) {
        if (left != o.left)
            return left < o.left ? -1 : 1;
        else if (right != o.right)
            return right < o.right ? -1 : 1;
        else
            return 0;
    }
}
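The write()/readFields() pair above can be exercised without Hadoop, since everything except the WritableComparable interface is plain java.io. A standalone mimic (hypothetical class name) round-trips a pair through a byte stream:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hadoop-free mimic of IntPair's serialization: write() and readFields()
// use only java.io, so the round trip can be checked in isolation.
public class IntPairSketch {
    int left;
    int right;

    void write(DataOutputStream out) throws IOException {
        out.writeInt(left);
        out.writeInt(right);
    }

    void readFields(DataInputStream in) throws IOException {
        left = in.readInt();
        right = in.readInt();
    }

    // Serialize a pair to bytes and deserialize it back.
    public static IntPairSketch roundTrip(int left, int right) {
        try {
            IntPairSketch original = new IntPairSketch();
            original.left = left;
            original.right = right;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            IntPairSketch copy = new IntPairSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The fixed 8-byte layout (two big-endian ints) is also what makes the byte-level grouping comparator further below possible: the first 4 bytes are always the left field.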
- Custom partitioner. The custom partitioner class (MyPartitioner below) performs the first comparison on the composite key, completing the sort over all keys at the partition level.
public static class MyPartitioner extends Partitioner<IntPair, IntWritable> {
    @Override
    public int getPartition(IntPair key, IntWritable value, int numOfReduce) {
        // Note: Math.abs can still return a negative value if the
        // multiplication overflows to Integer.MIN_VALUE
        return Math.abs(key.getLeft() * 127) % numOfReduce;
    }
}
And in the main() function, tell the Job about it: job.setPartitionerClass(MyPartitioner.class);
- Customize the SortComparator to sort IntPair by its first and then its second field. Here this is already achieved by the compareTo() method implemented in the IntPair class.
- Customize the GroupingComparator class to group the data within a partition.
/**
 * When grouping, compare only the original key (the left field), not the whole composite key.
 */
public static class MyGroupParator implements RawComparator<IntPair> {
    @Override
    public int compare(IntPair o1, IntPair o2) {
        int l = o1.getLeft();
        int r = o2.getLeft();
        return l == r ? 0 : (l < r ? -1 : 1);
    }

    // Byte-level comparison: compare only the first 4 bytes of each serialized
    // key (the left int), instead of comparing byte by byte to the end.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8);
    }
}
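Putting the steps together, the driver wiring for the secondary sort looks roughly like this. A configuration sketch only: SecondarySort, SecondarySortMapper and SecondarySortReducer are hypothetical names for classes not shown in this post, while MyPartitioner, MyGroupParator and IntPair are the ones defined above.

```java
// Sketch of the secondary-sort driver configuration (class names for the
// driver, mapper and reducer are hypothetical).
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondarySortMapper.class);         // emits <IntPair, IntWritable>
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setPartitionerClass(MyPartitioner.class);          // step 2: partition by the first field
job.setGroupingComparatorClass(MyGroupParator.class);  // step 4: group by the first field
job.setReducerClass(SecondarySortReducer.class);
```

No setSortComparatorClass() call is needed here because IntPair's own compareTo() already implements the two-level ordering (step 3).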
References:
1. MapReduce secondary sort
Copyright notice
This article was written by [Lantern dream in southern China]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204221414011177.html