当前位置:网站首页>Flink学习15:Flink自定义数据源
Flink学习15:Flink自定义数据源
2022-08-10 03:12:00 【hzp666】
自定义数据源,核心3步:
1.创建一个类,用来指定数据流中的数据类型
2.创建一个数据源的类,继承RichSourceFunction等类,并重写run 和cancel 方法
3.在main方法中,生成环境后,把自定义的数据源的类,通过addSource 加入到环境中
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import java.util.Calendar import scala.util.Random //defined the stockPrice attribute case class StockPrice(stockID:String, timestamp:Long, price:Double) //to define myself dataSource class StockPriceSource extends RichSourceFunction[StockPrice]{ var isRunning: Boolean =true val rand = new Random() //initialize the stock price private var priceList = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d) var stockId =0 var curPrice =0.0d //when we defined our dataSource function, we must override Two function :run and cancel override def run(sourceContext: SourceFunction.SourceContext[StockPrice]): Unit = { while (isRunning){ //change the stock price random //get the stockid by random stockId=rand.nextInt(priceList.size) //generate the random price val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05 //update the stock price list priceList = priceList.updated(stockId,curPrice) //create the time stamp val curTime = Calendar.getInstance.getTimeInMillis //add my data source to sourceContext sourceContext.collect(StockPrice("stock_"+stockId.toString, curTime, curPrice)) //thread sleep Thread.sleep(rand.nextInt(10)) } } override def cancel(): Unit = { //cancel the run function isRunning=false } } object myDataSourceTest { def main(args: Array[String]): Unit = { //create env val env = StreamExecutionEnvironment.getExecutionEnvironment //set the parallelism env.setParallelism(1) //create my dataSource val stockPriceStream: DataStream[StockPrice] = env.addSource(new StockPriceSource) //print stockPriceStream.print() //execute env.execute("stock price streaming") } }
边栏推荐
- 如何让导电滑环信号更好
- flutter 创建可增型列表和列表排序
- MySQL: What MySQL optimizations have you done?
- 是什么让训练综合分类网络艰苦?
- No ‘Access-Control-Allow-Origin‘ header is present on the requested resource.
- 10个超赞的C语言开源项目,值得学习
- uniapp 路由与页面跳转
- @Autowired注解 --required a single bean, but 2 were found出现的原因以及解决方法
- Example 046: Breaking the Cycle
- 常用类以及接口
猜你喜欢
随机推荐
(面试加分新技能) 总结11个ES2022中你可能遗漏的语法
全面深入了解什么是反向代理和负载均衡
Recommend several easy-to-use MySQL open source clients, it is recommended to collect
Dynamic Web Development Fundamentals
电子产品结构设计中的电磁兼容性(EMC)设计
维度表设计
三极管开关电路参数设计与参数介绍
PC摄像头设置 默认摄像头设置 win11 默认摄像头设置
黑马jvm课程笔记d2
兴业数金一面
小程序导航及导航传参
Camera partial update
Arrays类
使用curl指令发起websocket请求
有关视频传输时粘包问题的一些解决方法
uniapp 路由与页面跳转
exchange2010 邮件数据库无法装入
Chip Accelerator
【CC3200AI 实验教程5】疯壳·AI语音人脸识别(会议记录仪/人脸打卡机)-定时器
Kotlin协程:父子协程的绑定与传递