Flink Case Study: Kafka and MySQL Sources
2022-04-23 04:41:00 【Z-hhhhh】
1. Kafka source
Flink integrates smoothly with Kafka; after all, both are built around stream processing. First, the dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Then the code:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._
object KafkaSource {
  def main(args: Array[String]): Unit = {
    // Execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism
    env.setParallelism(4)
    env.disableOperatorChaining()
    // Kafka configuration; for a cluster, list the brokers separated by commas, e.g. 172.0.0.101:1111,172.0.0.102:1111
    val pro: Properties = new Properties()
    pro.setProperty("bootstrap.servers", "*******")
    pro.setProperty("group.id", "topic")
    pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("auto.offset.reset", "latest")
    // Consume data from Kafka and print it
    env.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), pro))
      .print()
    // Execute the job
    env.execute()
  }
}
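Besides `auto.offset.reset`, the Flink consumer itself lets you choose where consumption starts. A minimal sketch (the topic name and properties are the same placeholders as above, not real values):

```scala
// Sketch: controlling the consumer's start position explicitly.
val consumer = new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), pro)
consumer.setStartFromGroupOffsets() // default: resume from offsets committed for group.id
// consumer.setStartFromEarliest()  // ignore committed offsets, read the topic from the beginning
// consumer.setStartFromLatest()    // ignore committed offsets, read only new records
env.addSource(consumer).print()
```

`setStartFromEarliest`/`setStartFromLatest` override the committed group offsets, whereas `auto.offset.reset` only applies when no committed offset exists.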
2. MySQL source
For MySQL we write a custom data source. Dependency:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
The code (note that the element type of the stream must match the type parameter of the source function):
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object MysqlSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive MySQL data; a single parallel instance avoids reading the same table several times
    val inputData: DataStream[class_name] = env.addSource(new MySQLSource).setParallelism(1)
    inputData.print()
    env.execute("mysql source")
  }

  // Define a case class matching the table's columns
  case class class_name(id: Int, cid: Int)

  class MySQLSource extends RichParallelSourceFunction[class_name] {
    var flag = true
    var conn: Connection = _
    var stat: PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://172.8.10.188:3306/1001_161?characterEncoding=utf-8&serverTimezone=UTC", "siger", "Siger_123")
      val sql = "select id,cid from class_name"
      stat = conn.prepareStatement(sql)
    }

    override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
      while (flag) {
        val resultSet: ResultSet = stat.executeQuery()
        while (resultSet.next()) {
          val id = resultSet.getInt("id")
          val cid = resultSet.getInt("cid")
          sourceContext.collect(class_name(id, cid))
          Thread.sleep(100)
        }
        resultSet.close()
      }
    }

    override def cancel(): Unit = {
      flag = false
    }

    override def close(): Unit = {
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
  }
}
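A note on parallelism: because the source extends `RichParallelSourceFunction`, running it with parallelism greater than 1 would make every subtask execute the same query and emit duplicate rows, which is why the job pins the source to `setParallelism(1)`. An alternative sketch (my own variation, not from the original post) guards the read loop so only the first subtask queries MySQL, letting the source tolerate higher parallelism:

```scala
// Sketch: only subtask 0 reads from MySQL; other subtasks emit nothing.
override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
  if (getRuntimeContext.getIndexOfThisSubtask == 0) {
    while (flag) {
      val resultSet = stat.executeQuery()
      while (resultSet.next()) {
        sourceContext.collect(class_name(resultSet.getInt("id"), resultSet.getInt("cid")))
      }
      resultSet.close()
      Thread.sleep(100)
    }
  }
}
```

`getRuntimeContext` is available here because the class extends a rich function; `getIndexOfThisSubtask` identifies the parallel instance.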
I haven't blogged for months; going forward I'll try to keep writing.
Copyright notice
This article was created by [Z-hhhhh]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204220559122323.html