当前位置:网站首页>(5) FlinkSQL writes socket data to mysql Method 2
(5) FlinkSQL writes socket data to mysql Method 2
2022-08-08 13:03:00 【NBI big data】
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));}});// convert stream to tableTable table = tableEnv.fromDataStream(waterDS,$("id"),$("ts"),$("vc"),$("pt").proctime());tableEnv.createTemporaryView("EventTable", table);tableEnv.executeSql("CREATE TABLE flinksink (" +"componentname STRING," +"componentcount BIGINT NOT NULL," +"componentsum BIGINT" +") WITH (" +"'connector.type' = 'jdbc'," +"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +"'connector.table' = 'flinksink'," +"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +"'connector.username' = 'root'," +"'connector.password' = 'root'," +"'connector.write.flush.max-rows'='3'\r\n" +")");Table mysql_user = tableEnv.from("flinksink");mysql_user.printSchema();Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +"FROM TABLE( " +"TUMBLE( TABLE EventTable , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");//Method 1: Write to the database// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");//Method 2: Write to the databasetableEnv.createTemporaryView("ResultTable", result);tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //Append modeenv.execute();} 边栏推荐
- Jenkins-安装(2)
- C语言小项目 -- 扫雷游戏完整代码(递归展开 + 选择标记)
- JPA之使用复合主键
- JSON的Unicode问题;自定义排序问题;保留最大子集问题
- 深度剖析-class的几个对象(utlis,component)-瀑布流-懒加载(概念,作用,原理,实现步骤)
- [C language] Dynamic memory management
- 什么是IP SSL证书,如何申请?
- xxd命令(反编译、二进制文件转十六进制文件)
- 安科瑞预付费水电集团物业解决方案-Susie 周
- Five-faced Alibaba rated P6 after taking the offer: share his interview experience
猜你喜欢

Program Environment and Preprocessing

The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection

2022-08-04

Study: Toxic PFAS chemicals make rainwater unsafe to drink around the world
![[C language] file related operations](/img/bb/95a5acc1c8e780c1ed46c9c8ab0543.png)
[C language] file related operations

nvm的使用 nodejs版本管理,解决用户名是汉字的问题

Replication监控及自动故障切换

Jenkins - install (2)

Software testing testing on behalf of the user

C语言的三个经典题目:三步翻转法、杨氏矩阵、辗转相除法
随机推荐
[C language] Dynamic memory management
(6) FlinkSQL writes kafka data to mysql Method 1
《show your work》 从现在开始!
JPA之使用复合主键
华中科大提出VGNetG:“不做选择,全都要”轻量化主干网络!
消防安全知识培训讲座
学习与尝试 --> 事件风暴
报错 | Cannot find module ‘@better-scroll/core/dist/types/BScroll‘
老手也常误用!详解 Go channel 内存泄漏问题
分享面试阿里、京东、网易等大厂后的面经及面试心得,让你秋招不再害怕
Software testing testing on behalf of the user
(6)FlinkSQL将kafka数据写入到mysql方式一
什么是IP SSL证书,如何申请?
建材业深陷数字化困局,B2B协同系统标准化交易流程,解决企业交易网络化难题
SSL证书最长有效期13个月,还有必要一次申请多年吗?
xxd命令(反编译、二进制文件转十六进制文件)
【重构map】【重构filter】【重构Some】【重构reduce方法】【重构flat函数】
第十二届蓝桥杯《杨辉三角》-二分法
八月粉丝福利来了!大疆手机云台你爱了吗?
Jenkins - install (2)