当前位置:网站首页>(7)FlinkSQL将kafka数据写入到mysql方式二
(7)FlinkSQL将kafka数据写入到mysql方式二
2022-08-08 12:13:00 【NBI大数据】
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE WaterSensor (" +
"id STRING," +
"ts BIGINT," +
"vc BIGINT," +
// "`pt` TIMESTAMP(3),"+
// "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +
"pt as PROCTIME() " +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'kafka_data_waterSensor'," +
"'properties.bootstrap.servers' = '127.0.0.1:9092'," +
"'properties.group.id' = 'test'," +
"'scan.startup.mode' = 'earliest-offset'," +
// "'json.fail-on-missing-field' = 'false'," +
// "'json.ignore-parse-errors' = 'true'," +
"'format' = 'json'" +
")"
);
tableEnv.executeSql("CREATE TABLE flinksink (" +
"componentname STRING," +
"componentcount BIGINT NOT NULL," +
"componentsum BIGINT" +
") WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
"'connector.table' = 'flinksink'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.username' = 'root'," +
"'connector.password' = 'root'," +
"'connector.write.flush.max-rows'='3'\r\n" +
")"
);
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
"FROM TABLE( " +
"TUMBLE( TABLE WaterSensor , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
// //方式一:写入数据库
//// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
env.execute();
}边栏推荐
- 牛 plus,多层嵌套动态 JSON 该如何解析总结
- 京东云无线宝产品部负责人张晓东 : 京东云无线宝与开源的亲密关系 | 《大神详解开源 BUFF 增益攻略》讲座回顾...
- d切片示例
- leetcode:761. 特殊的二进制序列【递归 + 转换有效括号】
- 结点的查找
- [Horizon Rising Sun X3 Trial Experience] WIFI connection, SSH login, TogetherROS installation (section 2)
- Redis的那些事:一文入门Redis的基础操作
- 是不是只有字符串的数字水印能一直保留并且不影响计算,其他类型的字段导入数据库之后水印就会丢失?
- nvm的使用 nodejs版本管理,解决用户名是汉字的问题
- 一文读懂配置管理(CM)
猜你喜欢
随机推荐
odps sql被删除了,能找回来吗
JSON的Unicode问题;自定义排序问题;保留最大子集问题
逐步手撕轮播图3(分步教程)
一起学习集合框架之 TreeSet
Mysql索引优化实战
一文读懂配置管理(CM)
.NET Community Toolkit 8.0.0 版本发布
一些常见的web小功能
day01 -Web API介绍—DOM 介绍—获取元素—事件基础—操作元素—排他操作—自定义属性操作—节点操作—案例:动态生成表格—创建元素的三种方式(经典面试题)
【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
写个 shell 玩 数字炸弹
面试突击72:输入URL之后会执行什么流程?
哪来的TB级推荐模型
微服务负载均衡器LoadBalancer实战
DDoS攻击为什么是无解的
C语言详解系列——指针与结构体
day02 -DOM - advanced events (register events, event listeners, delete events, DOM event flow, event objects, prevent default behavior, prevent event bubbling, event delegation) - commonly used mouse
MYSQL 的 MASTER到MASTER的主主循环同步
(原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
LeetCode_66_加一









