当前位置:网站首页>(5)FlinkSQL将socket数据写入到mysql方式二
(5)FlinkSQL将socket数据写入到mysql方式二
2022-08-08 12:13:00 【NBI大数据】
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String s) throws Exception {
String[] split = s.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
// 将流转化为表
Table table = tableEnv.fromDataStream(waterDS,
$("id"),
$("ts"),
$("vc"),
$("pt").proctime());
tableEnv.createTemporaryView("EventTable", table);
tableEnv.executeSql("CREATE TABLE flinksink (" +
"componentname STRING," +
"componentcount BIGINT NOT NULL," +
"componentsum BIGINT" +
") WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
"'connector.table' = 'flinksink'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.username' = 'root'," +
"'connector.password' = 'root'," +
"'connector.write.flush.max-rows'='3'\r\n" +
")"
);
Table mysql_user = tableEnv.from("flinksink");
mysql_user.printSchema();
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
"FROM TABLE( " +
"TUMBLE( TABLE EventTable , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
//方式一:写入数据库
// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //追加模式
env.execute();
}边栏推荐
- 皕杰报表之数据校验与处理
- 模式识别 学习笔记:第七章 特征选择
- DDoS攻击为什么是无解的
- (原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
- 【SSR服务端渲染+CSR客户端渲染+post请求+get请求+总结】三种开启服务器的方法总结
- STM32 entry development to make infrared remote control (smart home-universal remote control)
- Replication监控及自动故障切换
- 安科瑞预付费水电集团物业解决方案-Susie 周
- 【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
- 在半小时内从无到有开发并调试一款Chrome扩展(Chrome插件/谷歌浏览器插件)
猜你喜欢

一条SQL在MySQL中是如何执行的

报错 | RegExp2 is not defined

day01 -Web API介绍—DOM 介绍—获取元素—事件基础—操作元素—排他操作—自定义属性操作—节点操作—案例:动态生成表格—创建元素的三种方式(经典面试题)

一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持

Mysql索引优化实战

Study: Toxic PFAS chemicals make rainwater unsafe to drink around the world

office安装出现了“office对安装源的访问被拒绝30068-4(5)”错误

探究!一个数据包在网络中的心路历程

day02 -DOM—高级事件(注册事件、事件监听、删除事件、DOM事件流、事件对象、阻止默认行为、阻止事件冒泡、事件委托)—常用鼠标事件—常用的键盘事件

【访谈】Eotalk Vol.01:Eoapi,我们希望以开源的方式构建 API 生态系统
随机推荐
[Horizon Rising Sun X3 Trial Experience] WIFI connection, SSH login, TogetherROS installation (section 2)
【地平线旭日X3派试用体验】WIFI连接,SSH登录,TogetherROS安装(第二节)
GC explanation and tuning of JVM
海外邮件发送指南(一)
【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
STM32的内存管理相关(内存架构,内存管理,map文件分析)
模式识别 学习笔记:第七章 特征选择
【C语言】[编程题]倒置字符串
phpstyle安装管理mysql
宝塔实测-TinkPHP5.1框架小程序商城源码
看到这个应用上下线方式,不禁感叹:优雅,太优雅了!
Docker-持久化数据库(数据卷)
openssl 创建证书
神经网络分类
MySQL Dual-Master 双向同步
leetcode 1584. 连接所有点的最小费用
Jenkins-安装(2)
Yizhou Financial Analysis | Internet-based small loan platform intensively increased capital; comprehensive evaluation index of bank wealth management subsidiaries released in the first half of the ye
ets声明式ui开发,怎么获取当前系统时间
Promise 解决阻塞式同步,将异步变为同步