Custom Flink sinks: using SinkFunction on the sink side
2022-04-23 03:19:00 【Bonyin】
Scenario
Data is read from Kafka, processed, and written to MySQL. In Flink this is a pipeline that runs from source to sink, so this article covers a MySQL sink in two forms: inserting one record at a time and inserting records in batches.
1. The SinkFunction inheritance hierarchy

The API documentation shows that the SinkFunction interface has many implementing classes, for example the commonly used RichSinkFunction.
2. Inserting one record at a time into MySQL
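To make the relationship concrete, here is a minimal sketch in plain Java. `MiniSinkFunction`, `MiniRichSinkFunction`, `RecordingSink` and `MiniRuntime` are hypothetical stand-ins written for this illustration, not Flink classes. They only mirror the idea that a rich sink is an ordinary sink function plus `open` and `close` lifecycle hooks, with `invoke` called once per record in between.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a sink function: one callback per incoming record.
interface MiniSinkFunction<T> {
    void invoke(T value);
}

// Stand-in for a rich sink function: the same callback plus lifecycle hooks.
abstract class MiniRichSinkFunction<T> implements MiniSinkFunction<T> {
    void open() {}
    void close() {}
}

// Hypothetical sink that records the order of the calls it receives.
class RecordingSink extends MiniRichSinkFunction<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    void open() { calls.add("open"); }

    @Override
    public void invoke(String value) { calls.add("invoke:" + value); }

    @Override
    void close() { calls.add("close"); }
}

// Hypothetical driver that plays the role of the runtime for one sink instance.
class MiniRuntime {
    static <T> void run(List<T> records, MiniRichSinkFunction<T> sink) {
        sink.open();                 // once, before any record
        try {
            for (T record : records) {
                sink.invoke(record); // once per record
            }
        } finally {
            sink.close();            // once, after the last record
        }
    }
}
```

Because `RecordingSink` extends the rich base class, it can be passed anywhere a `MiniSinkFunction<String>` is expected. Resources such as a database connection would naturally be acquired in `open` and released in `close`.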
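import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SinkToMysql extends RichSinkFunction<Tuple2<String, Integer>> {
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?)";
        // getConnection() is this class's static JDBC helper; its body is not shown in this article.
        // try-with-resources closes the statement and the connection after every record.
        try (Connection conn = getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, value.f0);
            ps.setString(2, value.f0 + "1");
            int rows = ps.executeUpdate();
            System.out.println("Successfully inserted " + rows + " row(s)");
        }
    }
}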
The class above extends RichSinkFunction and overrides the invoke method, which talks to the database and inserts the data. invoke runs once for every record that arrives from upstream, so each record costs its own round trip to the database. In practice this is inefficient.
3. Batch inserts into MySQL
Before the data reaches the sink, we can accumulate the upstream records for a period of time and then hand them to the sink together as one batch.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkToMysqlBatch implements SinkFunction<ArrayList<String>> {
    @Override
    public void invoke(ArrayList<String> value, Context context) throws Exception {
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?)";
        // Reuses the static getConnection() helper of SinkToMysql (its body is not shown in this article).
        // try-with-resources closes the statement and the connection even if the batch fails.
        try (Connection conn = SinkToMysql.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            // Turn off auto-commit so the whole batch is committed as one transaction.
            conn.setAutoCommit(false);
            for (String s : value) {
                ps.setString(1, s);
                ps.setString(2, s + "1");
                ps.addBatch();
            }
            // One array entry per batched statement.
            int[] resultBatch = ps.executeBatch();
            conn.commit();
            System.out.println("Rows inserted for the current window: " + resultBatch.length);
        }
    }
}
The batch version above is a custom class that implements the SinkFunction interface and overrides the invoke method. Its type parameter is a collection type, which means each element handed down from upstream is itself a collection. Inside invoke we iterate over that collection and write it to MySQL as one batch.
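The sink receives a whole list per call, so something upstream has to group the records first. The following is a framework-free sketch of that accumulate-then-flush idea. `BatchBuffer` is a hypothetical helper invented for this illustration and is not part of Flink; a size threshold stands in for the "period of time" mentioned above, and in a real job a window operator would normally do this grouping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: collects records and hands them downstream as one list.
class BatchBuffer<T> {
    private final int maxSize;
    private final Consumer<List<T>> downstream;
    private List<T> pending = new ArrayList<>();

    BatchBuffer(int maxSize, Consumer<List<T>> downstream) {
        this.maxSize = maxSize;
        this.downstream = downstream;
    }

    // Buffer one record; flush automatically once the batch is full.
    void add(T record) {
        pending.add(record);
        if (pending.size() >= maxSize) {
            flush();
        }
    }

    // Emit whatever is buffered, for example when a time window closes.
    void flush() {
        if (pending.isEmpty()) {
            return; // nothing to send, so no empty batch reaches the sink
        }
        List<T> batch = pending;
        pending = new ArrayList<>();
        downstream.accept(batch);
    }
}
```

With a batch size of 2, adding three records produces one full batch immediately, and the third record is delivered by the next explicit `flush()`. The downstream consumer plays the role of the sink's invoke method, which then sees one list per call instead of one record per call.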
A problem I ran into during actual development:

In this code, the custom MySQL batch class passed to addSink extended RichSinkFunction, yet an error was reported saying that the argument must implement the SinkFunction interface. I have not yet worked out why.
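One possible explanation, which the original code (shown only as an image) does not confirm, is a type-parameter mismatch. RichSinkFunction itself implements SinkFunction, so a subclass is rejected only when its element type differs from the element type of the stream it is attached to. The sketch below reproduces that situation with plain Java stand-ins: `SinkLike`, `RichSinkLike`, `StreamLike` and `CountingBatchSink` are hypothetical names for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the sink interface.
interface SinkLike<T> {
    void invoke(T value);
}

// Stand-in for the rich base class: it already implements the sink interface.
abstract class RichSinkLike<T> implements SinkLike<T> {
}

// Stand-in for a stream: addSink only accepts a sink of the stream's own element type.
class StreamLike<T> {
    private final List<T> elements;

    StreamLike(List<T> elements) {
        this.elements = elements;
    }

    void addSink(SinkLike<T> sink) {
        for (T element : elements) {
            sink.invoke(element);
        }
    }
}

// Hypothetical batch sink whose element type is a list of strings.
class CountingBatchSink extends RichSinkLike<ArrayList<String>> {
    int rows = 0;

    @Override
    public void invoke(ArrayList<String> batch) {
        rows += batch.size();
    }
}

class AddSinkDemo {
    static int attach(StreamLike<ArrayList<String>> batches) {
        CountingBatchSink sink = new CountingBatchSink();
        batches.addSink(sink); // compiles: element types match
        // A StreamLike<String> would reject the same sink at compile time,
        // because SinkLike<ArrayList<String>> is not a SinkLike<String>.
        return sink.rows;
    }
}
```

If this is the cause, the fix is to make the stream's element type and the sink's type parameter agree, for example by grouping records into lists before calling addSink. Another thing worth checking is that the imported RichSinkFunction comes from the same package as the SinkFunction that addSink expects.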
Copyright notice
This article was written by [Bonyin]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220626055127.html