当前位置:网站首页 > (6) FlinkSQL writes Kafka data to MySQL, method 1
(6) FlinkSQL writes Kafka data to MySQL, method 1
2022-08-08 13:02:00 【NBI big data】
这里不展开zookeeper、kafka安装配置
(1)首先需要启动zookeeper和kafka
(2)定义一个kafka生产者
package com.producers;
import com.alibaba.fastjson.JSONObject;
import com.pojo.Event;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
/**
* Created by lj on 2022-07-09.
*/
public class Kafaka_Producer {

    /** Kafka broker address. (Class name typo "Kafaka" kept — it is the public name.) */
    public final static String bootstrapServers = "127.0.0.1:9092";

    /**
     * Continuously produces JSON-serialized {@link WaterSensor} records to the
     * topic {@code kafka_waterSensor}, one record every 2 seconds, until the
     * process is interrupted or killed.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", bootstrapServers);
        // Serializer class for record keys
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for record values
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees the producer is flushed and closed on exit
        // (the original leaked it — it was never closed).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            int i = 0;
            Random r = new Random(); // unseeded: a different sequence each run
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};
            while (true) {
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)], i, i);
                i++;
                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                // BUG FIX: topic was "kafka_data_waterSensor", but the Flink
                // consumer subscribes to "kafka_waterSensor" — the pipeline
                // never received any data. The names now match.
                RecordMetadata recordMetadata =
                        producer.send(new ProducerRecord<>("kafka_waterSensor", null, null, msg)).get();
                // System.out.println("recordMetadata: {"+ recordMetadata +"}");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Print the full stack trace; the original printed only getMessage(),
            // losing the exception type and location.
            e.printStackTrace();
        }
    }
}
(3)定义一个消息对象
package com.pojo;
import java.io.Serializable;
/**
* Created by lj on 2022-07-05.
*/
/**
 * Message payload for the water-sensor demo: a sensor id, an event timestamp,
 * and a vc (value/count) reading. Serialized to JSON by the Kafka producer
 * and deserialized by the Flink job.
 */
public class WaterSensor implements Serializable {

    // Explicit serialVersionUID: a Serializable class without one gets a
    // compiler-generated id that changes with any edit, breaking
    // deserialization across versions.
    private static final long serialVersionUID = 1L;

    private String id;
    private long ts;
    private int vc;

    /** No-arg constructor required by JSON/reflection-based frameworks. */
    public WaterSensor() {
    }

    /**
     * @param id sensor identifier
     * @param ts event timestamp
     * @param vc sensor reading
     */
    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }

    // Value semantics: two readings are equal iff all three fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WaterSensor)) {
            return false;
        }
        WaterSensor that = (WaterSensor) o;
        return ts == that.ts
                && vc == that.vc
                && (id == null ? that.id == null : id.equals(that.id));
    }

    @Override
    public int hashCode() {
        int result = (id == null ? 0 : id.hashCode());
        result = 31 * result + Long.hashCode(ts);
        result = 31 * result + vc;
        return result;
    }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}
