MapReduce Case: Summation, Partitioning, Combining, and Sorting on Traffic Statistics
2022-04-22 13:15:00 【@Li Sicheng】
Requirement: summation, partitioning, and combining for traffic statistics
Count, for each mobile phone number, the total number of uplink packets, the total number of downlink packets, the total uplink traffic, and the total downlink traffic, and implement partitioning and a Combiner.
Analysis: use the mobile phone number as the key, and the four fields (uplink packets, downlink packets, total uplink traffic, total downlink traffic) as the value. This key/value pair is then the output of the map stage and the input of the reduce stage.
Raw data:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com Entertainment 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 jd.com Jingdong shopping 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 taobao.com Taobao shopping 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 cnblogs.com Technology portal 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com Video website 15 12 1527 2106 200
1363157995074 19984138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 Unknown 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 sougou.com Integrated portal 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn Information security 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 baidu.com Comprehensive search 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com Site statistics 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com Search engine 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com Site statistics 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 zhilian.com Recruitment portal 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 csdn.net Technology portal 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com Integrated portal 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn Picture collection 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com Integrated portal 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com Search engine 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com Search engine 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 http://youku.com/ Video website 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 img.qfc.cn Picture view 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 www.baidu.com Integrated portal 18 138 1080 186852 200
1363157985059 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 www.baidu.com Integrated portal 19 128 1177 16852 200
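To make the expected result concrete: the phone number 13560439658 appears twice above, and (assuming the tab-separated layout shown, with fields indexed from 0) the four traffic fields sit at indices 6 through 9. A correct job should therefore aggregate its two records (18, 15, 1116, 954) and (15, 9, 918, 4938) into:
13560439658	33	24	2034	5892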
Step 1: Define the custom map output value class FlowBean
package org.example.mapreduce.Flow;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
private Integer upFlow; // Number of uplink packets
private Integer downFlow; // Number of downlink packets
private Integer upCountFlow; // Total uplink traffic
private Integer downCountFlow; // Total downlink traffic
public Integer getUpFlow() {
return upFlow;
}
public void setUpFlow(Integer upFlow) {
this.upFlow = upFlow;
}
public Integer getDownFlow() {
return downFlow;
}
public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}
public Integer getUpCountFlow() {
return upCountFlow;
}
public void setUpCountFlow(Integer upCountFlow) {
this.upCountFlow = upCountFlow;
}
public Integer getDownCountFlow() {
return downCountFlow;
}
public void setDownCountFlow(Integer downCountFlow) {
this.downCountFlow = downCountFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
}
// Serialization: write the fields in a fixed order
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(upFlow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(upCountFlow);
dataOutput.writeInt(downCountFlow);
}
// Deserialization: read the fields back in the same order as write()
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
this.upCountFlow = dataInput.readInt();
this.downCountFlow = dataInput.readInt();
}
}
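Before wiring the bean into a job, it is worth sanity-checking that write() and readFields() agree on field order, since a mismatch silently corrupts every record. A minimal round-trip sketch through an in-memory buffer (the class name FlowBeanRoundTrip and the sample values are made up for illustration):
package org.example.mapreduce.Flow;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setUpFlow(18);
        in.setDownFlow(15);
        in.setUpCountFlow(1116);
        in.setDownCountFlow(954);
        // Serialize into an in-memory buffer
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));
        // Deserialize from the same bytes
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(out); // expected: 18	15	1116	954
    }
}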
Step 2: Define the FlowMapper class
package org.example.mapreduce.Flow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//K1: row offset
//V1: one line of text; split it into tab-separated fields first
String[] split = value.toString().split("\t");
String phone = split[1];
// Create a FlowBean object
FlowBean flowBean = new FlowBean();
// Convert the string fields to numbers
flowBean.setUpFlow(Integer.parseInt(split[6]));
flowBean.setDownFlow(Integer.parseInt(split[7]));
flowBean.setUpCountFlow(Integer.parseInt(split[8]));
flowBean.setDownCountFlow(Integer.parseInt(split[9]));
// Write K2 and V2 to the context
//K2: phone number
//V2: flowBean
context.write(new Text(phone),flowBean);
}
}
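One fragile spot in the mapper: split[6] through split[9] will throw ArrayIndexOutOfBoundsException on a blank or short line, failing the whole task. A hedged sketch of a guard that could sit at the top of map() (the counter group and name here are made up for illustration):
String[] split = value.toString().split("\t");
if (split.length < 10) {
    // Count malformed lines instead of crashing the task, then skip them
    context.getCounter("FlowCount", "MALFORMED_LINES").increment(1);
    return;
}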
Step 3: Define the custom partitioner FlowPartition
package org.example.mapreduce.Flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowPartition extends Partitioner<Text,FlowBean> {
/*
text: K2, the phone number
flowBean: V2
i: the number of ReduceTasks
*/
@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
// Get phone number
String phone = text.toString();
// Check the phone number prefix and return the corresponding partition number
if (phone.startsWith("135")) return 0;
else if (phone.startsWith("136")) return 1;
else if (phone.startsWith("137")) return 2;
else if (phone.startsWith("138")) return 3;
else if (phone.startsWith("139")) return 4;
else return 5;
}
}
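Because getPartition can return six distinct values (0 through 5), the driver must set job.setNumReduceTasks(6) to match, as Step 6 does below. As a design note, the if/else chain could also be written as a lookup table; a sketch of an equivalent variant, assuming Java 9+ for Map.of and keys that are phone numbers of at least three characters:
// Table-driven variant (sketch): prefix -> partition number, default 5
private static final java.util.Map<String, Integer> PREFIX_TO_PARTITION =
        java.util.Map.of("135", 0, "136", 1, "137", 2, "138", 3, "139", 4);

@Override
public int getPartition(Text text, FlowBean flowBean, int numReduceTasks) {
    // Assumes the key has at least 3 characters
    String prefix = text.toString().substring(0, 3);
    return PREFIX_TO_PARTITION.getOrDefault(prefix, 5);
}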
Step 4: Define the custom Combiner
Concept: every map task can produce a large amount of local output. The Combiner's job is to merge the map-side output first, reducing the data transferred between the map and reduce nodes and improving network IO performance; it is one of MapReduce's optimizations.
- The Combiner is a component of an MR program besides the Mapper and Reducer.
- The parent class of a Combiner is Reducer.
- The difference between a Combiner and a Reducer is where they run: the Combiner runs on the node of each MapTask, while the Reducer receives the output of all Mappers globally.
- The point of the Combiner is to perform a local summary of each MapTask's output, reducing network traffic.
- The precondition for using a Combiner is that it must not affect the final business logic, and the Combiner's output key/value types must match the Reducer's input key/value types.
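Summation meets this precondition because addition is associative and commutative: pre-summing (1116 + 918) on the map side and adding further values on the reduce side yields the same total as summing everything in one pass. An operation like averaging would not qualify, since the average of per-map averages is generally not the overall average, so it could not be dropped into a Combiner unchanged.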
package org.example.mapreduce.Flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCombiner extends Reducer<Text,FlowBean,Text,FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
Integer upFlow = 0;
Integer downFlow = 0;
Integer upCountFlow = 0;
Integer downCountFlow = 0;
for (FlowBean value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
upCountFlow += value.getUpCountFlow();
downCountFlow += value.getDownCountFlow();
}
// Create a FlowBean object and fill it with the locally aggregated values (the new V2)
FlowBean flowBean = new FlowBean();
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
flowBean.setUpCountFlow(upCountFlow);
flowBean.setDownCountFlow(downCountFlow);
// Write the key and the aggregated FlowBean (the new K2 and V2) to the context
context.write(key,flowBean);
}
}
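Since FlowCombiner and the FlowReducer defined next perform the identical, type-compatible summation, a common simplification is to skip the separate combiner class and register the reducer in both roles. A sketch of the alternative wiring in the driver:
// Alternative wiring (sketch): reuse the reducer as the combiner.
// Valid here because both do the same summation with matching key/value types.
job.setCombinerClass(FlowReducer.class);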
Step 5: Define the FlowReducer class
package org.example.mapreduce.Flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
Integer upFlow = 0;
Integer downFlow = 0;
Integer upCountFlow = 0;
Integer downCountFlow = 0;
for (FlowBean value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
upCountFlow += value.getUpCountFlow();
downCountFlow += value.getDownCountFlow();
}
// Create a FlowBean object and fill it with the aggregated values (V3)
FlowBean flowBean = new FlowBean();
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
flowBean.setUpCountFlow(upCountFlow);
flowBean.setDownCountFlow(downCountFlow);
// Write K3 and V3 to the context
//K3: phone number
//V3: flowBean
context.write(key,flowBean);
}
}
Step 6: Define the program entry point (main)
package org.example.mapreduce.Flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowJobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
// Create a Job object
Job job = Job.getInstance(super.getConf(),"mapreduce_flowcount");
// Needed so the job can locate this class when run from a packaged jar
job.setJarByClass(FlowJobMain.class);
// Configure the job
// First step: specify the input format and input path
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///C:\\Myprogram\\IN\\FlowCount"));
// Second step: specify the Map stage class and its output data types
job.setMapperClass(FlowMapper.class);
// Set the data type of K2 (map output key)
job.setMapOutputKeyClass(Text.class);
// Set the data type of V2 (map output value)
job.setMapOutputValueClass(FlowBean.class);
// Third step: partitioning
job.setPartitionerClass(FlowPartition.class);
job.setNumReduceTasks(6); // the number of reduce tasks must match the number of partitions
// Fourth step: sorting (the default is used here)
// Fifth step: combining
job.setCombinerClass(FlowCombiner.class);
// Sixth step: grouping (the default is used here)
// Seventh step: specify the Reduce stage class and its data types
job.setReducerClass(FlowReducer.class);
// Set the data type of K3
job.setOutputKeyClass(Text.class);
// Set the data type of V3
job.setOutputValueClass(FlowBean.class);
// Eighth step: set the output format
job.setOutputFormatClass(TextOutputFormat.class);
// Set output path
TextOutputFormat.setOutputPath(job,new Path("file:///C:\\Myprogram\\OUT\\FlowCount_out0111"));
boolean bl = job.waitForCompletion(true);
return bl?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration,new FlowJobMain(),args);
System.exit(run);
}
}
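Two practical notes when running this locally with the file:/// paths above: TextOutputFormat refuses to start if the output directory already exists (the job fails with FileAlreadyExistsException), so delete or rename it between runs; and with six reduce tasks the output directory will contain six files, part-r-00000 through part-r-00005, one per partition.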
All done!
Copyright notice
This article was written by [@Li Sicheng]. Please include a link to the original when reposting. Thanks!
https://yzsam.com/2022/04/202204221311017867.html