当前位置:网站首页>(1)通过FlinkSQL将数据写入mysql demo
(1)通过FlinkSQL将数据写入mysql demo
2022-08-08 14:54:00 【51CTO】
FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。
(1)首先引入POM依赖:
< properties >
< flink. version > 1.13 .1 </ flink. version >
< scala. binary. version > 2.12 </ scala. binary. version >
< slf4j. version > 1.7 .30 </ slf4j. version >
</ properties >
< dependencies >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - java </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - streaming - java_${ scala. binary. version} </ artifactId >
< version > ${ flink. version} </ version
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - clients_${ scala. binary. version} </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - table - api - java - bridge_${ scala. binary. version} </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
<!-- https: //mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - connector - jdbc_${ scala. binary. version} </ artifactId >
< version > ${ flink. version} </ version >
<!--< scope > provided </ scope >-->
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - table - planner - blink_${ scala. binary. version} </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - streaming - scala_${ scala. binary. version} </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - table - common </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
< dependency >
< groupId > org. apache. flink </ groupId >
< artifactId > flink - json </ artifactId >
< version > ${ flink. version} </ version >
</ dependency >
<!-- https: //mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
< dependency >
< groupId > com. fasterxml. jackson. core </ groupId >
< artifactId > jackson - databind </ artifactId >
< version > 2.12 .0 </ version >
</ dependency >
<!-- https: //mvnrepository.com/artifact/mysql/mysql-connector-java -->
< dependency >
< groupId > mysql </ groupId >
< artifactId > mysql - connector - java </ artifactId >
< version > 8.0 .16 </ version >
</ dependency >
< dependency >
< groupId > com. alibaba </ groupId >
< artifactId > fastjson </ artifactId >
< version > 1.2 .66 </ version >
</ dependency >
</ dependencies >
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
(2)编写代码
public static void main( String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings. newInstance()
. inStreamingMode()
//.useOldPlanner() // flink
. useBlinkPlanner() // blink
. build();
StreamTableEnvironment ste = StreamTableEnvironment. create( env, settings);
String ddl = "CREATE TABLE flinksinksds(\r\n" +
"componentname STRING,\r\n" +
"componentcount INT,\r\n" +
"componentsum INT\r\n" +
") WITH(\r\n" +
"'connector.type'='jdbc',\r\n" +
"'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
"'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n" +
"'connector.table'='flinksink',\r\n" +
"'connector.username'='root',\r\n" +
"'connector.password'='root',\r\n" +
"'connector.write.flush.max-rows'='1'\r\n" +
")";
System. err. println( ddl);
ste. executeSql( ddl);
String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +
"values('1024', 1 , 2 )";
ste. executeSql( insert);
env. execute();
System. exit( 0);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
(3)执行结果:
边栏推荐
猜你喜欢
shell------常用小工具,sort,uniq,tr,cut
2022-08-07 The fifth group Gu Xiangquan study notes day31-collection-Map collection
基于SCL语言的模拟量平均值滤波FB库功能介绍及创建FB库的具体方法
非科班毕业生,五面阿里:四轮技术面+HR一面已拿offer
JS-BOM-Name Converter - Input Name Position Reversed
基于微信小程序的幼儿园招生报名系统开发笔记
第一章、RPC 基础知识
JS加法器(DOM)
JS-Bom-while(计算闰年)
Mx_yolov3环境配置+模型测试训练
随机推荐
基于ModelArts的StyleGAN3生成高清图丨【华为云至简致远】
Brief description of the state of the thread
synchronized修饰类的注意事项
CS231n:6 训练神经网络(二)
CS231n: 6 training neural network (2)
What is low-code development?Is everyone really optimistic about low-code development?
【Kaggle】Save My Paper 基于自编码器的文本图像去噪
【小码匠自习室】AGC023-A :为啥总是N连发?为啥总遇到大神?
非科班毕业生,五面阿里:四轮技术面+HR一面已拿offer
面试题 17.05. 字母与数字
【小码匠自习室】[NOI Online 2020-3 入门组] 最急救助:被“幸运女神”眷顾的人
Create a 2D array
华为云云数据库RDS MySQL 版初试探【华为云至简致远】
浏览器跨域方案,适用于本地调试接口(超简单)
MySQL:Update高并发下变慢的案例及其涉及的特性
Talking about the underlying data structure of Redis
761. 特殊的二进制序列 : 经典构造题
shell正则表达式,三剑客grep命令
兆骑科创赛事服务平台对接,海内外高层次人才引进
什么是发饰hair accessories?