当前位置:网站首页>(7)FlinkSQL将kafka数据写入到mysql方式二
(7)FlinkSQL将kafka数据写入到mysql方式二
2022-08-08 12:13:00 【NBI大数据】
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE WaterSensor (" +
"id STRING," +
"ts BIGINT," +
"vc BIGINT," +
// "`pt` TIMESTAMP(3),"+
// "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +
"pt as PROCTIME() " +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'kafka_data_waterSensor'," +
"'properties.bootstrap.servers' = '127.0.0.1:9092'," +
"'properties.group.id' = 'test'," +
"'scan.startup.mode' = 'earliest-offset'," +
// "'json.fail-on-missing-field' = 'false'," +
// "'json.ignore-parse-errors' = 'true'," +
"'format' = 'json'" +
")"
);
tableEnv.executeSql("CREATE TABLE flinksink (" +
"componentname STRING," +
"componentcount BIGINT NOT NULL," +
"componentsum BIGINT" +
") WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
"'connector.table' = 'flinksink'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.username' = 'root'," +
"'connector.password' = 'root'," +
"'connector.write.flush.max-rows'='3'\r\n" +
")"
);
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
"FROM TABLE( " +
"TUMBLE( TABLE WaterSensor , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
// //方式一:写入数据库
//// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
env.execute();
}
边栏推荐
猜你喜欢
面试突击72:输入URL之后会执行什么流程?
模式识别 学习笔记:第七章 特征选择
内网渗透学习(五)域横向移动——PTH&PTK&PTT
The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection
Jingdong, zhang, director of the cloud wireless products division treasure jingdong cloud wireless treasure close relationship with the open source | the great god, open source BUFF gain strategy revi
Mobile adaptation method of vw/vh - vw/vh instance - analog B stand mobile home page - get style tutorial video
【SSR服务端渲染+CSR客户端渲染+post请求+get请求+总结】三种开启服务器的方法总结
开放原子开源峰会 - SmartIDE正式开源并发布v1.0版本
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
逐步手撕轮播图3(分步教程)
随机推荐
SSL证书最长有效期13个月,还有必要一次申请多年吗?
Software testing testing on behalf of the user
结合“xPlus”探讨软件架构的创新与变革
MeterSphere--开源持续测试平台
力扣(LeetCode)219. 存在重复元素 II(2022.08.07)
Combining "xPlus" to discuss the innovation and change of software architecture
【cookie 临时存储数据,WebStorage ,sessionStorage】
一起学习集合框架之 TreeSet
(原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
JSON的Unicode问题;自定义排序问题;保留最大子集问题
十年架构五年生活-08 第一次背锅
简短截说阐述redis中事务的使用
ssh 安全 之 密钥登录
TF-GNN踩坑记录(一)
Redis的那些事:一文入门Redis的基础操作
LeetCode_14_最长公共前缀
别再到处乱放配置文件了!试试我司使用 7 年的这套解决方案,稳的一秕
Mobile adaptation method of vw/vh - vw/vh instance - analog B stand mobile home page - get style tutorial video
动图图解!既然IP层会分片,为什么TCP层也还要分段?
【C语言】[编程题]倒置字符串