MapReduce Core Concepts and Basic Demo
2022-04-23 09:50:00 [zhaojiew]
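MapReduce is a programming framework for distributed computing programs and the core framework for building "Hadoop-based data analysis applications".
The core function of MapReduce is to combine the business-logic code written by the user with its own built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.
Advantages: easy to program, fault tolerant, scalable, and suited to offline processing of massive data sets.
Disadvantages: not good at real-time computation (returning results within milliseconds or seconds); not good at stream processing (its input data set is static); not good at DAG (directed acyclic graph) computation, where several applications depend on each other and each one's input is the previous one's output. Because every MapReduce job writes its output to disk, such chains cause heavy disk I/O and very poor performance.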
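Core idea
- A distributed computation usually has to be split into at least two phases.
- In the first phase, the concurrent MapTask instances run fully in parallel and are independent of one another.
- In the second phase, the concurrent ReduceTask instances are also independent of one another, but their input depends on the output of all MapTask instances from the previous phase.
- A MapReduce program can contain only one Map phase and one Reduce phase. If the business logic is very complex, the only option is to run several MapReduce programs serially (which is inefficient).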
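To make the two-phase dependency concrete, here is a single-JVM analogy in plain Java (no Hadoop involved; the class and method names are hypothetical). Each "map task" runs on its own thread and only sees its own split; the "reduce" step starts only after the output of every map task has been collected. It sketches the idea, not how Hadoop actually schedules tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPhaseSketch {
    // "MapTask": works only on its own split, independent of every other task.
    static List<Map.Entry<String, Integer>> mapTask(List<String> split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String line : split) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    out.add(Map.entry(word, 1));
                }
            }
        }
        return out;
    }

    static Map<String, Integer> run(List<List<String>> splits) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, splits.size()));
        try {
            // Phase 1: all map tasks run concurrently.
            List<Future<List<Map.Entry<String, Integer>>>> futures = new ArrayList<>();
            for (List<String> split : splits) {
                futures.add(pool.submit(() -> mapTask(split)));
            }
            // Barrier: collect the output of EVERY map task before reducing.
            List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
            for (Future<List<Map.Entry<String, Integer>>> f : futures) {
                mapOutput.addAll(f.get());
            }
            // Phase 2: "reduce" sums the counts per word (TreeMap keeps keys sorted).
            Map<String, Integer> counts = new TreeMap<>();
            for (Map.Entry<String, Integer> kv : mapOutput) {
                counts.merge(kv.getKey(), kv.getValue(), Integer::sum);
            }
            return counts;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> splits = List.of(
                List.of("hello world", "hello hadoop"),
                List.of("hadoop mapreduce"));
        System.out.println(run(splits)); // {hadoop=2, hello=2, mapreduce=1, world=1}
    }
}
```

The explicit barrier is the point of the sketch: no key can be reduced until every map task has finished, because any of them might still emit that key.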
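MapReduce processes
- MrAppMaster: responsible for scheduling the whole program and coordinating its state.
- MapTask: responsible for the entire data-processing flow of the Map phase.
- ReduceTask: responsible for the entire data-processing flow of the Reduce phase.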
Data serialization type mapping
| Java type | Hadoop Writable type |
|---|---|
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| null | NullWritable |
WordCount example and programming conventions
Overall approach
Maven dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
```
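An MR program is written in three parts: Mapper, Reducer and Driver.
Mapper
map() (running in the MapTask process) is called once for every input <K,V> pair. With the default TextInputFormat, K is the byte offset of a line and V is the line's text.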
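```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: <byte offset of line, line text>; output: <word, 1>
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across map() calls to avoid allocating objects per record
    private final Text outK = new Text();
    private final IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            if (word.isEmpty()) continue; // consecutive spaces would otherwise yield "" as a word
            outK.set(word);
            context.write(outK, outV);
        }
    }
}
```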
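With the default TextInputFormat, the LongWritable key passed to map() is the byte offset at which the line starts in the file. The plain-Java sketch below (hypothetical helper name; it ignores "\r\n" line endings and input-split boundaries) lists the <offset, line> records that map() would be called with for a small file, one call per record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LineRecordSketch {
    // Hypothetical helper: turn file contents into <offset, line> records,
    // where the key is the byte offset at which each line starts.
    static List<Map.Entry<Long, String>> toRecords(String text) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;
        int start = 0;
        while (start < text.length()) {
            int end = text.indexOf('\n', start);
            if (end < 0) {
                end = text.length(); // last line without a trailing newline
            }
            String line = text.substring(start, end);
            records.add(Map.entry(offset, line));
            // Advance by the line's UTF-8 byte length plus one byte for '\n'
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
            start = end + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> rec : toRecords("hello world\nhello hadoop\n")) {
            // In Hadoop, each record would trigger one map() call
            System.out.println(rec.getKey() + " -> " + rec.getValue());
        }
        // 0 -> hello world
        // 12 -> hello hadoop
    }
}
```

This is why WordCountMapper can ignore its key: the offset identifies the line's position but carries no information needed for counting words.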
Reducer
The ReduceTask process calls reduce() once for each group of <k,v> pairs that share the same k.
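```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: <word, [1, 1, ...]>; output: <word, total count>
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0; // reset for every key group
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
```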
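Between the two phases, the framework sorts the map output by key and groups the values, so reduce() runs once per distinct key. The plain-Java sketch below imitates that grouping with a TreeMap (a simplification chosen only for illustration; Hadoop performs a sort-based shuffle across the network) and then applies the same summing logic as WordCountReduce. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ShuffleGroupSketch {
    // Stand-in for the shuffle: group values by key, with keys kept in sorted order.
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutput) {
        SortedMap<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return groups;
    }

    // Same logic as WordCountReduce.reduce(): sum the values of one key group.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("hello", 1), Map.entry("world", 1), Map.entry("hello", 1));
        int calls = 0;
        for (Map.Entry<String, List<Integer>> group : shuffle(mapOutput).entrySet()) {
            calls++; // one "reduce() call" per distinct key
            System.out.println(group.getKey() + "\t" + reduce(group.getKey(), group.getValue()));
        }
        System.out.println("reduce() calls: " + calls); // 2
    }
}
```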
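Driver
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Create the configuration and obtain a Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Associate the Driver class (used to locate the jar)
        job.setJarByClass(WordCountDriver.class);
        // Associate the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        // Mapper output <K, V> types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final (Reducer) output <K, V> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Cluster paths come from the command line when given (packaged jar),
        // otherwise fall back to the local test directories.
        // The output directory must not exist yet, or the job fails.
        String input = args.length >= 2 ? args[0] : "src/main/resources/input";
        String output = args.length >= 2 ? args[1] : "src/main/resources/output";
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Submit the job and wait for it to finish, printing progress
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```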
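For local testing, simply run the Driver directly.
To test on the cluster, first package the jar, then run it from a shell on the Hadoop cluster:
```shell
hadoop jar wc.jar com.zhaojie.wordcount.WordCountDriver /pythonzen.txt /output
```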
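Serialization
Java's built-in serialization (Serializable) is a heavyweight framework: a serialized object carries a lot of extra information (various checksums, a header, the inheritance hierarchy, etc.), which makes it inefficient to transmit over the network. Hadoop therefore developed its own serialization mechanism, Writable.
Features: compact (uses storage space efficiently); fast (little extra overhead when reading and writing data); interoperable (supports interaction across languages).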
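The size difference is easy to observe without Hadoop. This sketch (class names hypothetical) serializes the same three long fields once with ObjectOutputStream and once Writable-style through java.io.DataOutput, which is the interface that Writable.write() receives.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSizeSketch {
    // A plain bean using Java's built-in (heavyweight) serialization.
    static class JavaFlow implements Serializable {
        private static final long serialVersionUID = 1L;
        long upFlow, downFlow, sumFlow;

        JavaFlow(long up, long down) {
            upFlow = up;
            downFlow = down;
            sumFlow = up + down;
        }
    }

    static int javaSerializedSize(JavaFlow f) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f); // also writes stream header and class descriptor
        }
        return bytes.size();
    }

    // Writable-style: only the raw field values, nothing else.
    static int dataOutputSize(long up, long down) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(up);
            out.writeLong(down);
            out.writeLong(up + down);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Serializable: " + javaSerializedSize(new JavaFlow(2481, 24681)) + " bytes");
        System.out.println("DataOutput:   " + dataOutputSize(2481, 24681) + " bytes"); // 24
    }
}
```

The DataOutput version is exactly 24 bytes (three 8-byte longs). The Serializable version additionally carries the class descriptor, so its exact size depends on the class name and field names, but it is considerably larger.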
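Steps:
- Create a class that implements the Writable interface.
- Deserialization calls the no-argument constructor via reflection, so the class must have one.
- Override the serialization method write().
- Override the deserialization method readFields().
- Fields must be deserialized in exactly the same order in which they were serialized.
- To show the result in the output file, override toString(); separating fields with "\t" makes later processing easier.
- If the custom bean is transmitted as a key, it must also implement Comparable (or directly implement WritableComparable, which extends both Writable and Comparable), because the Shuffle phase of the MapReduce framework requires keys to be sortable.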
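```java
// Methods inside FlowBean (implements Writable), which holds long fields upFlow, downFlow, sumFlow

// Serialization: write the fields in a fixed order
@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(upFlow);
    dataOutput.writeLong(downFlow);
    dataOutput.writeLong(sumFlow);
}

// Deserialization: read the fields back in exactly the same order
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.upFlow = dataInput.readLong();
    this.downFlow = dataInput.readLong();
    this.sumFlow = dataInput.readLong();
}

// Determines how the bean appears in the output file
@Override
public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
```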
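The fragment above omits the rest of the class. Below is one possible way to complete FlowBean following the steps listed earlier, with a serialize/deserialize round trip in main(). To keep it compilable without Hadoop on the classpath, it does not declare `implements Writable`; in a real job you would add `implements Writable` (or `WritableComparable<FlowBean>` when the bean is a key) and put `@Override` on write/readFields. The getters and setters match the calls made by FlowMapper and FlowReduce in this post; the compareTo() ordering (descending total flow) is a hypothetical example of the key-sorting requirement.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// In Hadoop: public class FlowBean implements Writable (or WritableComparable<FlowBean>)
public class FlowBean implements Comparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Required: the framework instantiates the bean reflectively via this constructor.
    public FlowBean() {}

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    // No-arg variant: derive the total from the two parts.
    public void setSumFlow() { this.sumFlow = upFlow + downFlow; }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    public void readFields(DataInput in) throws IOException {
        // Same order as write()
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Hypothetical key ordering: larger total flow sorts first.
    @Override
    public int compareTo(FlowBean other) {
        return Long.compare(other.sumFlow, this.sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean();
        original.setUpFlow(2481);
        original.setDownFlow(24681);
        original.setSumFlow();

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        original.write(out);
        out.flush();

        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy); // 2481, 24681, 27162 separated by tabs
    }
}
```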
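Mapper and Reducer
As the generic type parameters show, the Mapper and Reducer now emit and receive objects that implement the Writable interface rather than simple data types; the MR framework serializes and deserializes them automatically.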
```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: <offset, log line>; output: <phone number, FlowBean>
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private final Text outK = new Text();
    private final FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Tab-separated fields, e.g.
        // 0 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
        String[] items = line.split("\t");
        String phone = items[1];
        // Up/down flow are indexed from the end of the row
        String upflow = items[items.length - 3];
        String downflow = items[items.length - 2];
        outV.setUpFlow(Long.parseLong(upflow));
        outV.setDownFlow(Long.parseLong(downflow));
        outV.setSumFlow();
        outK.set(phone);
        context.write(outK, outV);
    }
}
```
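FlowMapper reads the up/down flow by counting from the end of the row rather than by a fixed column index. Presumably this is because some rows lack the domain column (an assumption; the post does not say). The plain-Java sketch below isolates that parsing logic (class name hypothetical) and runs it on the sample row from the comment, with tabs as separators, plus the same row with the domain column removed.

```java
import java.util.Arrays;

public class FlowLineParseSketch {
    // Same column logic as FlowMapper.map(): the phone number is column 1.
    static String phone(String line) {
        return line.split("\t")[1];
    }

    // Up/down flow are the 3rd- and 2nd-to-last columns; returns {up, down, sum}.
    static long[] parse(String line) {
        String[] items = line.split("\t");
        long up = Long.parseLong(items[items.length - 3]);
        long down = Long.parseLong(items[items.length - 2]);
        return new long[] {up, down, up + down};
    }

    public static void main(String[] args) {
        // Sample row from the Mapper comment
        String withDomain = "0\t13736230513\t192.196.100.1\twww.atguigu.com\t2481\t24681\t200";
        // The same row with the domain column removed (constructed for illustration)
        String withoutDomain = "0\t13736230513\t192.196.100.1\t2481\t24681\t200";
        for (String line : new String[] {withDomain, withoutDomain}) {
            System.out.println(phone(line) + " " + Arrays.toString(parse(line)));
        }
        // both print: 13736230513 [2481, 24681, 27162]
    }
}
```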
```java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: <phone number, [FlowBean, ...]>; output: <phone number, total FlowBean>
public class FlowReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    private final FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upflowsum = 0;
        long downflowsum = 0;
        // Accumulate the up/down flow of every record for this phone number
        for (FlowBean value : values) {
            upflowsum += value.getUpFlow();
            downflowsum += value.getDownFlow();
        }
        outV.setUpFlow(upflowsum);
        outV.setDownFlow(downflowsum);
        outV.setSumFlow(); // total = up + down
        context.write(key, outV);
    }
}
```
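For each phone number, FlowReduce adds up the up and down flow of all grouped records and then derives the total. The sketch below expresses the same computation with plain long arrays as a hypothetical stand-in for FlowBean, so it runs without Hadoop.

```java
import java.util.Arrays;
import java.util.List;

public class FlowAggregateSketch {
    // What FlowReduce computes for one key; each value is {upFlow, downFlow}.
    static long[] reduce(Iterable<long[]> values) {
        long up = 0;
        long down = 0;
        for (long[] v : values) {
            up += v[0];
            down += v[1];
        }
        return new long[] {up, down, up + down};
    }

    public static void main(String[] args) {
        // Hypothetical grouped input: two records for the same phone number.
        List<long[]> values = List.of(new long[] {2481, 24681}, new long[] {100, 200});
        System.out.println(Arrays.toString(reduce(values))); // [2581, 24881, 27462]
    }
}
```

Copying the values into local sums, as FlowReduce does, also matters in real Hadoop code: the framework reuses a single FlowBean instance while iterating over the values, so storing references to the beans in a collection would leave every reference pointing at the last record's data.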
Copyright notice
This article was written by [zhaojiew]. Please include a link to the original when reposting. Thanks.
https://blog.csdn.net/sinat_41567654/article/details/124278203