Using a custom SinkFunction on the Flink sink side
2022-04-23 03:19:00 【Bonyin】
Scenario
Data is read from Kafka, processed, and written to MySQL. In Flink this is the path from source to sink, so this article covers a MySQL sink in two variants: inserting one row at a time and inserting rows in batches.
1. The SinkFunction inheritance hierarchy
The API documentation shows that the SinkFunction interface has many implementing classes, for example the commonly used RichSinkFunction.
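The relationship can be mimicked in plain Java. The sketch below does not use Flink at all: SinkFunctionLike, RichSinkFunctionLike, CollectingSink, and addSinkLike are hypothetical stand-ins invented for illustration, and the real Flink signatures differ (for example, the real invoke also takes a Context and may throw). It only shows that a class extending the rich base class is accepted wherever the interface is expected, as long as the type parameter matches the element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for the sink interface (hypothetical, not the Flink type).
interface SinkFunctionLike<T> {
    void invoke(T value);
}

// Stand-in for the rich variant: same interface plus lifecycle hooks.
abstract class RichSinkFunctionLike<T> implements SinkFunctionLike<T> {
    public void open() {}   // default: nothing to set up
    public void close() {}  // default: nothing to release
}

// A concrete sink that simply remembers what it received.
class CollectingSink extends RichSinkFunctionLike<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}

class HierarchyDemo {
    // Plays the role of addSink: it only asks for the interface type,
    // and T must match the element type of the "stream".
    static <T> void addSinkLike(List<T> stream, SinkFunctionLike<T> sink) {
        for (T element : stream) {
            sink.invoke(element);
        }
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        addSinkLike(Arrays.asList("a", "b"), sink);
        System.out.println(sink.seen);
    }
}
```

Passing a sink whose type parameter differs from the element type of the list would be rejected by the compiler in this sketch.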
2. Inserting one row at a time into MySQL
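import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SinkToMysql extends RichSinkFunction<Tuple2<String, Integer>> {
    private Connection conn;
    private PreparedStatement ps;

    // Called once for every record that arrives from upstream.
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // getConnection() is not shown in the original; it is assumed to be a
        // static helper on this class that returns a java.sql.Connection.
        conn = getConnection();
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);";
        ps = this.conn.prepareStatement(sql);
        ps.setString(1, value.f0);
        ps.setString(2, value.f0 + "1");
        int rows = ps.executeUpdate();
        System.out.println("Successfully inserted " + rows + " row(s)");
        // close() is also not shown; it is assumed to be overridden to release ps and conn.
        this.close();
    }
}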
The class above extends RichSinkFunction and overrides the invoke method, which is where the interaction with the database happens and the row is inserted. invoke runs once for every record that arrives from upstream, so this version obtains a connection and issues one INSERT per record, which is inefficient in practice.
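One common way to reduce that per-record cost is to create the connection once and reuse it. Flink's rich functions provide open and close lifecycle hooks for this kind of setup and teardown. The sketch below is plain Java with no Flink or JDBC dependency: FakeConnection and ReusingSink are hypothetical stand-ins, and the counter only makes visible how many connections are created. It is an illustration of the idea, not the author's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a database connection; it only records rows in memory.
class FakeConnection {
    static int opened = 0;                 // total number of connections created
    final List<String> rows = new ArrayList<>();
    boolean closed = false;

    FakeConnection() {
        opened++;
    }

    void insert(String row) {
        if (closed) {
            throw new IllegalStateException("connection is closed");
        }
        rows.add(row);
    }

    void close() {
        closed = true;
    }
}

// Mirrors the open / invoke / close lifecycle of a rich sink.
class ReusingSink {
    private FakeConnection conn;

    void open() {                 // once per sink instance
        conn = new FakeConnection();
    }

    void invoke(String value) {   // once per record, reusing the same connection
        conn.insert(value);
    }

    void close() {                // once at shutdown
        if (conn != null) {
            conn.close();
        }
    }

    FakeConnection connection() {
        return conn;
    }
}

class ReuseDemo {
    public static void main(String[] args) {
        ReusingSink sink = new ReusingSink();
        sink.open();
        for (String s : new String[] {"a", "b", "c"}) {
            sink.invoke(s);
        }
        sink.close();
        System.out.println("rows=" + sink.connection().rows.size()
                + " connections=" + FakeConnection.opened);
    }
}
```

In a real sink the same shape would apply with a JDBC Connection and PreparedStatement created in open and released in close.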
3. Batch inserts into MySQL
Before the data reaches the sink, we can accumulate the upstream records for a period of time and then hand them to the sink together as one batch.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkToMysqlBatch implements SinkFunction<ArrayList<String>> {
    PreparedStatement ps;
    Connection conn;

    // Called once per upstream element; here each element is a whole list of records.
    @Override
    public void invoke(ArrayList<String> value, Context context) throws Exception {
        try {
            conn = SinkToMysql.getConnection();
            conn.setAutoCommit(false); // commit the whole batch as one transaction
            String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);";
            ps = this.conn.prepareStatement(sql);
            for (String s : value) {
                ps.setString(1, s);
                ps.setString(2, s + "1");
                ps.addBatch();
            }
            int[] resultBatch = ps.executeBatch();
            System.out.println("Rows inserted for the current window: " + resultBatch.length);
            conn.commit();
        } finally {
            // Release resources even if the batch fails.
            try {
                if (ps != null) {
                    ps.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The batch version above is a custom class that implements the SinkFunction interface and overrides the invoke method. Its type parameter is a collection type, which means that each element passed from upstream is itself a collection. invoke iterates over that collection, adds every entry to a JDBC batch, and writes the whole batch to MySQL in one go.
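The original does not show how the upstream records are grouped into a list before they reach the sink; the log message mentions a window, so a Flink window is presumably used. As a Flink-free illustration of the accumulate-then-flush idea only, the sketch below groups records by count. BatchBuffer and its batchSize parameter are hypothetical names, and the downstream consumer stands in for whatever writes the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: collects records and hands them on in fixed-size lists.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream;
    private List<T> current = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> downstream) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    // Buffer one record and flush automatically once the batch is full.
    void add(T record) {
        current.add(record);
        if (current.size() >= batchSize) {
            flush();
        }
    }

    // Hand over whatever has been buffered so far; does nothing when empty.
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        List<T> batch = current;
        current = new ArrayList<>();
        downstream.accept(batch);
    }
}

class BatchDemo {
    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(2, written::add);
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            buffer.add(s);
        }
        buffer.flush(); // push the final, partially filled batch
        System.out.println(written); // prints [[a, b], [c, d], [e]]
    }
}
```

The explicit flush at the end matters: without it, a partially filled last batch would never be written.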
A problem encountered in actual development
In the job code, when the custom MySQL batch class extends RichSinkFunction and is passed to addSink(...), an error is reported saying that the argument must implement the SinkFunction interface. This is still an open question.
Copyright notice
This article was written by [Bonyin]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220626055127.html