Flink Learning 15: Custom Data Sources in Flink
2022-08-10 03:12:00 [hzp666]
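Building a custom data source comes down to three core steps:
1. Create a class that defines the type of the records in the data stream.
2. Create a data source class that extends RichSourceFunction (or a similar source class) and overrides the run and cancel methods.
3. In the main method, after creating the execution environment, register the custom source with it through addSource.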
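Step 2 relies on a contract: Flink calls run on a source thread and expects it to keep emitting until cancel is invoked from a different thread. The Flink-free sketch below imitates that contract with a plain Java thread so it can be run without a cluster. The `Emitter` trait, `CountingSource` and `RunCancelDemo` are hypothetical names invented for illustration (`Emitter` only stands in for Flink's `SourceContext`), and the lambda-to-`Runnable` conversion assumes Scala 2.12 or later.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Flink's SourceContext; it is NOT a Flink API.
trait Emitter[T] {
  def collect(record: T): Unit
}

// A toy source with the same run/cancel shape as a Flink source function.
class CountingSource {
  // @volatile so the write in cancel() is seen by the thread looping in run().
  @volatile private var isRunning = true

  def run(out: Emitter[Long]): Unit = {
    var n = 0L
    while (isRunning) {
      out.collect(n)
      n += 1
      Thread.sleep(1)
    }
  }

  def cancel(): Unit = isRunning = false
}

object RunCancelDemo {
  // Runs the source on its own thread, cancels it from the calling thread
  // after `millis` ms, and returns everything it emitted.
  def runFor(millis: Long): List[Long] = {
    val emitted = ListBuffer[Long]()
    val sink = new Emitter[Long] {
      def collect(record: Long): Unit = emitted += record
    }
    val source = new CountingSource
    val worker = new Thread(() => source.run(sink))
    worker.start()
    Thread.sleep(millis)
    source.cancel()
    worker.join() // join() also makes the worker's writes to `emitted` visible here
    emitted.toList
  }

  def main(args: Array[String]): Unit = {
    val emitted = runFor(50)
    println(s"emitted ${emitted.size} records before cancel()")
  }
}
```

Without `@volatile`, the JVM is allowed to let the looping thread keep reading a stale `true`, so cancel might never stop the loop.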
```scala
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

import java.util.Calendar
import scala.util.Random

// Step 1: the record type carried by the stream
case class StockPrice(stockID: String, timestamp: Long, price: Double)

// Step 2: the custom data source
class StockPriceSource extends RichSourceFunction[StockPrice] {
  // cancel() is called from a different thread than run(), so the flag must be volatile
  @volatile var isRunning: Boolean = true
  val rand = new Random()
  // initial stock prices
  private var priceList = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
  var stockId = 0
  var curPrice = 0.0d

  // a custom source must override two methods: run and cancel
  override def run(sourceContext: SourceFunction.SourceContext[StockPrice]): Unit = {
    while (isRunning) {
      // pick a stock at random
      stockId = rand.nextInt(priceList.size)
      // random-walk its price with small Gaussian noise
      curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
      // store the new price back into the list
      priceList = priceList.updated(stockId, curPrice)
      // current time as the record timestamp
      val curTime = Calendar.getInstance.getTimeInMillis
      // emit the record to downstream operators
      sourceContext.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
      // pause for 0 to 9 ms
      Thread.sleep(rand.nextInt(10))
    }
  }

  override def cancel(): Unit = {
    // make the loop in run() exit
    isRunning = false
  }
}

// Step 3: register the source with the environment
object myDataSourceTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set the job parallelism to 1
    env.setParallelism(1)
    // add the custom source
    val stockPriceStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)
    // print the records to stdout
    stockPriceStream.print()
    // trigger execution
    env.execute("stock price streaming")
  }
}
```
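The core of run is a random walk: pick a stock index, add Gaussian noise scaled by 0.05 to that stock's last price, and write the result back with `List.updated`. The sketch below isolates one iteration of that loop body without Flink, using a seeded `scala.util.Random` so repeated runs print the same ticks. `PriceWalk` and its `nextTick` helper are hypothetical names; the record type mirrors the article's `StockPrice`.

```scala
import scala.util.Random

// Same record shape as in the article.
case class StockPrice(stockID: String, timestamp: Long, price: Double)

object PriceWalk {
  // Hypothetical helper: one iteration of the loop body in StockPriceSource.run,
  // returning the emitted record together with the updated price list.
  def nextTick(prices: List[Double], rand: Random, now: Long): (StockPrice, List[Double]) = {
    val stockId = rand.nextInt(prices.size)
    val curPrice = prices(stockId) + rand.nextGaussian() * 0.05
    (StockPrice("stock_" + stockId, now, curPrice), prices.updated(stockId, curPrice))
  }

  def main(args: Array[String]): Unit = {
    val rand = new Random(42) // fixed seed so repeated runs print the same ticks
    var prices = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
    for (t <- 0 until 5) {
      val (tick, updated) = nextTick(prices, rand, t.toLong)
      prices = updated
      println(tick)
    }
  }
}
```

Keeping the update logic in a pure function like this makes it testable on its own; inside the Flink source, run would call it in the `while (isRunning)` loop and pass each record to `sourceContext.collect`.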