Flink Learning 15: Custom Flink Data Sources
2022-08-10 03:12:00 [hzp666]


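Creating a custom data source takes three core steps:
1. Create a class that defines the type of the records in the data stream.
2. Create a data source class that extends RichSourceFunction (or a similar class) and overrides the run and cancel methods.
3. In the main method, after creating the execution environment, register an instance of the custom source class with addSource.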
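Step 2 hides a threading detail: Flink calls cancel() from a different thread than the one executing run(), so the running flag should be marked @volatile; without it, run() is not guaranteed to ever observe the update. The sketch below reproduces that handshake with a plain Java thread and no Flink dependency. CancellableLoop and CancellableLoopDemo are hypothetical names used only for illustration; they are not part of the Flink API.

```scala
// Flink-free sketch of the run()/cancel() handshake of a SourceFunction.
// CancellableLoop and CancellableLoopDemo are hypothetical illustration names.
class CancellableLoop {
  // @volatile makes the write in cancel() visible to the thread executing run()
  @volatile private var isRunning: Boolean = true
  // number of "emitted" records; only written by the run() thread
  @volatile var emitted: Long = 0L

  // Mirrors SourceFunction.run: keep producing until cancelled
  def run(): Unit = {
    while (isRunning) {
      emitted += 1
      Thread.sleep(1)
    }
  }

  // Mirrors SourceFunction.cancel: invoked from another thread
  def cancel(): Unit = {
    isRunning = false
  }
}

object CancellableLoopDemo {
  // Runs the loop on a worker thread, cancels it after `millis` ms,
  // waits up to 5 s for the worker to exit, and reports
  // (worker terminated?, records emitted).
  def runAndCancel(millis: Long): (Boolean, Long) = {
    val loop = new CancellableLoop
    val worker = new Thread(new Runnable { def run(): Unit = loop.run() })
    worker.start()
    Thread.sleep(millis)
    loop.cancel()
    worker.join(5000)
    (!worker.isAlive, loop.emitted)
  }
}
```

In the Flink version the worker thread is managed by the task runtime rather than created by hand, but the flag has the same role: once cancel() flips it, the while loop in run() exits and the source stops.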
```scala
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import scala.util.Random

// Step 1: define the type of the records in the stream
case class StockPrice(stockID: String, timestamp: Long, price: Double)

// Step 2: define the custom data source
class StockPriceSource extends RichSourceFunction[StockPrice] {
  // cancel() runs on a different thread than run(), so the flag must be @volatile
  @volatile private var isRunning: Boolean = true
  private val rand = new Random()
  // initial prices of the five stocks
  private var priceList = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)

  // A custom source must override two methods: run and cancel
  override def run(sourceContext: SourceFunction.SourceContext[StockPrice]): Unit = {
    while (isRunning) {
      // pick a random stock
      val stockId = rand.nextInt(priceList.size)
      // random-walk its price with small Gaussian noise
      val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
      // store the new price for the next iteration
      priceList = priceList.updated(stockId, curPrice)
      // current time in epoch milliseconds
      val curTime = System.currentTimeMillis()
      // emit the record into the stream
      sourceContext.collect(StockPrice("stock_" + stockId, curTime, curPrice))
      // pause 0-9 ms between records
      Thread.sleep(rand.nextInt(10))
    }
  }

  // Called by Flink when the job is cancelled; makes run() leave its loop
  override def cancel(): Unit = {
    isRunning = false
  }
}

// Step 3: register the source with the environment via addSource
object myDataSourceTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // a single parallel instance keeps the printed output easy to follow
    env.setParallelism(1)
    // add the custom source
    val stockPriceStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)
    // print every record to stdout
    stockPriceStream.print()
    // start the job
    env.execute("stock price streaming")
  }
}
```
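Because run() only loops until cancelled, the per-record logic inside it is hard to check directly. One option, sketched below under the assumption that you want to unit test it without starting a Flink job, is to pull one step of the price random walk into a pure function. PriceWalk and step are hypothetical names; the logic (pick a random stock, add Gaussian noise with standard deviation 0.05, update the list, build a StockPrice) is the same as in the loop above.

```scala
import scala.util.Random

// Same record type as in the Flink example
case class StockPrice(stockID: String, timestamp: Long, price: Double)

// Hypothetical helper: one step of the random walk done inside StockPriceSource.run
object PriceWalk {
  // Picks a random stock, perturbs its price by Gaussian noise with
  // standard deviation `sigma`, and returns the updated price list
  // together with the record that the source would emit.
  def step(prices: List[Double], rand: Random, timestamp: Long,
           sigma: Double = 0.05): (List[Double], StockPrice) = {
    val stockId = rand.nextInt(prices.size)
    val newPrice = prices(stockId) + rand.nextGaussian() * sigma
    (prices.updated(stockId, newPrice), StockPrice("stock_" + stockId, timestamp, newPrice))
  }
}
```

Inside run(), the loop body would then reduce to `val (next, record) = PriceWalk.step(priceList, rand, System.currentTimeMillis())`, followed by `priceList = next` and `sourceContext.collect(record)`. Passing the Random in explicitly also lets a test seed it for deterministic results.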