当前位置:网站首页>Flink-cdc 同步mysql数据
Flink-cdc 同步mysql数据
2022-08-06 03:58:00 【四维大脑】
下载地址:https://github.com/ververica/flink-cdc-connectors/releases
这里下载2.2.0版本:https://github.com/ververica/flink-cdc-connectors/archive/refs/tags/release-2.2.0.zip
下载完成后,在 pom.xml 中找到这一项:flink.version ,修改 flink 版本号为:
<flink.version>1.13.6</flink.version>
自行打包编译
通过flink-cdc 同步mysql数据
1、flink集群准备
wget http://mirrors.cloud.tencent.com/apache/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
tar zxvf flink-1.13.6-bin-scala_2.11.tgz
将打包好的 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 包放入到flink的lib目录下
启动集群
cd flink-1.13.6
bin/start-cluster.sh
2、mysql环境准备
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
3、启动flinksql client
cd /opt/flink-1.13.6
bin/sql-client.sh
4、在flinksql client中执行命令
Flink SQL> SET execution.checkpointing.interval = 3s
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '自己的ip地址',
'port' = '3306',
'username' = 'root',
'password' = '密码',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> select * from products;
5、在 MySQL 客户端继续插入数据
INSERT INTO products VALUES (default,"scooter1","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter3","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter4","Small 2-wheel scooter");
4、在flinksql client中查看数据
select * from products;
可以查看到数据变化
边栏推荐
- 89.(cesium之家)cesium聚合图(自定义图片)
- leetcode:20. 有效的括号
- 洛谷 :P1104 生日
- Literature Reading---Analysis of Genome Haplotypes and Genome Stability and Creeping of Common Bermudagrass Yangjiang
- 力扣------分数排名
- Electric power class topic of the question
- Develop SQL editors with Monaco Editor
- What is an inner class?
- The third day of learning MySQL: functions (basic)
- 浦发银行长沙分行党委书记、行长王起,华融湘江银行行长蒋俊文一行莅临麒麟信安调研考察
猜你喜欢

Mysql安装 求大拿解答

造自己的芯,让谷歌买单!谷歌再度开源 180nm 工艺的芯片

What is an Egg. The js, it have what features, installation of an Egg framework, definition of routing, what's Controller is the Controller, CORS configuration, the json configuration, a get request,

数字孪生实际应用案例-智慧港口篇

单人最高5万奖金!顶会论文复现赛正式启动,70+公开任务等你挑战

力扣------分数排名

(九)集合 - List

喜欢我们不如加入我们:来投稿吧,稿酬靠谱!

Lvm root partition expansion

The delivery language set in the Google Ads background, if English is set, does it mean that the user is using English?
随机推荐
$nextTick 原理及作用
2022年京东新百货七夕礼遇季活动有什么亮点?
6. -- -- -- -- -- test automation of software testing unittest framework
[wpf] Detailed explanation of three callbacks for dependency properties
89. (home of cesium) cesium aggregation graph (custom image)
defineProperty 和 proxy 的区别
Base64与16进制HEX之间的转换
NPDP为什么越来越受追捧?产品经理你可知道?
【Untitled】
Mysql installation ask for answers
【无标题】目录测试
xctf attack and defense world web master advanced area command_execution
2022年阿里云服务器配置选取攻略
Django用orm修改mysql数据库运行出现错误
哈利波特:哈迷们集合啦
十四. go channel
Discrete mathematics the final problem
Induction of common annotations in Sring
【无标题】
2022年企业和站长租用服务器的几个常见误区