当前位置:网站首页>(5) FlinkSQL writes socket data to mysql Method 2
(5) FlinkSQL writes socket data to mysql Method 2
2022-08-08 13:03:00 【NBI big data】
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));}});// convert stream to tableTable table = tableEnv.fromDataStream(waterDS,$("id"),$("ts"),$("vc"),$("pt").proctime());tableEnv.createTemporaryView("EventTable", table);tableEnv.executeSql("CREATE TABLE flinksink (" +"componentname STRING," +"componentcount BIGINT NOT NULL," +"componentsum BIGINT" +") WITH (" +"'connector.type' = 'jdbc'," +"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +"'connector.table' = 'flinksink'," +"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +"'connector.username' = 'root'," +"'connector.password' = 'root'," +"'connector.write.flush.max-rows'='3'\r\n" +")");Table mysql_user = tableEnv.from("flinksink");mysql_user.printSchema();Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +"FROM TABLE( " +"TUMBLE( TABLE EventTable , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");//Method 1: Write to the database// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");//Method 2: Write to the databasetableEnv.createTemporaryView("ResultTable", result);tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //Append modeenv.execute();}
边栏推荐
- nvm的使用 nodejs版本管理,解决用户名是汉字的问题
- Combining "xPlus" to discuss the innovation and change of software architecture
- Jenkins - install (2)
- The programmer essential VS debugging technique
- In-depth analysis of the soul of C language -- pointer
- Promise 解决阻塞式同步,将异步变为同步
- 十年架构五年生活-08 第一次背锅
- [Horizon Rising Sun X3 Trial Experience] WIFI connection, SSH login, TogetherROS installation (section 2)
- MYSQL 的 MASTER到MASTER的主主循环同步
- 无心剑2022年七绝100首
猜你喜欢
Five-faced Alibaba rated P6 after taking the offer: share his interview experience
Combining "xPlus" to discuss the innovation and change of software architecture
Jenkins - 持续集成介绍(1)
研究:有毒的PFAS化学品使全球各地的雨水无法安全饮用
医药行业转型发展,探索数字化供应链升级之道
分享面试阿里、京东、网易等大厂后的面经及面试心得,让你秋招不再害怕
【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
萤石、小米对垒智能摄像头
[Horizon Rising Sun X3 Trial Experience] WIFI connection, SSH login, TogetherROS installation (section 2)
ctfshow 七夕杯(复现)
随机推荐
DDoS攻击为什么是无解的
【SSR服务端渲染+CSR客户端渲染+post请求+get请求+总结】三种开启服务器的方法总结
odps sql被删除了,能找回来吗
详解轮播图二-通过left定位来轮播图片
金融行业数智化供应链管理系统:多维度评估分析供应商,赋能智能金融变革
关于微信小程序体验版获取不到openId的问题
8/7 牛客6+div2D+倍增lca
The use of qsort function and its analog implementation
2022-08-04
Combining "xPlus" to discuss the innovation and change of software architecture
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
nvm的使用 nodejs版本管理,解决用户名是汉字的问题
MeterSphere--开源持续测试平台
The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection
changes not staged for commit solution
简短截说阐述redis中事务的使用
指针和数组笔试题解析
这个选项是不是当数据库主键或唯一键发生冲突时替换数据
医药行业转型发展,探索数字化供应链升级之道
Docker-持久化数据库(数据卷)