当前位置:网站首页>Flink Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic
Flink Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic
2022-08-08 08:13:00 【Allen-1】
1.flink流式处理
解决方案:修改滑动处理时间的时间间隔
.window(TumblingProcessingTimeWindows.of(Time.seconds(2)
2.修改后的代码
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.{
TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
//flink的watermark字段处理
object Streaming_job {
def main(args: Array[String]): Unit = {
// 创建环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 获取本地数据
val text = env.socketTextStream("127.0.0.1", 7777)
// val text = "aaa"
import org.apache.flink.api.scala._
// 开发业务的逻辑
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.sum(1)
.print()
.setParallelism(1)
env.execute("Flink Streaming WordCount")
}
}
3.结果展示
参考博客:
https://blog.csdn.net/u010002184/article/details/115726664
http://t.zoukankan.com/-courage-p-14804510.html
边栏推荐
猜你喜欢
随机推荐
Data Governance (3): Data Quality Management
力扣142-环形链表——链表&快慢指针法&哈希表法
throw和throws区别
shell循环语句
炽热如初 向新而生|ISC2022 HackingClub白帽峰会圆满举办
机器学习理论及案例分析(part3)--聚类
什么是DFT?FT、FS、DTFT、DFS、DFT的关系
Nacos是如何实现心跳机制和服务续约以及超时剔除服务机制的?
小程序云开发服务端(云函数-函数式编程)数据库取出数据突破限制
看顶级测工怎么玩转Apifox接口测试工具
两个联动的可扩展收起的textView的简单实现
数据治理(三):数据质量管理
oracle sql语法 更改为mysql sql语法 或者代码实现
使用Jlink RTT工具打印日志
微软 .NET Core 3.1 年底将结束支持,请升级到.NET 6
VSCode代码格式化快捷键及保存时自动格式化
大文件上传时如何做到 秒传?
VISIO 2003 在线更新的注册文件
数学基础(一)矩阵对角化、SVD分解以及应用
lua --- 基本语法学习









