MapReduce Core Concepts and a Basic Demo
2022-04-23 09:50:00 【zhaojiew】
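MapReduce core concepts and basic examples
MapReduce is a programming framework for distributed computation and the core framework for developing Hadoop-based data analysis applications.
Its core function is to combine the business logic written by the user with its own default components into a complete distributed program that runs in parallel on a Hadoop cluster.
Advantages: easy to program, fault tolerant, scalable, and well suited to offline processing of massive data sets.
Disadvantages:
- Not suited to real-time computation: it cannot return results within milliseconds or seconds.
- Not suited to stream processing: the input data set of a MapReduce job must be static.
- Not suited to DAG (directed acyclic graph) computation, where several applications depend on each other and each one takes the output of the previous one as its input. Such a chain can be run, but every MapReduce job writes its output to disk, which causes heavy disk I/O and very poor performance.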
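Core idea
- A distributed program usually has to be split into at least two phases.
- The concurrent MapTask instances of the first phase run fully in parallel and are independent of each other.
- The concurrent ReduceTask instances of the second phase are also independent of each other, but their input depends on the output of all MapTask instances of the first phase.
- A MapReduce job can contain only one Map phase and one Reduce phase. If the business logic is too complex for that, it has to be written as several MapReduce programs that run one after another, which is less efficient.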
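The two phases can be illustrated without a cluster. The following is a minimal in-memory sketch in plain Java, not Hadoop code: the class name `MapReduceSketch` and its methods are made up for illustration, and the "shuffle" is just a sorted map that groups the map output by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of the two-phase model; no Hadoop classes are involved.
class MapReduceSketch {

    // Map phase: each input line is handled independently and emits (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // Reduce phase: called once per key with all values collected for that key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Stand-in for the shuffle: group all map output by key, sorted by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // The reduce phase can only start once every map output has been grouped.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a b a", "b c"))); // prints {a=2, b=2, c=1}
    }
}
```

Each call to `map` touches only its own line, which is why the MapTask instances can run in parallel, while `reduce` needs the grouped output of every `map` call before it can start.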
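MapReduce processes
- MRAppMaster: schedules the whole program and coordinates its state.
- MapTask: handles the entire data processing flow of the Map phase.
- ReduceTask: handles the entire data processing flow of the Reduce phase.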
Java types and their Hadoop serialization types
Java type | Hadoop Writable type |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
WordCount example and coding conventions
Overall approach
Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
An MR program is written in three parts: Mapper, Reducer and Driver.
mapper
The map() method (running in the MapTask process) is called once for every input <K,V> pair.
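import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: <byte offset of the line, line text>; output: <word, 1>
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused for every record to avoid allocating new objects per word
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // One line of input, split into words on single spaces
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }
}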
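One detail of the mapper is worth checking: `split(" ")` treats every single space as a separator, so a line with two consecutive spaces produces an empty string, which would then be counted as a word. The sketch below compares it with splitting on runs of whitespace. It is plain Java, and the class and method names (`TokenizeSketch`, `splitOnSingleSpace`, `splitOnWhitespace`) are made up for illustration.

```java
import java.util.Arrays;

// Compares two ways of tokenizing a line; no Hadoop classes are involved.
class TokenizeSketch {

    // Same call as in the mapper: every single space is a separator.
    static String[] splitOnSingleSpace(String line) {
        return line.split(" ");
    }

    // Alternative: trim first, then split on runs of whitespace.
    static String[] splitOnWhitespace(String line) {
        String trimmed = line.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    public static void main(String[] args) {
        String line = "hello  world"; // two spaces between the words
        System.out.println(Arrays.toString(splitOnSingleSpace(line))); // prints [hello, , world]
        System.out.println(Arrays.toString(splitOnWhitespace(line)));  // prints [hello, world]
    }
}
```

Whether this matters depends on the input file; for text with irregular spacing or tabs, the second variant is probably the safer choice.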
reducer
The ReduceTask process calls the reduce() method once for each group of <k,v> pairs that share the same k.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: <word, [1, 1, ...]>; output: <word, total count>
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts emitted for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
driver
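import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Create the configuration and get the job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Tell Hadoop which jar contains the driver
        job.setJarByClass(WordCountDriver.class);
        // Register the mapper and the reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        // Key and value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Key and value types of the final (reducer) output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Cluster run: paths come from the command line; local run: fall back to the local test paths.
        // (The original set both pairs in a row, so args[0] failed when run locally without arguments.)
        String input = args.length >= 2 ? args[0] : "src/main/resources/input";
        String output = args.length >= 2 ? args[1] : "src/main/resources/output";
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}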
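For a local test, simply run the driver directly. The output directory must not exist yet, otherwise the job fails.
To test on a cluster, package the project as a jar first, then run it from a shell on the Hadoop cluster: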
hadoop jar wc.jar com.zhaojie.wordcount.WordCountDriver /pythonzen.txt /output
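Serialization
Java's built-in serialization (Serializable) is heavyweight: a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy and so on), which makes it inefficient to send over the network. Hadoop therefore developed its own serialization mechanism (Writable).
Features: compact (efficient use of storage space); fast (low overhead when reading and writing data); interoperable (supports interaction between multiple languages).
Steps:
- Create a class that implements the Writable interface.
- Provide a no-argument constructor, because deserialization calls it through reflection.
- Override the serialization method, write().
- Override the deserialization method, readFields().
- Make sure the fields are deserialized in exactly the same order in which they were serialized.
- To show the results in the output file, override toString(). Separating the fields with "\t" makes the output easy to process later.
- If the custom bean is to be transmitted as a key, it must also implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires keys to be sortable. The bean can implement WritableComparable directly, an interface that extends both Writable and Comparable.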
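    // Methods of the FlowBean class (fields upFlow, downFlow and sumFlow are of type long)

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // Deserialization: read the fields in the same order as they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // Format used for each line of the output file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }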
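The steps above can be tried out without a cluster, because `DataOutput` and `DataInput` are plain `java.io` interfaces. The following is a minimal sketch under a few assumptions: `FlowBeanSketch` is a made-up stand-in that has the same method shapes as a Writable bean but does not implement Hadoop's `Writable`, the `roundTrip` helper is hypothetical, and the descending order in `compareTo` is only one possible choice for a bean used as a key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for a Writable bean: same method shapes, but no Hadoop interface is implemented.
class FlowBeanSketch implements Comparable<FlowBeanSketch> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // No-argument constructor: needed so that an empty object can be created before readFields().
    FlowBeanSketch() {
    }

    FlowBeanSketch(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Serialization: three longs, 24 bytes in total, written in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Only needed when the bean is used as a key; descending by total flow is an assumed ordering.
    @Override
    public int compareTo(FlowBeanSketch other) {
        return Long.compare(other.sumFlow, this.sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Hypothetical helper: serialize to a byte array and read it back into a new object.
    static FlowBeanSketch roundTrip(FlowBeanSketch bean) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            bean.write(new DataOutputStream(buffer));
            FlowBeanSketch copy = new FlowBeanSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints the three fields separated by tabs: 2481, 24681, 27162
        System.out.println(roundTrip(new FlowBeanSketch(2481, 24681)));
    }
}
```

Swapping two of the `readLong()` calls in `readFields` would still run without error but silently mix up the fields, which is why the order rule in the steps matters.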
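mapper and reducer
As the generic type parameters show, the mapper and reducer no longer emit and receive only simple data types. The values are now objects that implement the Writable interface, and the MR framework serializes and deserializes them automatically.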
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: <byte offset of the line, line text>; output: <phone number, FlowBean>
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] items = line.split("\t");
        // Sample line: 0 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
        String phone = items[1];
        // Upstream and downstream flow are located by counting from the end of the line
        String upflow = items[items.length - 3];
        String downflow = items[items.length - 2];
        outV.setUpFlow(Long.parseLong(upflow));
        outV.setDownFlow(Long.parseLong(downflow));
        outV.setSumFlow();
        outK.set(phone);
        context.write(outK, outV);
    }
}
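The mapper reads the two flow fields by counting from the end of the line (`items.length - 3` and `items.length - 2`) rather than from the start. The source does not say why; a plausible reason is that some records lack a field in the middle, such as the domain name. The sketch below shows the effect in plain Java. The class `FlowLineParseSketch` and the second record are made up for illustration.

```java
// Extracts phone number, upstream flow and downstream flow from one tab-separated record.
class FlowLineParseSketch {

    // Returns {phone, upFlow, downFlow}; the flow fields are indexed from the end of the line.
    static String[] parse(String line) {
        String[] items = line.split("\t");
        return new String[] {items[1], items[items.length - 3], items[items.length - 2]};
    }

    public static void main(String[] args) {
        // Record with all seven fields, taken from the sample comment in the mapper
        String full = "0\t13736230513\t192.196.100.1\twww.atguigu.com\t2481\t24681\t200";
        // Hypothetical record with the domain field missing (six fields)
        String noDomain = "1\t13846544121\t192.196.100.2\t264\t0\t200";

        System.out.println(String.join(" ", parse(full)));     // prints 13736230513 2481 24681
        System.out.println(String.join(" ", parse(noDomain))); // prints 13846544121 264 0
    }
}
```

With fixed indexes from the start (`items[4]` and `items[5]`), the second record would yield `0` and `200` instead of the flow values.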
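import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: <phone number, [FlowBean, ...]>; output: <phone number, FlowBean with the totals>
public class FlowReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Add up the upstream and downstream flow of all records for this phone number
        long upflowsum = 0;
        long downflowsum = 0;
        for (FlowBean value : values) {
            upflowsum += value.getUpFlow();
            downflowsum += value.getDownFlow();
        }
        outV.setUpFlow(upflowsum);
        outV.setDownFlow(downflowsum);
        // Total flow is recomputed from the two sums
        outV.setSumFlow();
        context.write(key, outV);
    }
}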
Copyright notice
This article was written by [zhaojiew]. Please include a link to the original when reposting. Thank you.
https://blog.csdn.net/sinat_41567654/article/details/124278203