Using a custom SinkFunction on the Flink sink side
2022-04-23 03:19:00 【Bonyin】
Scenario
Data is read from Kafka, processed, and written to MySQL. In Flink this is the path from source to sink, so this article covers a custom MySQL sink in two forms: single-row inserts and batch inserts.
1. The SinkFunction inheritance hierarchy

From the API we can see that the SinkFunction interface has many implementation classes, for example the commonly used RichSinkFunction.
2. MySQL single-row insert
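import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SinkToMysql extends RichSinkFunction<Tuple2<String, Integer>> {
    private Connection conn;
    private PreparedStatement ps;

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // getConnection() is a static helper of this class; its body is not shown in the article
        conn = getConnection();
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);";
        ps = this.conn.prepareStatement(sql);
        ps.setString(1, value.f0);
        ps.setString(2, value.f0 + "1");
        // Executes one INSERT for the current record
        int rows = ps.executeUpdate();
        System.out.println("Successfully inserted " + rows + " row(s)");
        // close() (override not shown) releases ps and conn after every record
        this.close();
    }
}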
The class above extends RichSinkFunction and overrides the invoke method, which interacts with the database and inserts the data. invoke is called once for every record that arrives from upstream, so this version opens a connection and runs one INSERT per record. In practice this is inefficient.
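To make the cost of the per-record approach concrete, here is a minimal plain-Java sketch that counts how many connections each strategy opens. It does not use Flink or JDBC: `FakeConnection`, `connectionsPerRecord`, and `connectionsReused` are hypothetical names invented for this illustration. In a real job, the reuse strategy would typically map onto the `open()` and `close()` lifecycle methods that RichSinkFunction provides.

```java
import java.util.List;

final class ConnectionReuseSketch {
    /** Hypothetical stand-in for an expensive resource such as a JDBC connection. */
    static final class FakeConnection implements AutoCloseable {
        FakeConnection(int[] openCounter) {
            openCounter[0]++; // record that one more connection was opened
        }

        void insert(String row) {
            // A real sink would execute an INSERT here.
        }

        @Override
        public void close() {
            // A real connection would release its socket here.
        }
    }

    /** Opens and closes a connection for every record, like the invoke() shown above. */
    static int connectionsPerRecord(List<String> records) {
        int[] opened = {0};
        for (String record : records) {
            try (FakeConnection conn = new FakeConnection(opened)) {
                conn.insert(record);
            }
        }
        return opened[0];
    }

    /** Opens one connection before the first record and closes it after the last. */
    static int connectionsReused(List<String> records) {
        int[] opened = {0};
        try (FakeConnection conn = new FakeConnection(opened)) {
            for (String record : records) {
                conn.insert(record);
            }
        }
        return opened[0];
    }
}
```

For three records, the first method opens three connections and the second opens one. The gap grows linearly with the number of records.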
3. MySQL batch insert
When data reaches the sink, we can first accumulate the upstream records for a period of time and then write them to the downstream system together in one batch.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkToMysqlBatch implements SinkFunction<ArrayList<String>> {
    @Override
    public void invoke(ArrayList<String> value, Context context) throws Exception {
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);";
        // try-with-resources closes ps and conn even when the batch fails
        try (Connection conn = SinkToMysql.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            // Disable auto-commit so the whole batch is committed as one transaction
            conn.setAutoCommit(false);
            for (String s : value) {
                ps.setString(1, s);
                ps.setString(2, s + 1);
                ps.addBatch();
            }
            // One array entry per batched statement
            int[] resultBatch = ps.executeBatch();
            conn.commit();
            System.out.println("Rows inserted for the current window: " + resultBatch.length);
        }
    }
}
In the batch version above, the custom class implements the SinkFunction interface and overrides the invoke method. The type parameter is a collection type, which means that each element passed from upstream is itself a collection of records. Inside invoke we iterate over the collection, add each record to a JDBC batch, and write the whole batch to MySQL at once.
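The batch sink expects the upstream to deliver records already grouped into a collection. The article does not show how that grouping is done (the log message suggests a window), so the following is only a sketch of one alternative: a count-based buffer in plain Java. `BatchBuffer`, its `flusher` callback, and `demoBatchSizes` are hypothetical names for this illustration and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Count-based buffer that collects records and hands them over in batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Adds one record and flushes automatically once batchSize records are pending. */
    void add(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of all pending records to the flusher; call it on shutdown too. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Demo: feeds five records through a buffer of size two and records each batch size. */
    static List<Integer> demoBatchSizes() {
        List<Integer> sizes = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(2, batch -> sizes.add(batch.size()));
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            buffer.add(s);
        }
        buffer.flush(); // push out the final, partially filled batch
        return sizes;
    }
}
```

In the demo, five records with a batch size of two yield batches of two, two, and one record. A purely count-based buffer can hold records indefinitely when traffic is low, which is one reason a time-based approach like the one described in this article may be preferable.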
A problem encountered in actual development:

In this code, the custom MySQL batch class passed to addSink extended RichSinkFunction, but an error was reported saying that the SinkFunction interface must be implemented. This remains an open question.
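The failing code is not reproduced here, so the real cause cannot be confirmed. RichSinkFunction itself implements SinkFunction, so extending it should normally be accepted by addSink. One possible explanation, offered only as an assumption, is a mismatch between the element type of the stream and the type parameter of the sink. The sketch below models that situation in plain Java: `Sink`, `RichSink`, `FakeStream`, and `BatchSink` are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SinkTypeSketch {
    /** Hypothetical stand-in for a sink interface with one type parameter. */
    interface Sink<IN> {
        void invoke(IN value);
    }

    /** Hypothetical stand-in for a "rich" abstract base class that implements Sink. */
    abstract static class RichSink<IN> implements Sink<IN> {
    }

    /** Hypothetical stand-in for a typed stream; addSink accepts only a Sink of the same T. */
    static final class FakeStream<T> {
        private final List<T> elements;

        FakeStream(List<T> elements) {
            this.elements = elements;
        }

        void addSink(Sink<T> sink) {
            for (T element : elements) {
                sink.invoke(element);
            }
        }
    }

    /** Batch sink built on the rich base class; counts the rows it receives. */
    static final class BatchSink extends RichSink<ArrayList<String>> {
        int rows = 0;

        @Override
        public void invoke(ArrayList<String> batch) {
            rows += batch.size();
        }
    }

    static int demo() {
        ArrayList<String> batch = new ArrayList<>(Arrays.asList("a", "b"));
        FakeStream<ArrayList<String>> batches =
                new FakeStream<>(Collections.singletonList(batch));
        BatchSink sink = new BatchSink();
        // Compiles: the stream's element type matches the sink's type parameter.
        batches.addSink(sink);
        // The next line would not compile, because String is not ArrayList<String>:
        // new FakeStream<String>(Arrays.asList("a")).addSink(sink);
        return sink.rows;
    }
}
```

If the original error has this cause, checking that the stream passed to addSink really carries ArrayList<String> elements would be a reasonable first step.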
Copyright notice
This article was written by [Bonyin]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220626055127.html