当前位置:网站首页>(7) FlinkSQL kafka data written to the mysql way 2
(7) FlinkSQL kafka data written to the mysql way 2
2022-08-08 13:03:00 【NBI big data】
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("CREATE TABLE WaterSensor (" +"id STRING," +"ts BIGINT," +"vc BIGINT," +// "`pt` TIMESTAMP(3),"+// "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +"pt as PROCTIME() " +") WITH (" +"'connector' = 'kafka'," +"'topic' = 'kafka_data_waterSensor'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'test'," +"'scan.startup.mode' = 'earliest-offset'," +// "'json.fail-on-missing-field' = 'false'," +// "'json.ignore-parse-errors' = 'true'," +"'format' = 'json'" +")");tableEnv.executeSql("CREATE TABLE flinksink (" +"componentname STRING," +"componentcount BIGINT NOT NULL," +"componentsum BIGINT" +") WITH (" +"'connector.type' = 'jdbc'," +"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +"'connector.table' = 'flinksink'," +"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +"'connector.username' = 'root'," +"'connector.password' = 'root'," +"'connector.write.flush.max-rows'='3'\r\n" +")");Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +"FROM TABLE( " +"TUMBLE( TABLE WaterSensor , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");// //Method 1: write to the database//// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");////Method 2: Write to the databasetableEnv.createTemporaryView("ResultTable", result);tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();env.execute();}
边栏推荐
- 案例分析 | 宜家以双钻设计模型探索线上零售新业务
- 消防安全知识培训讲座
- 简析LDO静态电流与功耗的关系
- MeterSphere - open source test platform
- DDoS攻击为什么是无解的
- What is the IP SSL certificate, how to apply for?
- win10安装Solidworks2016安装出错:solidworks\sldfuncfeat.dll“ 已返回 0x3,如何解决.
- Study: Toxic PFAS chemicals make rainwater unsafe to drink around the world
- The maximum validity period of an SSL certificate is 13 months. Is it necessary to apply for multiple years at a time?
- (原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
猜你喜欢
指针和数组笔试题解析
Fluorite, millet against smart camera
开放原子开源峰会 - SmartIDE正式开源并发布v1.0版本
Kunpeng Developer Creation Day 2022: Kunpeng Full-Stack Innovation and Developers Build Digital Hunan
C语言小项目 -- 扫雷游戏完整代码(递归展开 + 选择标记)
【SSR服务端渲染+CSR客户端渲染+post请求+get请求+总结】三种开启服务器的方法总结
JSON的Unicode问题;自定义排序问题;保留最大子集问题
IJCAI 2022 | 基于随机游走聚合的图神经网络
【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
报错 | RegExp2 is not defined
随机推荐
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
STM32的内存管理相关(内存架构,内存管理,map文件分析)
萤石、小米对垒智能摄像头
dedecms支持Word图文自动粘贴
MySQL----索引
论文阅读《Omnidirectional DSO: Direct Sparse Odometry with Fisheye Cameras》
Docker - persistent database (data volume)
大缓存更强劲,搭载AMD Milan-X的浪潮GPU服务器NF5468A5深度评测
changes not staged for commit 解决办法
Supervisor 后台进程管理
Jingdong, zhang, director of the cloud wireless products division treasure jingdong cloud wireless treasure close relationship with the open source | the great god, open source BUFF gain strategy revi
MySQL安装及使用
Doris学习笔记之优化
如何在go重打印函数调用者信息Caller
node中package解析、npm 命令行npm详解,node中的common模块化,npm、nrm两种方式查看源和切换镜像
宝塔实测-TinkPHP5.1框架小程序商城源码
day02 -DOM - advanced events (register events, event listeners, delete events, DOM event flow, event objects, prevent default behavior, prevent event bubbling, event delegation) - commonly used mouse
“华数杯”建模学习(Matlab)
Collection of shell basics
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持