当前位置:网站首页>Flink案例——kafka、MySQL source
Flink案例——kafka、MySQL source
2022-04-22 06:01:00 【Z-hhhhh】
Flink案例——kafka、MySQL source
一、kafka source
flink和kafka的连接是十分友好的,毕竟是做流式处理的吧。
首先依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
接着是代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._
object KafkaSource {
def main(args: Array[String]): Unit = {
//环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//并行度
env.setParallelism(4)
env.disableOperatorChaining()
//kafka配置 集群以逗号隔开,如172.0.0.101:1111,172.0.0.102:1111
val pro: Properties = new Properties()
pro.setProperty("bootstrap.servers", "*******");
pro.setProperty("group.id", "topic");
pro.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
pro.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
pro.setProperty("auto.offset.reset","latest")
//接收kafka数据
env.addSource(new FlinkKafkaConsumer011[String]("topic",new SimpleStringSchema(),pro))
.print()
//执行
env.execute()
}
}
二、MySQL source
MySQL采用自定义数据源的方式
依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
代码
import java.sql.{
Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{
RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
object MysqlSource {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//接收MySQL数据
val inputData: DataStream[CNC_AlarmAnalysresult] = env.addSource(new MySQLSource).setParallelism(1)
inputData.print()
env.execute("mysql source")
}
//根据表 创建样例类
case class class_name(id: Int, cid: Int)
class MySQLSource extends RichParallelSourceFunction[class_name] {
var flag = true
var conn: Connection = _
var stat: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://172.8.10.188:3306/1001_161?characterEncoding=utf-8&serverTimezone=UTC", "siger", "Siger_123")
val sql = "select id,cid from class_name"
stat = conn.prepareStatement(sql)
}
override def run(sourceContext: SourceFunction.SourceContext[CNC_AlarmAnalysresult]): Unit = {
while (flag) {
val resultSet: ResultSet = stat.executeQuery()
while (resultSet.next()) {
val id = resultSet.getInt("id")
val cid = resultSet.getInt("cid")
sourceContext.collect(class_name(id, cid))
Thread.sleep(100)
}
}
}
override def cancel(): Unit = {
flag = false
}
override def close(): Unit = {
if (stat != null) stat.close()
if (conn != null) conn.close()
}
}
}
几个月没写博客了,以后还是要坚持写才好。
版权声明
本文为[Z-hhhhh]所创,转载请带上原文链接,感谢
https://blog.csdn.net/weixin_45399602/article/details/122495063
边栏推荐
猜你喜欢

Pixel 5 5G解锁教程(含解锁BL,安装EdXposed与Root)

创新实训(五)配置信息

树莓派 3B入门——系统的安装

EXCEL 利用替换、分列、填充功能综合整理财务数据

Solve the problem of error in installing PostgreSQL under windows2012 R2

剑指offer:二叉树中和为某一值的路径(回溯)

Applet custom native bottom navigation

Open source database management systems are now more popular than commercial products

利用win自带功能让处于同一局域网的两个电脑之间互传文件(速度和本地磁盘间互传相同)

创新实训(四)前期准备—服务器
随机推荐
创新实训(十二)爬虫
EXCEL 保护工作表、工作薄不被破坏
Pure JS chain animation and simultaneous motion
8张图让你一步步看清 async/await 和 promise 的执行顺序
MySQL is a classic question often asked in an interview.
指纹支付相关的细节处理
使用@Autowired出现Field injection is not recommended
Spent four days painstakingly writing all the notes of MySQL, which is very suitable for beginners.
Redis取出数据乱码问题
equalsIgnoreCase()和equals()的区别
JS get screen, browser, web page height and width
创新实训(四)进度
nodejs+Express+mongodb
Canvas和SVG的区别
在微信小程序中打开的页面不能超过10个,达到10个页面后,就不能再打开新的页面
The difference between watch and computed
Pgdoucer best practices: Series 3
Uniapp applet anchor point (only support the current page operation, do not cross page operation)
POM文件浅析
Uniapp wechat applet user authorization to obtain current location information Tencent map