MapReduce core and foundation demo
2022-04-23 10:12:00 【zhaojiew】
MapReduce Core and basic routines
MapReduce is a programming framework for distributed computing programs. It is the core framework for developing data-analysis applications on top of Hadoop.
The core function of MapReduce is to combine the business logic code written by the user with its built-in default components into a complete distributed computing program that runs concurrently on a Hadoop cluster.
Advantages: easy to program, fault tolerant, scalable, and well suited to offline processing of massive data sets.
Disadvantages: not good at real-time computing (returning results within milliseconds or seconds), not good at streaming computing (the input data set must be static), and not good at DAG (directed acyclic graph) computing, where several jobs depend on each other and the input of one job is the output of the previous one. Because every MapReduce job writes its output to disk, chaining jobs causes heavy disk I/O and very poor performance.
Core idea
- A distributed computing program usually needs to be divided into at least two stages.
- The concurrent MapTask instances of the first stage run fully in parallel and are independent of each other.
- The concurrent ReduceTask instances of the second stage are also independent of each other, but their input depends on the output of all the MapTask instances of the previous stage.
- The MapReduce programming model contains only one Map stage and one Reduce stage. If the user's business logic is very complex, the only option is to run multiple MapReduce programs serially (which is inefficient).
MapReduce processes
- MrAppMaster: responsible for scheduling and state coordination of the whole job.
- MapTask: responsible for the entire data-processing flow of the Map stage.
- ReduceTask: responsible for the entire data-processing flow of the Reduce stage.
Common data serialization types

| Java type | Hadoop Writable type |
| --- | --- |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| null | NullWritable |
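These wrappers are what actually travel between map and reduce. A minimal sketch, not from the original article, of wrapping and unwrapping the two most common types (the class name WritableDemo is just an illustration):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Java int <-> IntWritable
        IntWritable count = new IntWritable();
        count.set(42);               // wrap a Java int
        int plain = count.get();     // unwrap back to a Java int

        // Java String <-> Text
        Text word = new Text();
        word.set("hadoop");          // wrap a Java String
        String s = word.toString();  // unwrap back to a Java String

        System.out.println(plain + " " + s);
    }
}
```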
Wordcount example and programming conventions
General idea: count the occurrences of each word in the input files. The mapper emits a <word, 1> pair for every word, and the reducer sums the counts for each word.
Maven dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
```
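Since slf4j-log4j12 is on the classpath, a log4j configuration keeps console output readable. The original article does not show one; a minimal log4j.properties placed under src/main/resources might look like:

```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
```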
An MR program consists of three parts: Mapper, Reducer, and Driver.
mapper
The map() method (run inside the MapTask process) is called once for each input <K,V> pair.
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into words and emit <word, 1> for each
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }
}
```
reducer
The reduce() method (run inside the ReduceTask process) is called once per key, with all <k,v> pairs that share that key grouped together.
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word
        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
```
driver
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Create the configuration and get the job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Associate the driver class
        job.setJarByClass(WordCountDriver.class);
        // Associate the mapper and reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        // Set the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the final (reducer) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Local paths, for running directly in the IDE:
        // FileInputFormat.setInputPaths(job, new Path("src/main/resources/input"));
        // FileOutputFormat.setOutputPath(job, new Path("src/main/resources/output"));
        // Cluster paths, taken from the command line when submitting the packaged jar:
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
A local test can be run directly from the IDE (using the local paths shown in the driver).
To test on the cluster, package the jar first and run it from a shell on the Hadoop cluster:
hadoop jar wc.jar com.zhaojie.wordcount.WordCountDriver /pythonzen.txt /output
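Once the job finishes, the result can be inspected from the HDFS command line (assuming the default reducer output file name part-r-00000):

```shell
hadoop fs -cat /output/part-r-00000
```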
Serialization
Java's built-in serialization (Serializable) is a heavyweight framework: after an object is serialized, a lot of extra information is attached (various checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transmit over the network. Hadoop therefore provides its own serialization mechanism (Writable).
Characteristics: compact (efficient use of storage space), fast (little overhead when reading and writing data), and interoperable (supports interaction across multiple languages).
Steps:
- Create a class that implements the Writable interface.
- Deserialization uses reflection to call the no-argument constructor, so the class must have a no-argument constructor.
- Override the serialization method write().
- Override the deserialization method readFields().
- Note that fields must be deserialized in exactly the same order in which they were serialized.
- To display the results in a file, override toString(); separating fields with "\t" is convenient for later processing.
- If the custom bean needs to be transmitted as a key, it must also implement the Comparable interface (one can implement the WritableComparable interface directly, which itself extends Writable), because the Shuffle phase of the MapReduce framework requires keys to be sortable.

For a FlowBean with upFlow, downFlow, and sumFlow fields, the key overridden methods look like this:
```java
@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(upFlow);
    dataOutput.writeLong(downFlow);
    dataOutput.writeLong(sumFlow);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    // Read the fields in the same order they were written
    this.upFlow = dataInput.readLong();
    this.downFlow = dataInput.readLong();
    this.sumFlow = dataInput.readLong();
}

@Override
public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
```
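The fragment above omits the fields, the required no-argument constructor, and the getters and setters that the mapper and reducer below rely on (setUpFlow, setDownFlow, setSumFlow, getUpFlow, getDownFlow). A possible complete FlowBean, assuming three long fields and the convention sumFlow = upFlow + downFlow, is sketched here as an illustration, not as the article's original class:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Sketch of a complete FlowBean; field and method names follow the
// fragments above and the mapper/reducer below.
public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-argument constructor, required by reflection during deserialization
    public FlowBean() {
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read in exactly the same order as write()
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```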
mapper and reducer
As the generic parameters show, the mapper and reducer here no longer emit and receive simple data types but objects of a class that implements the Writable interface; the MR framework serializes and deserializes them automatically.
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] items = line.split("\t");
        // Sample line: 0 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
        String phone = items[1];
        String upflow = items[items.length - 3];
        String downflow = items[items.length - 2];
        outV.setUpFlow(Long.parseLong(upflow));
        outV.setDownFlow(Long.parseLong(downflow));
        outV.setSumFlow();
        outK.set(phone);
        context.write(outK, outV);
    }
}
```
```java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    long upflowsum;
    long downflowsum;
    FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate up/down traffic for this phone number
        upflowsum = 0;
        downflowsum = 0;
        for (FlowBean value : values) {
            upflowsum += value.getUpFlow();
            downflowsum += value.getDownFlow();
        }
        outV.setUpFlow(upflowsum);
        outV.setDownFlow(downflowsum);
        outV.setSumFlow();
        context.write(key, outV);
    }
}
```
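The article stops at the mapper and reducer; a driver for this flow-statistics job would follow the same pattern as WordCountDriver. A possible sketch (the class name FlowDriver and the use of command-line paths are assumptions):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);

        // Both the map output value and the final output value are FlowBean
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```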
Copyright notice
This article was written by [zhaojiew]. Please include a link to the original when reposting.
https://yzsam.com/2022/04/202204230949431129.html