当前位置:网站首页>(5) FlinkSQL writes socket data to mysql Method 2
(5) FlinkSQL writes socket data to mysql Method 2
2022-08-08 13:03:00 【NBI big data】
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));}});// convert stream to tableTable table = tableEnv.fromDataStream(waterDS,$("id"),$("ts"),$("vc"),$("pt").proctime());tableEnv.createTemporaryView("EventTable", table);tableEnv.executeSql("CREATE TABLE flinksink (" +"componentname STRING," +"componentcount BIGINT NOT NULL," +"componentsum BIGINT" +") WITH (" +"'connector.type' = 'jdbc'," +"'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +"'connector.table' = 'flinksink'," +"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +"'connector.username' = 'root'," +"'connector.password' = 'root'," +"'connector.write.flush.max-rows'='3'\r\n" +")");Table mysql_user = tableEnv.from("flinksink");mysql_user.printSchema();Table result = tableEnv.sqlQuery("SELECT " +"id as componentname, " + //window_start, window_end,"COUNT(ts) as componentcount ,SUM(ts) as componentsum " +"FROM TABLE( " +"TUMBLE( TABLE EventTable , " +"DESCRIPTOR(pt), " +"INTERVAL '10' SECOND)) " +"GROUP BY id , window_start, window_end");//Method 1: Write to the database// result.executeInsert("flinksink").print(); //;.insertInto("flinksink");//Method 2: Write to the databasetableEnv.createTemporaryView("ResultTable", result);tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //Append modeenv.execute();}
边栏推荐
猜你喜欢
随机推荐
SQL实例 - 胜平负
C language small project - complete code of minesweeper game (recursive expansion + selection mark)
这个选项是不是当数据库主键或唯一键发生冲突时替换数据
leetcode 1584. 连接所有点的最小费用
一些常见的web小功能
Server Configuration - Install Redis on Linux System
IJCAI 2022 | 基于随机游走聚合的图神经网络
nvm的使用 nodejs版本管理,解决用户名是汉字的问题
Collection of shell basics
curl获取harbor镜像仓库项目下的镜像列表
案例分析 | 宜家以双钻设计模型探索线上零售新业务
简析LDO静态电流与功耗的关系
C语言小项目 -- 通讯录(静态版+动态版+文件版)
STM32 entry development to make infrared remote control (smart home-universal remote control)
Five-faced Alibaba rated P6 after taking the offer: share his interview experience
处理器的调试接口
程序环境和预处理
Kunpeng Developer Creation Day 2022: Kunpeng Full-Stack Innovation and Developers Build Digital Hunan
phpstyle安装管理mysql
别再到处乱放配置文件了!试试我司使用 7 年的这套解决方案,稳的一秕