当前位置:网站首页>Flink流处理引擎系统学习(三)
Flink流处理引擎系统学习(三)
2022-04-22 08:22:00 【肥仔哥哥1930】
前言
再来一个Flink的stream的example,提前先说下,官网的例子有点坑。
一、stream例子

拷贝到我的目录(这里顺便说下一个好的工具用起来真香,idea居然可以复制的代码,粘贴自动跟我创建类)

你想文什么,我知道,先别问,继续看下面。
二、example整理
1.依赖引入
你拷贝到你的demo项目,在自动引包的时候,会发现很多缺很多对象。
首先需要引入flink-connector-files
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${
flink.version}</version>
</dependency>
2.为什么会有TextLineInputFormat类
做完上面的步骤,应该会发现缺少这个类。首先说明,这里的示例居然是根据最新的快照版本写的,返回去看example的pom会发现。

我从maven下载到这个版本,用luyten打开jar,发现还真有这个类。

于是就新建了这样一个类,这里要说下,如果是用最新版本1.14.4版本,其实那行代码换个写法也是可以的。
FileSource.FileSourceBuilder<String> builder =
// FileSource.forRecordStreamFormat(
//new TextLineInputFormat(), params.getInputs().get()); 1.16-SNAPSHOT版本
FileSource.forRecordStreamFormat(new TextLineFormat()
, params.getInputs().get()); //1.14.4版本
这个最终解释权,还是来源官网,在官网1.14.4版本的Search框输入Text,就会出现Text Files的下拉了,点击,就会跳转到语法使用示例

3.还有报错提示

点击MemorySize的方法ofMebiBytes看实现,确定可以直接写long值,另外,下面Duration这里也爆红,改:
counts.sinkTo(
FileSink.<Tuple2<String, Integer>>forRowFormat(
params.getOutput().get(), new SimpleStringEncoder<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(20)
.withRolloverInterval(10)
.build())
.build())
.name("file-sink");
三、运行example

报错,原因查了下没有说明,看这个字面意思是,无法使用私有最终字节。。。
PS:说到这突然想到,jdk在9就将实现由char[]改成了byte[]了,意义是可以节省占用内存。
这里其实我怀疑是不是jdk17有bug,最后突然想到官网建demo项目脚本的地方有这样的2句话:

于是,切换demo的build&run的jdk版本,idea里在这里切换:

再次运行

熟悉的分区统计又出来了,是不是很开心!!!
总结
1、看了官网的example还很多,后面的学习可能不能每个跟大家都分享了,有些我们肯定也暂时不用。
2、我的CSDN学习会员里Flink的这个课程有100节课,学习要加快,可能后面都有可能没有专门的时间学习了,只能在实战中get了。
希望能帮到大家哦,下班了,回家。
版权声明
本文为[肥仔哥哥1930]所创,转载请带上原文链接,感谢
https://zwsky.blog.csdn.net/article/details/124328772
边栏推荐
猜你喜欢

快速排序及优化

RHEL user and group management - Notes

什么产品都还没有 马斯克的“无聊公司”估值已高达57亿美元

Concept and understanding of memory address

Return type of getchar function

Android Development - SQLite and sqlitedatabase Application Experiment 6 notes

MyCms 自媒体 CMS 系统 v3.2.2,广告插件优化

CMake基础知识二之实例单个文件,多个文件,动静态库及使用
![C language to realize [shutdown program]](/img/b3/0364fda1bc27d754dd11eec055979d.jpg)
C language to realize [shutdown program]

那些年不会做的数学题,用Pyhon只需要1分钟?
随机推荐
用栈实现队列(双栈,输入栈、输出栈)
RHCSA第二天作业
机器学习之分类回归树
Fabric test example, encountered order exited (x) x seconds
如何清空输入缓冲区
机器学习之逻辑回归的步骤原理
相交链表(Set、双指针)
Vs compiler annotation style
A growing tree
Boolean type [bool]
Flume composition, put transaction, take transaction
Introduction to Nessus vulnerability scanning
学习RHCSA的第二天
Little known "three letter word"
How to empty the input buffer
工业缺陷检测项目实战(二)——基于深度学习框架yolov5的钢铁表面缺陷检测
N states of prime number solution
Leetcode0396. 旋转函数(medium,迭代)
Usage of static [detailed explanation]
@ data annotation in idea, get / set method does not work