当前位置:网站首页>(5) FlinkSQL writes socket data to mysql Method 2
(5) FlinkSQL writes socket data to mysql Method 2
2022-08-08 13:03:00 【NBI big data】
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));}});// convert stream to tableTable table = tableEnv.fromDataStream(waterDS,$("id"),$("ts"),$("vc"),$("pt").proctime());tableEnv.createTemporaryView("EventTable", table);tableEnv.executeSql("CREATE TABLE flinksink (" +"componentname STRING," +"componentcount BIGINT NOT NULL," +"componentsum BIGINT" +") WITH (" +"'connector.type' = 'jdbc'," +"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +"'connector.table' = 'flinksink'," +"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +"'connector.username' = 'root'," +"'connector.password' = 'root'," +"'connector.write.flush.max-rows'='3'\r\n" +")");Table mysql_user = tableEnv.from("flinksink");mysql_user.printSchema();Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +"FROM TABLE( " +"TUMBLE( TABLE EventTable , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");//Method 1: Write to the database// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");//Method 2: Write to the databasetableEnv.createTemporaryView("ResultTable", result);tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //Append modeenv.execute();} 边栏推荐
- 史上最全JVM性能调优:线程+子系统+类加载+内存分配+垃圾回收
- 一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
- 硬盘数据恢复工具
- 深析C语言的灵魂 -- 指针
- 宝塔实测-TinkPHP5.1框架小程序商城源码
- 【C语言】自定义类型详解:结构体、枚举、联合
- 海外邮件发送指南(一)
- 报错 | Cannot find module ‘@better-scroll/core/dist/types/BScroll‘
- 尝试开发微信公众号消息推送功能并且和小程序关联
- STM32 entry development to make infrared remote control (smart home-universal remote control)
猜你喜欢

TF-GNN踩坑记录(一)

C语言小项目 -- 扫雷游戏完整代码(递归展开 + 选择标记)

Five-faced Alibaba rated P6 after taking the offer: share his interview experience

2022-08-03

别再到处乱放配置文件了!试试我司使用 7 年的这套解决方案,稳的一秕

Docker - persistent database (data volume)

安装MinGW-w64

C语言的三个经典题目:三步翻转法、杨氏矩阵、辗转相除法

化工行业数字化供应链系统:赋能化工企业高质量发展,促进上下游协同

字符串函数、字符函数、内存函数的使用及其模拟实现
随机推荐
作为一个年薪50W阿里P7架构师的必备知识:并发+JVM+多线程+Netty+MySQL
Fluorite, millet against smart camera
家电行业趋势:2022从三方面把握家电产品升级方向
迁移学习(Transfer Learning)的背景、历史及学习课
五面阿里巴巴拿offer后定级P6:分享自己的面试经历
(4)FlinkSQL将socket数据写入到mysql方式一
MySQL----索引
DDoS攻击为什么是无解的
关于微信小程序体验版获取不到openId的问题
爱可可AI前沿推介(8.8)
什么是IP SSL证书,如何申请?
金融行业数智化供应链管理系统:多维度评估分析供应商,赋能智能金融变革
开放原子开源峰会 - SmartIDE正式开源并发布v1.0版本
服务器配置——Linux系统安装Redis
使用shardingjdbc实现读写分离配置
Jenkins - install (2)
Acwing3452. 进制转换
C语言小项目 -- 扫雷游戏完整代码(递归展开 + 选择标记)
医药行业转型发展,探索数字化供应链升级之道
Jenkins - Introduction to Continuous Integration (1)