当前位置:网站首页>高级测试:如何使用Flink对Strom任务的逻辑功能进行复现测试?
高级测试:如何使用Flink对Strom任务的逻辑功能进行复现测试?
2022-08-10 06:03:00 【软件测试小仙女】
Flink和Strom都是时下较为流行的数据流平台,考虑以下一种应用场景:已经使用Strom完成了对于某一逻辑功能的开发,如果现在期望使用Flink实现相同的逻辑,那么就需要考虑如何使用Flink来对Strom任务的逻辑功能进行最简单的复现测试。

使用Flink来测试Strom任务的逻辑主要存在两个最基本的问题:第一,Storm通过自定义的Bolt类实现自定义的逻辑,在Flink中如何实现?第二,Storm按照自定义标准实现数据分发的逻辑,在Flink中如何实现?
本文主要通过两个最基本的Flink程序实例对上述两个使用Flink测试Strom任务逻辑存在的基本问题进行解答。
第一个问题,我们可以通过Flink的ProcessFuction类进行实现,通过继承该类,在该类的processElement方法中实现自定义逻辑。ProcessFuction类如下图所示,我们可以通过var1这个参数直接获取当前流中的数据,然后进行自定义的逻辑加工,再通过Collector类var3的collect方法将处理后的数据发送到下一个流中。

假设某一Strom任务的功能逻辑是:① 对初始数据源(一个字符串)末尾添加一个字符串。② 然后再次添加另一个字符串。
我们以上述对字符串加工的Strom任务为例,说明Flink程序如何通过ProcessFuction类对该任务实现复现测试。
(1)Flink主程序,假设初始数据源为“abc”。

(2)第一个业务加工类,给数据流末尾添加“def”。

(3)第二个业务加工类,给数据流末尾添加“ghi”。

(4)执行Flink程序,观察输出结果,“abc”被二次加工为“abcdefghi”。

第二个分发数据的问题,我们假设某一Strom任务的功能逻辑是对数据源(股票对象)进行分类,将股价高于X的分为一类,将股价小于等于X的分为另一类。
我们以上述对股票数据对象分类处理的Strom任务为例,说明Flink程序如何通过旁路输出特性实现对数据流按照自定义标准分类,输出到不同的子数据流中处理。

Flink 的旁路输出依然涉及ProcessFunction类的processElement方法,该方法的Context类型的var2参数的主要作用是利用其output方法进行旁路输出(我们用于进行数据分流)。
Flink的旁路输出特性可以用来对数据进行分流,通过创建一个流的标签(OutputTag),再利用这个OutputTag标签对象作为参数,调用初始/父级数据流的getSideOutput(OutputTag)方法获取子数据流。
每个流标签都有一个id,也可以不创建对象,只要流标签的id相同,其中的数据就相同。因此,可以通过匿名内部类的形式来获取子数据流。第一个参数是id,第二个参数是数据类型(不能省略)。
(1)创建股票类Stock,属性包括名称和价格。

(2)创建消费消息的Flink程序。

(3)创建生产消息的Flink程序。

我们用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”这两个ID作为两个旁路输出标签的ID。
在processElement方法中,我们通过判断股票的价格是否大于50区分出低价股和高价股,利用Context对象的output方法进行旁路输出,把price小于50的Stock对象输出到ID为“STOCK_LOW_PRICE”的低价股标签旁路中,而把price大于等于50的Stock对象输出到ID为“STOCK_HIGH_PRICE”的高价股标签旁路中。

(4)依次启动消费者程序、生产者程序,观察消费者程序控制台中的输出:

此时,桌面生成了两个文件夹,当中记录了股票数据,result1记录了小于50的低价股,result2中记录了股价大于等于50的高价股。


最后:
可以到我的个人号:atstudy-js,可以免费领取一份10G软件测试工程师面试宝典文档资料。以及相对应的视频学习教程免费分享!其中包括了有基础知识、Linux必备、Mysql数据库、抓包工具、接口测试工具、测试进阶-Python编程、Web自动化测试、APP自动化测试、接口自动化测试、测试高级持续集成、测试架构开发测试框架、性能测试等。
这些测试资料,对于做【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!
边栏推荐
- UnityShader入门精要-高级光照基础
- 求职
- Quickly grasp game resources in one hour and remote hot update
- I would like to ask you guys, when FLink SQL reads the source, specify the time field of the watermark. If the specified field is in the grid
- 手机与雷电模拟器里如何使用YiLu代理?
- Lunix(阿里云服务器)安装Anaconda并开启jupyter服务本地访问
- 裸辞—躺平—刷题—大厂(Android面试的几大技巧)
- Ladies and gentlemen, oracle11g, cdc2.2, flink1.13.6, single-table incremental synchronization.Without adding data
- OSPF的dr和bdr
- Grammar Basics (Judgment Statements)
猜你喜欢
随机推荐
QScroller的QScrollerProperties参数研究
mysql数据库定时备份(保留近7天的备份)
UnityShader入门精要-透明效果
请问一下。Oracle CDC 连接器支持 LogMiner 和 XStream API 两种方式捕
OSPF的dr和bdr
MySQL之InnoDB引擎(六)
2022 Henan Mengxin League Game (5): University of Information Engineering F - Split Turf
Make a boot floppy and boot with bochs emulator
netlink IPC
CuteOneP 一款php的OneDrive多网盘挂载程序 带会员 同步等功能
1413. 逐步求和得到正数的最小值
各位大佬,oracle11g,cdc2.2,flink1.13.6,单表增量同步。在没新增数据的情
手机与雷电模拟器里如何使用YiLu代理?
裸辞—躺平—刷题—大厂(Android面试的几大技巧)
ES13 - ES2022 - 第 123 届 ECMA 大会批准了 ECMAScript 2022 语言规范
UnityShader入门精要-高级光照基础
Please pay attention to me, thank you.
Hypervisor, KVM, QEMU总结
Grammar Basics (Judgment Statements)
XV6系统调用实现









