Applying a custom SinkFunction on the Flink sink side
2022-04-23 03:19:00 【Bonyin】
The scenario
Data is read from Kafka, processed, and written to MySQL. In Flink this is the path from source to sink, so this article covers a MySQL sink that inserts one record at a time and a MySQL sink that inserts records in batches.
1、 The SinkFunction inheritance hierarchy

The API shows that the SinkFunction interface has many implementing classes, for example the commonly used RichSinkFunction.
2、 MySQL single-record insert
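import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SinkToMysql extends RichSinkFunction<Tuple2<String, Integer>> {
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?)";
        // getConnection() is this class's static JDBC helper (its body is not shown).
        // try-with-resources closes the statement and the connection even if the insert fails.
        try (Connection conn = getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, value.f0);
            ps.setString(2, value.f0 + "1");
            int rows = ps.executeUpdate(); // one database round trip per record
            System.out.println("Successfully inserted " + rows + " row(s)");
        }
    }
}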
The class above extends RichSinkFunction and overrides the invoke method, which talks to the database and inserts the data. invoke runs once for every record that arrives from upstream, so in real workloads this approach is inefficient.
3、 MySQL batch insert
Before records reach the sink, the upstream data can be accumulated for a period of time and then handed to the sink as one batch.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkToMysqlBatch implements SinkFunction<ArrayList<String>> {
    @Override
    public void invoke(ArrayList<String> value, Context context) throws Exception {
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?)";
        // Each incoming element holds one window's records; write them as a single JDBC batch.
        try (Connection conn = SinkToMysql.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            conn.setAutoCommit(false); // commit the whole batch at once
            for (String s : value) {
                ps.setString(1, s);
                ps.setString(2, s + "1");
                ps.addBatch();
            }
            int[] resultBatch = ps.executeBatch();
            conn.commit();
            System.out.println("Rows inserted for the current window: " + resultBatch.length);
        }
    }
}
In the batch version above, the custom class implements the SinkFunction interface and overrides the invoke method. The type parameter is a collection type, which means that each element arriving from upstream is itself a collection. invoke iterates over that collection and writes it to MySQL as one batch.
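The accumulation step that produces each collection is not shown in the article. As a rough illustration only, the plain-Java sketch below buffers records and releases them as one list once a size or age threshold is reached. BatchBuffer and all of its parameters are hypothetical names, not Flink classes, and the age check only runs when a new record arrives, whereas a real Flink window is driven by timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper (not part of Flink): collects records and passes them
// to a callback as one list when the buffer is full or old enough.
class BatchBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final LongSupplier clock;         // injected so timing can be controlled
    private final Consumer<List<T>> onFlush;  // e.g. something that runs a JDBC batch
    private List<T> buffer = new ArrayList<>();
    private long firstArrival;

    BatchBuffer(int maxSize, long maxAgeMillis, LongSupplier clock, Consumer<List<T>> onFlush) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
        this.onFlush = onFlush;
    }

    // Adds one record; flushes if the size or age threshold has been reached.
    void add(T record) {
        if (buffer.isEmpty()) {
            firstArrival = clock.getAsLong();
        }
        buffer.add(record);
        boolean full = buffer.size() >= maxSize;
        boolean old = clock.getAsLong() - firstArrival >= maxAgeMillis;
        if (full || old) {
            flush();
        }
    }

    // Hands the buffered records to the callback; does nothing when empty.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        onFlush.accept(batch);
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        BatchBuffer<String> buf =
                new BatchBuffer<>(3, 5_000L, System::currentTimeMillis, written::add);
        for (String s : new String[] {"a", "b", "c", "d"}) {
            buf.add(s);
        }
        buf.flush(); // push out the remainder
        System.out.println(written);
    }
}
```

In the article's pipeline each flushed list would correspond to one ArrayList<String> passed to SinkToMysqlBatch.invoke.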
A problem encountered in actual development

In this code, the custom MySQL batch class passed to addSink(...) extends RichSinkFunction, yet an error is reported saying that the argument must implement the SinkFunction interface. This is still an open question.
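RichSinkFunction itself implements SinkFunction, so extending it should satisfy addSink. One possible cause, offered here as an assumption and not confirmed by the article, is that the sink's type parameter does not match the stream's element type; Java generics are invariant, so the compiler then reports that the argument is not the required SinkFunction. The sketch below models this with stand-in types (MiniSink, MiniRichSink, MiniStream and BatchSink are hypothetical and only mirror the shape of the Flink classes).

```java
import java.util.ArrayList;
import java.util.List;

class AddSinkTypingDemo {
    public static void main(String[] args) {
        ArrayList<String> window = new ArrayList<>(List.of("a", "b"));
        BatchSink sink = new BatchSink();

        // Compiles: the stream's element type and the sink's type parameter
        // are both ArrayList<String>.
        new MiniStream<ArrayList<String>>(List.of(window)).addSink(sink);

        // Would NOT compile: a MiniSink<ArrayList<String>> is not a
        // MiniSink<List<String>>, even though ArrayList is a List.
        // new MiniStream<List<String>>(List.of(window)).addSink(sink);

        System.out.println(sink.rows); // prints 2
    }
}

// Stand-in for the sink interface.
interface MiniSink<T> {
    void invoke(T value);
}

// Stand-in for the "rich" base class: it implements the interface itself,
// so every subclass is accepted wherever a MiniSink<T> is required.
abstract class MiniRichSink<T> implements MiniSink<T> {
    void open() {}
    void close() {}
}

// Stand-in for a stream whose addSink requires a sink of exactly type T.
class MiniStream<T> {
    private final List<T> elements;

    MiniStream(List<T> elements) {
        this.elements = elements;
    }

    void addSink(MiniSink<T> sink) {
        for (T element : elements) {
            sink.invoke(element);
        }
    }
}

// Batch sink that extends the rich base class and counts the rows it receives.
class BatchSink extends MiniRichSink<ArrayList<String>> {
    int rows = 0;

    @Override
    public void invoke(ArrayList<String> batch) {
        rows += batch.size();
    }
}
```

If this is the cause, making the sink's type parameter identical to the element type produced by the upstream operator should resolve the error.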
Copyright notice
This article was written by [Bonyin]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220626055127.html









