Flink case - Kafka, MySQL source
2022-04-23 04:41:00 【Z-hhhhh】
1. Kafka source
Flink connects to Kafka easily, which is natural given that Flink is a stream processing engine.
First, the dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
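For an sbt build, the same three artifacts could be declared roughly as follows. This is only a sketch: it assumes an sbt project with a 2.12.x `scalaVersion` (2.12.15 is an arbitrary choice here), so that `%%` resolves to the `_2.12` artifacts listed above.

```scala
// build.sbt (sketch; assumes sbt instead of the Maven setup above)
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % "1.10.1",
  "org.apache.flink" %% "flink-streaming-scala"      % "1.10.1",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.10.1"
)
```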
Then the code:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaSource {
  def main(args: Array[String]): Unit = {
    // Execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism
    env.setParallelism(4)
    env.disableOperatorChaining()
    // Kafka configuration. List the brokers of a cluster separated by commas, e.g. 172.0.0.101:1111,172.0.0.102:1111
    val pro: Properties = new Properties()
    pro.setProperty("bootstrap.servers", "*******")
    pro.setProperty("group.id", "topic")
    // Flink's consumer decodes records with the schema passed below and overrides these two settings
    pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("auto.offset.reset", "latest")
    // Receive data from Kafka
    env.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), pro))
      .print()
    // Execute the job
    env.execute()
  }
}
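The `bootstrap.servers` value is a single string with the broker addresses joined by commas. As a minimal sketch, the consumer properties could be assembled by a small helper. `KafkaProps`, `consumerProps`, and the group name `demo-group` are hypothetical names used only for illustration, and the addresses are the sample ones from the comment in the code above.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink or Kafka.
object KafkaProps {
  // Build consumer properties from a list of host:port broker addresses.
  def consumerProps(brokers: Seq[String], groupId: String): Properties = {
    require(brokers.nonEmpty, "at least one broker address is required")
    val props = new Properties()
    // Kafka expects all brokers in one comma-separated string
    props.setProperty("bootstrap.servers", brokers.mkString(","))
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", "latest")
    props
  }
}
```

The result of `KafkaProps.consumerProps(Seq("172.0.0.101:1111", "172.0.0.102:1111"), "demo-group")` can be passed to `FlinkKafkaConsumer011` in place of `pro`.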
2. MySQL source
MySQL is read through a user-defined source function.
Dependency:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
Code:
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object MysqlSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from MySQL
    val inputData: DataStream[class_name] = env.addSource(new MySQLSource).setParallelism(1)
    inputData.print()
    env.execute("mysql source")
  }

  // Case class that mirrors the columns of the table
  case class class_name(id: Int, cid: Int)

  class MySQLSource extends RichParallelSourceFunction[class_name] {
    // Set from cancel(), which Flink calls on a different thread than run()
    @volatile var flag = true
    var conn: Connection = _
    var stat: PreparedStatement = _

    // Open the connection and prepare the query once per source instance
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://172.8.10.188:3306/1001_161?characterEncoding=utf-8&serverTimezone=UTC", "siger", "Siger_123")
      val sql = "select id,cid from class_name"
      stat = conn.prepareStatement(sql)
    }

    override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
      // Re-run the query until the job is cancelled
      while (flag) {
        val resultSet: ResultSet = stat.executeQuery()
        while (flag && resultSet.next()) {
          val id = resultSet.getInt("id")
          val cid = resultSet.getInt("cid")
          sourceContext.collect(class_name(id, cid))
          Thread.sleep(100)
        }
        resultSet.close()
      }
    }

    override def cancel(): Unit = {
      flag = false
    }

    override def close(): Unit = {
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
  }
}
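The source above stops through a flag that `cancel()` sets while `run()` is looping. Flink calls the two methods from different threads, so the flag is usually marked `@volatile`. The sketch below shows that pattern on its own, without Flink or a database. `PollingLoop` and `PollingLoopDemo` are hypothetical names, and the `emit` callback stands in for `sourceContext.collect`.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a source: run() loops until cancel() is called.
class PollingLoop(pauseMillis: Long) {
  // @volatile makes the write in cancel() visible to the thread inside run()
  @volatile private var running = true

  def run(emit: Int => Unit): Unit = {
    var n = 0
    while (running) {
      emit(n) // stands in for sourceContext.collect(...)
      n += 1
      Thread.sleep(pauseMillis)
    }
  }

  def cancel(): Unit = {
    running = false
  }
}

object PollingLoopDemo {
  def main(args: Array[String]): Unit = {
    val emitted = new AtomicInteger(0)
    val loop = new PollingLoop(pauseMillis = 10)
    // Run the loop on a worker thread, as a running source would be
    val worker = new Thread(() => loop.run(_ => emitted.incrementAndGet()))
    worker.start()
    Thread.sleep(100)
    loop.cancel() // called from a different thread than run()
    worker.join(1000)
    println(s"worker stopped: ${!worker.isAlive}, records emitted: ${emitted.get()}")
  }
}
```

Without `@volatile` (or an equivalent such as `AtomicBoolean`), the looping thread is not guaranteed to observe the update, and the loop may keep running after cancellation.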
I haven't blogged for months. From now on I should stick to writing.
Copyright notice
This article was written by [Z-hhhhh]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204220559122323.html