当前位置:网站首页>(5)FlinkSQL将socket数据写入到mysql方式二
(5)FlinkSQL将socket数据写入到mysql方式二
2022-08-08 12:13:00 【NBI大数据】
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String s) throws Exception {
String[] split = s.split(",");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
// 将流转化为表
Table table = tableEnv.fromDataStream(waterDS,
$("id"),
$("ts"),
$("vc"),
$("pt").proctime());
tableEnv.createTemporaryView("EventTable", table);
tableEnv.executeSql("CREATE TABLE flinksink (" +
"componentname STRING," +
"componentcount BIGINT NOT NULL," +
"componentsum BIGINT" +
") WITH (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
"'connector.table' = 'flinksink'," +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.username' = 'root'," +
"'connector.password' = 'root'," +
"'connector.write.flush.max-rows'='3'\r\n" +
")"
);
Table mysql_user = tableEnv.from("flinksink");
mysql_user.printSchema();
Table result = tableEnv.sqlQuery(
"SELECT " +
"id as componentname, " + //window_start, window_end,
"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
"FROM TABLE( " +
"TUMBLE( TABLE EventTable , " +
"DESCRIPTOR(pt), " +
"INTERVAL '10' SECOND)) " +
"GROUP BY id , window_start, window_end"
);
//方式一:写入数据库
// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
//方式二:写入数据库
tableEnv.createTemporaryView("ResultTable", result);
tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //追加模式
env.execute();
}
边栏推荐
猜你喜欢
day02 -DOM - advanced events (register events, event listeners, delete events, DOM event flow, event objects, prevent default behavior, prevent event bubbling, event delegation) - commonly used mouse
Five-faced Alibaba rated P6 after taking the offer: share his interview experience
京东云无线宝产品部负责人张晓东 : 京东云无线宝与开源的亲密关系 | 《大神详解开源 BUFF 增益攻略》讲座回顾...
一条SQL在MySQL中是如何执行的
dedecms支持Word图文自动粘贴
TF-GNN踩坑记录(一)
动图图解!既然IP层会分片,为什么TCP层也还要分段?
Redis 定长队列的探索和实践
深度剖析-class的几个对象(utlis,component)-瀑布流-懒加载(概念,作用,原理,实现步骤)
轻量级接口自动化框架(jmeter+ant+jenkins)
随机推荐
安科瑞预付费水电集团物业解决方案-Susie 周
学习与尝试 --&gt; 事件风暴
SSL证书最长有效期13个月,还有必要一次申请多年吗?
JPA之使用复合主键
day01 - Introduction to Web API - Introduction to DOM - Getting Elements - Event Basics - Manipulating Elements - Exclusive Operations - Custom Attribute Operations - Node Operations - Cases: Dynamica
Five-faced Alibaba rated P6 after taking the offer: share his interview experience
深度剖析-class的几个对象(utlis,component)-瀑布流-懒加载(概念,作用,原理,实现步骤)
看到这个应用上下线方式,不禁感叹:优雅,太优雅了!
京东云无线宝产品部负责人张晓东 : 京东云无线宝与开源的亲密关系 | 《大神详解开源 BUFF 增益攻略》讲座回顾...
Docker-持久化数据库(数据卷)
如何上线TB级推荐模型
关于那些我们都听过的营销工具—优惠券
leetcode:761. 特殊的二进制序列【递归 + 转换有效括号】
萤石、小米对垒智能摄像头
Jenkins-安装(2)
皕杰报表之数据校验与处理
Pattern Recognition Study Notes: Chapter 6 Other Classification Methods (Continuously updated...)
爱可可AI前沿推介(8.8)
鲲鹏开发者创享日2022:鲲鹏全栈创新 与开发者共建数字湖南
Replication监控及自动故障切换