(4)从Kafka接入数据,并写入到MySQL
/**
 * Flink job: consumes JSON WaterSensor events from Kafka, aggregates them per
 * sensor id over 10-second tumbling processing-time windows (COUNT and SUM of
 * ts), and writes the aggregates to the MySQL table "flinksink" via the JDBC
 * table connector.
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Read data from Kafka
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers","127.0.0.1:9092");
    properties.setProperty("group.id", "consumer-group");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("auto.offset.reset", "latest");

    // NOTE(review): this subscribes to "kafka_waterSensor", but the producer
    // sample in this article sends to "kafka_data_waterSensor" — the two topic
    // names must match for any data to flow. Verify which one is intended.
    DataStreamSource<String> streamSource = env.addSource(
            new FlinkKafkaConsumer<String>(
                    "kafka_waterSensor",
                    new SimpleStringSchema(),
                    properties)
    );

    // Parse each JSON string into a WaterSensor POJO.
    SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
        @Override
        public WaterSensor map(String s) throws Exception {
            JSONObject json = (JSONObject)JSONObject.parse(s);
            return new WaterSensor(json.getString("id"),json.getLong("ts"),json.getInteger("vc"));
        }
    });

    // Convert stream to table, appending a processing-time attribute "pt"
    // that the tumbling window below is defined on.
    Table table = tableEnv.fromDataStream(waterDS,
            $("id"),
            $("ts"),
            $("vc"),
            $("pt").proctime());
    tableEnv.createTemporaryView("EventTable", table);

    // Register the MySQL sink table.
    // NOTE(review): the 'connector.*' option style is the legacy (pre-Flink-1.11)
    // JDBC descriptor syntax — confirm it matches the Flink version in use.
    tableEnv.executeSql("CREATE TABLE flinksink (" +
            "componentname STRING," +
            "componentcount BIGINT NOT NULL," +
            "componentsum BIGINT" +
            ") WITH (" +
            "'connector.type' = 'jdbc'," +
            "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
            "'connector.table' = 'flinksink'," +
            "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
            "'connector.username' = 'root'," +
            "'connector.password' = 'root'," +
            "'connector.write.flush.max-rows'='3'\r\n" +
            ")"
    );
    Table mysql_user = tableEnv.from("flinksink");
    mysql_user.printSchema();

    // Windowed aggregation: per id, COUNT(ts) and SUM(ts) over 10-second
    // tumbling windows on the processing-time attribute pt.
    Table result = tableEnv.sqlQuery(
            "SELECT " +
                    "id as componentname, " + //window_start, window_end,
                    "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
                    "FROM TABLE( " +
                    "TUMBLE( TABLE EventTable , " +
                    "DESCRIPTOR(pt), " +
                    "INTERVAL '10' SECOND)) " +
                    "GROUP BY id , window_start, window_end"
    );

    // Option 1: write to the database via Table.executeInsert
    // result.executeInsert("flinksink").print(); //;.insertInto("flinksink");

    // Option 2: write to the database via INSERT INTO SQL on a temporary view
    tableEnv.createTemporaryView("ResultTable", result);
    tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

    // tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); // append mode
    env.execute();
}
(5)效果演示
边栏推荐
- C语言小项目 -- 通讯录(静态版+动态版+文件版)
- 【AI系统前沿动态第45期】Hinton:深度学习的下一个大事件;一块GPU训练TB级推荐模型不是梦;AI-GPU显存优化发展史
- (4)FlinkSQL将socket数据写入到mysql方式一
- Background, History and Lessons of Transfer Learning
- (6)FlinkSQL将kafka数据写入到mysql方式一
- Geoffrey Hinton:深度学习的下一个大事件
- (原创)[C#] GDI+ 之鼠标交互:原理、示例、一步步深入、性能优化
- 如何上线TB级推荐模型
- 看到这个应用上下线方式,不禁感叹:优雅,太优雅了!
- C# 反射 操作列表类型属性
猜你喜欢
changes not staged for commit solution
报错 | Cannot find module ‘@better-scroll/core/dist/types/BScroll‘
五心红娘6月成功案列
Kunpeng Developer Creation Day 2022: Kunpeng Full-Stack Innovation and Developers Build Digital Hunan
Docker-持久化数据库(数据卷)
Replication监控及自动故障切换
Supervisor 后台进程管理
动图图解!既然IP层会分片,为什么TCP层也还要分段?
【C语言】文件相关操作
产品-Axure9英文版,下拉框Droplist的条件选择,显示不同内容面板
随机推荐
结合“xPlus”探讨软件架构的创新与变革
2020年是时候更新你的技术武器库了:Asgi vs Wsgi(FastAPI vs Flask)
SSL证书最长有效期13个月,还有必要一次申请多年吗?
ReentrantLock原理,ReentrantLock和synchronized区别
尝试开发微信公众号消息推送功能并且和小程序关联
为你的网站加上live2d的动态小挂件,博君一晒
ets declarative ui development, how to get the current system time
JSON的Unicode问题;自定义排序问题;保留最大子集问题
SQL的INSERT INTO和INSERT INTO SELECT语句
安装MinGW-w64
The most complete JVM performance tuning in history: thread + subsystem + class loading + memory allocation + garbage collection
面试官问你什么是长轮询?
如何上线TB级推荐模型
你的 golang 程序正在悄悄内存泄漏
STM32的内存管理相关(内存架构,内存管理,map文件分析)
“华数杯”建模学习(Matlab)
shell基础知识合集
vim /etc/profile 写入时 出现 E121:无法打开并写入文件解决方案
老手也常误用!详解 Go channel 内存泄漏问题
看到这个应用上下线方式,不禁感叹:优雅,太优雅了!