MapReduce Case Study: Sum, Partition, Combiner, and Sort Operations for Traffic Statistics
2022-04-22 13:11:00 [@李思成]
Requirement: summation with partitioning and a combiner
For each phone number, compute the total upstream packets, total downstream packets, total upstream traffic, and total downstream traffic, using a custom partitioner and a combiner.
Analysis: use the phone number as the key, and the four fields (upstream packets, downstream packets, upstream traffic, downstream traffic) as the value. This key/value pair is the map stage's output and the reduce stage's input.
Raw data:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 游戏娱乐 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 jd.com 京东购物 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 taobao.com 淘宝购物 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 cnblogs.com 技术门户 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 19984138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 未知 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 sougou.com 综合门户 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 baidu.com 综合搜索 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 zhilian.com 招聘门户 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 csdn.net 技术门户 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 图片大全 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 http://youku.com/ 视频网站 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 img.qfc.cn 图片浏览 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 www.baidu.com 综合门户 18 138 1080 186852 200
1363157985059 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 www.baidu.com 综合门户 19 128 1177 16852 200
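Expected result (a hand-computed excerpt, not program output): only two phone numbers appear twice in the sample above, so those are the rows the job actually aggregates. Summing their four fields gives the following tab-separated output lines; every other phone appears once and passes through with its original values:

13560439658    33    24    2034    5892
13600217502    37    266    2257    203704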
Step 1: Define FlowBean, the custom value type for the map output
package org.example.mapreduce.Flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private Integer upFlow;        // upstream packet count
    private Integer downFlow;      // downstream packet count
    private Integer upCountFlow;   // total upstream traffic
    private Integer downCountFlow; // total downstream traffic

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
    }

    // Serialization: the field order here must match readFields() below
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }

    // Deserialization: reads the fields in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.upCountFlow = dataInput.readInt();
        this.downCountFlow = dataInput.readInt();
    }
}
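The one invariant Writable demands is that readFields() consumes fields in exactly the order write() emits them. A minimal local round-trip sketch to verify this (RoundTripCheck is a hypothetical helper class, not part of the job):

package org.example.mapreduce.Flow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripCheck {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setUpFlow(18);
        in.setDownFlow(15);
        in.setUpCountFlow(1116);
        in.setDownCountFlow(954);

        // Serialize to a byte buffer, then read back into a fresh bean
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(out); // expected: 18  15  1116  954
    }
}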
Step 2: Define the FlowMapper class
package org.example.mapreduce.Flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // K1: byte offset of the line
        // V1: one line of text; split it on tabs first
        String[] split = value.toString().split("\t");
        String phone = split[1];
        // Create the FlowBean and parse the numeric fields
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(Integer.parseInt(split[6]));
        flowBean.setDownFlow(Integer.parseInt(split[7]));
        flowBean.setUpCountFlow(Integer.parseInt(split[8]));
        flowBean.setDownCountFlow(Integer.parseInt(split[9]));
        // Write K2/V2 to the context
        // K2: phone number
        // V2: flowBean
        context.write(new Text(phone), flowBean);
    }
}
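The indices 1, 6, 7, 8, and 9 assume the tab-separated layout of the raw data above. A standalone sketch to sanity-check them against one sample record (SplitCheck is a hypothetical helper, not part of the job):

package org.example.mapreduce.Flow;

public class SplitCheck {
    public static void main(String[] args) {
        // One record from the sample data, tab-separated as the mapper expects
        String line = "1363157993055\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99"
                + "\tsougou.com\t综合门户\t18\t15\t1116\t954\t200";
        String[] split = line.split("\t");
        System.out.println(split[1]); // 13560439658
        System.out.println(split[6] + " " + split[7] + " " + split[8] + " " + split[9]); // 18 15 1116 954
    }
}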
Step 3: Define the custom partitioner FlowPartition
package org.example.mapreduce.Flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartition extends Partitioner<Text, FlowBean> {
    /* text: K2, the phone number; flowBean: V2; i: number of ReduceTasks */
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // Get the phone number
        String phone = text.toString();
        // Return a partition number based on the phone's prefix
        if (phone.startsWith("135")) return 0;
        else if (phone.startsWith("136")) return 1;
        else if (phone.startsWith("137")) return 2;
        else if (phone.startsWith("138")) return 3;
        else if (phone.startsWith("139")) return 4;
        else return 5;
    }
}
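A quick spot check of the prefix logic against phones from the sample data (PartitionCheck is a hypothetical helper; the value and task-count arguments are ignored by this partitioner, so null is safe here):

package org.example.mapreduce.Flow;

import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        FlowPartition p = new FlowPartition();
        System.out.println(p.getPartition(new Text("13560439658"), null, 6)); // 0 (prefix 135)
        System.out.println(p.getPartition(new Text("13726230503"), null, 6)); // 2 (prefix 137)
        System.out.println(p.getPartition(new Text("18211575961"), null, 6)); // 5 (no matching prefix)
    }
}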
Step 4: Define the custom combiner
Concept: every map task can produce a large amount of local output. The combiner pre-merges the map-side output so that less data travels between the map and reduce nodes, improving network I/O; it is one of MapReduce's optimization techniques.

- A combiner is a component of an MR program alongside the Mapper and Reducer.
- The combiner's parent class is Reducer.
- A combiner differs from the reducer in where it runs:
  - the combiner runs on the node of each map task;
  - the reducer receives the output of all mappers globally.
- The point of the combiner is to locally aggregate each map task's output, shrinking the volume of data sent over the network.
- A combiner may only be used when it cannot affect the final business logic (see the counter-example below), and its output key/value types must match the reducer's input key/value types.
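A classic illustration of the "must not affect the business logic" rule is averaging: sums can safely be pre-aggregated per map task, but an average of partial averages is wrong when the groups are unevenly sized. A minimal sketch in plain Java (arithmetic illustration only, not job code):

public class AvgCounterExample {
    public static void main(String[] args) {
        // True average of 1, 2, 3, 4 is 2.5.
        // Suppose a combiner averaged per map task over splits {1,2,3} and {4}:
        double mapSide1 = (1 + 2 + 3) / 3.0; // 2.0
        double mapSide2 = 4 / 1.0;           // 4.0
        // The reducer then averages the two partial averages:
        System.out.println((mapSide1 + mapSide2) / 2.0); // 3.0, not 2.5
    }
}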
package org.example.mapreduce.Flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FLowCombiner extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        Integer upFlow = 0;
        Integer downFlow = 0;
        Integer upCountFlow = 0;
        Integer downCountFlow = 0;
        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }
        // Create a FlowBean and fill in the partial sums (the new V2)
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        flowBean.setUpCountFlow(upCountFlow);
        flowBean.setDownCountFlow(downCountFlow);
        // Write the new K2/V2 to the context
        context.write(key, flowBean);
    }
}
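Because this combiner's reduce() is identical to FlowReducer's (Step 5), an equally valid option, since summation is associative and commutative and the key/value types already line up, would be to point the driver at the reducer class instead of maintaining a separate combiner:

// In the driver's run() method, instead of a dedicated combiner class:
job.setCombinerClass(FlowReducer.class);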
Step 5: Define the FlowReducer class
package org.example.mapreduce.Flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        Integer upFlow = 0;
        Integer downFlow = 0;
        Integer upCountFlow = 0;
        Integer downCountFlow = 0;
        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }
        // Create a FlowBean and fill in the totals (V3)
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        flowBean.setUpCountFlow(upCountFlow);
        flowBean.setDownCountFlow(downCountFlow);
        // Write K3/V3 to the context
        // K3: phone number
        // V3: flowBean
        context.write(key, flowBean);
    }
}
Step 6: The program's main entry point
package org.example.mapreduce.Flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowJobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // Create the Job object
        Job job = Job.getInstance(super.getConf(), "mapreduce_flowcount");
        // Needed if running from a packaged jar fails without it
        job.setJarByClass(FlowJobMain.class);

        // Configure the job
        // Specify the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///C:\\Myprogram\\IN\\FlowCount"));

        // Specify the map logic and map output types
        job.setMapperClass(FlowMapper.class);
        // K2 type
        job.setMapOutputKeyClass(Text.class);
        // V2 type
        job.setMapOutputValueClass(FlowBean.class);

        // Third (partitioning)
        job.setPartitionerClass(FlowPartition.class);
        job.setNumReduceTasks(6); // must match the number of partitions

        // Fourth (sorting): the default sort on K2 is used
        // Fifth (combining)
        job.setCombinerClass(FLowCombiner.class);
        // Sixth (grouping): the default grouping is used

        // Seventh: specify the reduce logic and output types
        job.setReducerClass(FlowReducer.class);
        // K3 type
        job.setOutputKeyClass(Text.class);
        // V3 type
        job.setOutputValueClass(FlowBean.class);

        // Eighth: specify the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the output path
        TextOutputFormat.setOutputPath(job, new Path("file:///C:\\Myprogram\\OUT\\FlowCount_out0111"));

        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FlowJobMain(), args);
        System.exit(run);
    }
}
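With job.setNumReduceTasks(6), a successful run writes one output file per partition plus a _SUCCESS marker into the output directory. A sketch of the expected layout (the part-r-NNNNN naming is standard Hadoop; per-file contents depend on the input):

FlowCount_out0111/
    _SUCCESS
    part-r-00000    <- phones starting with 135
    part-r-00001    <- 136
    part-r-00002    <- 137
    part-r-00003    <- 138
    part-r-00004    <- 139
    part-r-00005    <- all other prefixes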
All done!
Copyright notice
This article was written by [@李思成]; please include a link to the original when reposting. Thank you.
https://blog.csdn.net/weixin_46560570/article/details/113391016