当前位置:网站首页>Flink学习15:Flink自定义数据源
Flink学习15:Flink自定义数据源
2022-08-10 03:12:00 【hzp666】


自定义数据源,核心3步:
1.创建一个类,用来指定数据流中的数据类型
2.创建一个数据源的类,继承RichSourceFunction等类,并重写run 和cancel 方法
3.在main方法中,生成环境后,把自定义的数据源的类,通过addSource 加入到环境中
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import java.util.Calendar
import scala.util.Random
//defined the stockPrice attribute
case class StockPrice(stockID:String, timestamp:Long, price:Double)
//to define myself dataSource
class StockPriceSource extends RichSourceFunction[StockPrice]{
var isRunning: Boolean =true
val rand = new Random()
//initialize the stock price
private var priceList = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
var stockId =0
var curPrice =0.0d
//when we defined our dataSource function, we must override Two function :run and cancel
override def run(sourceContext: SourceFunction.SourceContext[StockPrice]): Unit = {
while (isRunning){
//change the stock price random
//get the stockid by random
stockId=rand.nextInt(priceList.size)
//generate the random price
val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
//update the stock price list
priceList = priceList.updated(stockId,curPrice)
//create the time stamp
val curTime = Calendar.getInstance.getTimeInMillis
//add my data source to sourceContext
sourceContext.collect(StockPrice("stock_"+stockId.toString, curTime, curPrice))
//thread sleep
Thread.sleep(rand.nextInt(10))
}
}
override def cancel(): Unit = {
//cancel the run function
isRunning=false
}
}
object myDataSourceTest {
def main(args: Array[String]): Unit = {
//create env
val env = StreamExecutionEnvironment.getExecutionEnvironment
//set the parallelism
env.setParallelism(1)
//create my dataSource
val stockPriceStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)
//print
stockPriceStream.print()
//execute
env.execute("stock price streaming")
}
}
边栏推荐
猜你喜欢
随机推荐
charles的功能操作
维度表设计
zabbix添加监控主机和自定义监控项
@Autowired注解 --required a single bean, but 2 were found出现的原因以及解决方法
驱动程序开发:按键中断之异步通知
学习总结week4_2正则
元宇宙+NFT是“宝”还是“炒”
Classes and interfaces
网页挖矿溯源?浏览器浏览历史查看工具Browsinghistoryview
uva1392
excel高级绘图技巧100讲(二十三)-Excel中实现倒计时计数
flutter 制作嵌套列表
Example 047: Functions Swap Variables
书法家唐效奇
ARP Spoofing - Tutorial Details
笔试题记录
How to quickly become a software test engineer?What skills do testers need for a monthly salary of 15k?
同样是初级测试,凭什么他比我薪资高 5000 块?
ARP欺骗-教程详解
动态网页开发基础









