当前位置:网站首页>(5) FlinkSQL writes socket data to mysql Method 2
(5) FlinkSQL writes socket data to mysql Method 2
2022-08-08 13:03:00 【NBI big data】
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));}});// convert stream to tableTable table = tableEnv.fromDataStream(waterDS,$("id"),$("ts"),$("vc"),$("pt").proctime());tableEnv.createTemporaryView("EventTable", table);tableEnv.executeSql("CREATE TABLE flinksink (" +"componentname STRING," +"componentcount BIGINT NOT NULL," +"componentsum BIGINT" +") WITH (" +"'connector.type' = 'jdbc'," +"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +"'connector.table' = 'flinksink'," +"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +"'connector.username' = 'root'," +"'connector.password' = 'root'," +"'connector.write.flush.max-rows'='3'\r\n" +")");Table mysql_user = tableEnv.from("flinksink");mysql_user.printSchema();Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +"FROM TABLE( " +"TUMBLE( TABLE EventTable , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");//Method 1: Write to the database// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");//Method 2: Write to the databasetableEnv.createTemporaryView("ResultTable", result);tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //Append modeenv.execute();} 边栏推荐
- 报错 | Cannot find module ‘@better-scroll/core/dist/types/BScroll‘
- MySQL database storage series (5) the InnoDB storage format
- 安装MinGW-w64
- 哪来的TB级推荐模型
- 8/7 牛客6+div2D+倍增lca
- JPA之使用复合主键
- Five-faced Alibaba rated P6 after taking the offer: share his interview experience
- 一文读懂配置管理(CM)
- 处理器的调试接口
- 论文阅读《Omnidirectional DSO: Direct Sparse Odometry with Fisheye Cameras》
猜你喜欢
随机推荐
The maximum validity period of an SSL certificate is 13 months. Is it necessary to apply for multiple years at a time?
Jenkins - Introduction to Continuous Integration (1)
SSTI漏洞介绍认识(flask、Werkzeup)
建材业深陷数字化困局,B2B协同系统标准化交易流程,解决企业交易网络化难题
这个选项是不是当数据库主键或唯一键发生冲突时替换数据
老手也常误用!详解 Go channel 内存泄漏问题
Docker - persistent database (data volume)
【C语言】深度剖析数据在内存中的存储
五心红娘6月成功案列
【C语言】文件相关操作
金融行业数智化供应链管理系统:多维度评估分析供应商,赋能智能金融变革
ctfshow 七夕杯(复现)
Jenkins - 持续集成介绍(1)
C语言小项目 -- 扫雷游戏完整代码(递归展开 + 选择标记)
changes not staged for commit 解决办法
(原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
关于微信小程序体验版获取不到openId的问题
面试突击72:输入URL之后会执行什么流程?
win10安装Solidworks2016安装出错:solidworks\sldfuncfeat.dll“ 已返回 0x3,如何解决.
Jenkins - install (2)
![(原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化](/img/97/c19480b21bc183563fc985a2429bda.jpg)

![[C language] file related operations](/img/bb/95a5acc1c8e780c1ed46c9c8ab0543.png)






