当前位置:网站首页>(7)FlinkSQL将kafka数据写入到mysql方式二
(7)FlinkSQL将kafka数据写入到mysql方式二
2022-08-08 12:13:00 【NBI大数据】
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE WaterSensor (" +
"id STRING," +
"ts BIGINT," +
"vc BIGINT," +
// "`pt` TIMESTAMP(3),"+
// "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +
"pt as PROCTIME() " +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'kafka_data_waterSensor'," +
"'properties.bootstrap.servers' = '127.0.0.1:9092'," +
"'properties.group.id' = 'test'," +
"'scan.startup.mode' = 'earliest-offset'," +
// "'json.fail-on-missing-field' = 'false'," +
// "'json.ignore-parse-errors' = 'true'," +
"'format' = 'json'" +
")"
);
tableEnv.executeSql("CREATE TABLE flinksink (" +
"componentname STRING," +
"componentcount BIGINT NOT NULL," +
"componentsum BIGINT" +
") WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
"'connector.table' = 'flinksink'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.username' = 'root'," +
"'connector.password' = 'root'," +
"'connector.write.flush.max-rows'='3'\r\n" +
")"
);
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
"FROM TABLE( " +
"TUMBLE( TABLE WaterSensor , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
// //方式一:写入数据库
//// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
env.execute();
}
边栏推荐
- .NET Community Toolkit 8.0.0 版本发布
- phpstyle安装管理mysql
- 如何上线TB级推荐模型
- 关于微信小程序体验版获取不到openId的问题
- 京东云无线宝产品部负责人张晓东 : 京东云无线宝与开源的亲密关系 | 《大神详解开源 BUFF 增益攻略》讲座回顾...
- Mobile adaptation method of vw/vh - vw/vh instance - analog B stand mobile home page - get style tutorial video
- 请问如何实现两个不同环境的MySQL数据库实时同步
- 一些常见的web小功能
- ets声明式ui开发,怎么获取当前系统时间
- #yyds干货盘点#【愚公系列】2022年08月 Go教学课程 005-变量
猜你喜欢
随机推荐
phpstyle安装管理mysql
轻量级接口自动化框架(jmeter+ant+jenkins)
leetcode:761. 特殊的二进制序列【递归 + 转换有效括号】
Replication监控及自动故障切换
动图图解!既然IP层会分片,为什么TCP层也还要分段?
Software testing testing on behalf of the user
老手也常误用!详解 Go channel 内存泄漏问题
如何使用shell来进行版本管理-以iptables为例
Alibaba微服务组件Nacos注册中心
模式识别 学习笔记:第八章 特征提取
leetcode 1584. 连接所有点的最小费用
【地平线旭日X3派试用体验】WIFI连接,SSH登录,TogetherROS安装(第二节)
【SSR服务端渲染+CSR客户端渲染+post请求+get请求+总结】三种开启服务器的方法总结
MeterSphere - open source test platform
Pattern Recognition Study Notes: Chapter 6 Other Classification Methods (Continuously updated...)
LeetCode_66_加一
结合“xPlus”探讨软件架构的创新与变革
分享面试阿里、京东、网易等大厂后的面经及面试心得,让你秋招不再害怕
5S软件就是将软件应用全维度简单化的软件系统
这个选项是不是当数据库主键或唯一键发生冲突时替换数据