Flink Study (12): Allowed Lateness and Side Output
2022-04-23 17:26:00 【_lrs】
Preface
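When windowing on event time, events may arrive late. Flink creates a window when the first event belonging to it arrives. The window is closed when the watermark reaches the window's end; because the watermark trails the largest event timestamp by the delay configured in the watermark strategy, closing is postponed by that delay. After a window is closed, later events that belong to it are no longer processed.
Allowed Lateness postpones destroying the window for an additional, bounded amount of time. Side Output is a separate output stream that collects the events that still arrive too late to be processed.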
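To make "the window an event belongs to" concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a tumbling event-time window is chosen for a timestamp, assuming a 10 ms window size and zero offset as in the examples below. TumblingWindowSketch and windowStart are hypothetical names for illustration only; Flink performs an equivalent calculation internally (TimeWindow.getWindowStartWithOffset).

```java
// Hypothetical helper, not Flink API: tumbling-window assignment with offset 0.
class TumblingWindowSketch {
    // Start of the window [start, start + size) that contains `timestamp`.
    // Math.floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 10; // window size in ms, as in the examples
        for (long ts : new long[] {0, 4, 9, 10, 12, 17}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Events 0, 4 and 9 all land in [0, 10), while 10, 12 and 17 land in [10, 20); the first of each group is the event that creates the window.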
I. Allowed Lateness
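If both a watermark strategy and Allowed Lateness are configured, a window first fires when the watermark reaches the window's end, but it is not destroyed at that point. Events for that window that arrive within the allowed lateness are still added to it, and the window is destroyed only when the watermark reaches the window's end plus the allowed lateness. With event time the window uses the EventTimeTrigger, which fires the window again for every element that arrives within the allowed lateness.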
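The two thresholds above can be checked with a little arithmetic. This is a hedged sketch, assuming the behaviour of WatermarkStrategy.forBoundedOutOfOrderness (watermark = largest timestamp seen minus the delay minus 1 ms) and that a window [start, end) has a last timestamp of end - 1; LatenessMath and its methods are hypothetical helpers, not Flink API.

```java
// Hypothetical helper, not Flink API. Assumptions (see lead-in):
//  - watermark = maxSeenTimestamp - outOfOrderness - 1
//  - window [start, end) has maxTimestamp = end - 1
//  - the window fires once watermark >= maxTimestamp
//  - the window is destroyed once watermark >= maxTimestamp + allowedLateness
class LatenessMath {
    static long watermark(long maxSeenTimestamp, long outOfOrderness) {
        return maxSeenTimestamp - outOfOrderness - 1;
    }

    // Smallest event timestamp that pushes the watermark up to `target`.
    static long eventNeededToReach(long target, long outOfOrderness) {
        return target + outOfOrderness + 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10, outOfOrderness = 2, allowedLateness = 5;
        long maxTimestamp = windowEnd - 1;                 // 9
        long cleanupTime = maxTimestamp + allowedLateness; // 14
        System.out.println("first firing at event " + eventNeededToReach(maxTimestamp, outOfOrderness));
        System.out.println("window destroyed at event " + eventNeededToReach(cleanupTime, outOfOrderness));
    }
}
```

With the example's settings this prints "first firing at event 12" and "window destroyed at event 17", the two thresholds the test walks through.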
1. Example
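import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.junit.Test; // JUnit 4 assumed

@Test
public void allowedLatenessTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Each socket line is one event; its value is also its event timestamp (ms)
    DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
    source.map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    return Long.parseLong(value);
                }
            })
            // Timestamps and watermarks: tolerate 2 ms of out-of-order data
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2))
                    .withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time windows of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Allow 5 ms of lateness before the window is destroyed
            .allowedLateness(Time.milliseconds(5))
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("element: " + next);
                    }
                    out.collect(last);
                }
            })
            .print("allowedLateness");
    env.execute("allowedLatenessTest");
}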
2. Test
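(1) Input 0, 10, 12
When 12 arrives, the watermark reaches the end of the first window (12 = 10 ms window size + 2 ms watermark delay), so the window fires and processes the data between 0 and 10 (here only 0).
(2) Input 14, then 4
The first window is computed again and prints 0 and 4.
(3) Input 16, then 6
The first window is computed again as well and prints 0, 4 and 6.
(4) Input 17, then 7
The first window is closed when 17 arrives: by then the 10 ms window size, the 2 ms watermark delay and the 5 ms allowed lateness have all elapsed (17 = 10 + 2 + 5), so the 7 that follows is no longer processed.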
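The whole test can be replayed offline with a small single-threaded simulation. This is a sketch, not Flink code: LatenessReplay is a hypothetical class, and it assumes the watermark advances immediately after every element (Flink emits watermarks periodically, by default every 200 ms, which behaves the same way when values are typed into the socket by hand).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of the test (not Flink code): 10 ms tumbling windows,
// 2 ms watermark delay, 5 ms allowed lateness, watermark updated per element.
class LatenessReplay {
    static final long SIZE = 10, OUT_OF_ORDERNESS = 2, ALLOWED_LATENESS = 5;

    static List<String> replay(long... events) {
        List<String> log = new ArrayList<>();
        Map<Long, List<Long>> windows = new TreeMap<>(); // window start -> buffered elements
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long t : events) {
            long start = t - Math.floorMod(t, SIZE);
            long windowMax = start + SIZE - 1;
            if (windowMax + ALLOWED_LATENESS <= watermark) {
                log.add("dropped " + t); // window already destroyed
            } else {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(t);
                if (windowMax <= watermark) { // late but within lateness: fire again
                    log.add("fire " + windows.get(start));
                }
            }
            // Bounded out-of-orderness watermark: max timestamp - delay - 1
            maxSeen = Math.max(maxSeen, t);
            long newWatermark = maxSeen - OUT_OF_ORDERNESS - 1;
            if (newWatermark > watermark) {
                Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, List<Long>> w = it.next();
                    long wMax = w.getKey() + SIZE - 1;
                    if (watermark < wMax && wMax <= newWatermark) {
                        log.add("fire " + w.getValue()); // regular on-time firing
                    }
                    if (wMax + ALLOWED_LATENESS <= newWatermark) {
                        it.remove(); // cleanup time reached: drop window state
                    }
                }
                watermark = newWatermark;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        replay(0, 10, 12, 14, 4, 16, 6, 17, 7).forEach(System.out::println);
    }
}
```

For the input sequence of the test this prints fire [0], fire [0, 4], fire [0, 4, 6] and dropped 7, matching steps (1) to (4).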
II. Side Output
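Setting a watermark delays when the window fires; the window fires once when that delay has passed.
Setting Allowed Lateness delays when the window is closed, and every element that arrives within the allowed lateness fires the window again.
Once both delays have passed, the window is closed and later data for it is no longer processed.
The side output offers a strategy for collecting that unprocessed data, much like the rejection policy a Java thread pool applies when both its threads and its queue are full.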
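The three cases above can be summed up as one decision per element. A minimal sketch, assuming a tumbling window whose last timestamp is end - 1; LateDataRouter and its Outcome values are hypothetical names, not Flink API. SIDE_OUTPUT corresponds to the elements that sideOutputLateData collects; without a side output they are simply dropped.

```java
// Hypothetical helper, not Flink API: decides what happens to one element.
class LateDataRouter {
    enum Outcome { BUFFERED, LATE_REFIRE, SIDE_OUTPUT }

    static Outcome route(long timestamp, long watermark, long size, long allowedLateness) {
        long windowMax = timestamp - Math.floorMod(timestamp, size) + size - 1;
        if (windowMax + allowedLateness <= watermark) {
            return Outcome.SIDE_OUTPUT;  // both delays exceeded: window is gone
        }
        if (windowMax <= watermark) {
            return Outcome.LATE_REFIRE;  // window already fired: fire it again
        }
        return Outcome.BUFFERED;         // window not fired yet: wait for the watermark
    }

    public static void main(String[] args) {
        // Watermark values taken from the test (2 ms out-of-orderness)
        System.out.println(route(4, 11, 10, 5));  // LATE_REFIRE
        System.out.println(route(7, 14, 10, 5));  // SIDE_OUTPUT
        System.out.println(route(15, 14, 10, 5)); // BUFFERED
    }
}
```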
1. Example
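import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Test; // JUnit 4 assumed

@Test
public void sideOutputTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Tag that identifies the side output for late data
    OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));
    SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
            .map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    return Long.parseLong(value);
                }
            })
            // Timestamps and watermarks: tolerate 2 ms of out-of-order data
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2))
                    .withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time windows of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Allow 5 ms of lateness before the window is destroyed
            .allowedLateness(Time.milliseconds(5))
            // Route data that arrives after the allowed lateness to the side output
            .sideOutputLateData(delayStream)
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("element: " + next);
                    }
                    out.collect(last);
                }
            });
    source.print("processed");
    // Read the late data back through the same tag
    DataStream<Long> sideOutput = source.getSideOutput(delayStream);
    sideOutput.print("not processed");
    env.execute("sideOutputTest");
}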
2. Test
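(1) The steps are the same as above. This time the unprocessed element 7 is collected in the side output stream.
(2) Then input 18, then 8
8 is sent to the side output as well, but 7 is not emitted again: unprocessed data is not repeated in the side output.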
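Why each late element shows up in the side output only once can also be sketched: an element whose window has already been destroyed is routed to the side output on arrival and is never stored in window state, so nothing replays it later. SideOutputReplay is a hypothetical plain-Java replay using the same parameters as the test (10 ms windows, 2 ms watermark delay, 5 ms allowed lateness) and assuming the watermark advances after each element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay (not Flink code): collects what would reach the side output.
class SideOutputReplay {
    static final long SIZE = 10, OUT_OF_ORDERNESS = 2, ALLOWED_LATENESS = 5;

    static List<Long> lateElements(long... events) {
        List<Long> sideOutput = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long t : events) {
            long windowMax = t - Math.floorMod(t, SIZE) + SIZE - 1;
            if (windowMax + ALLOWED_LATENESS <= watermark) {
                sideOutput.add(t); // emitted once on arrival, never buffered
            }
            // Watermark after this element: max timestamp - delay - 1
            maxSeen = Math.max(maxSeen, t);
            watermark = Math.max(watermark, maxSeen - OUT_OF_ORDERNESS - 1);
        }
        return sideOutput;
    }

    public static void main(String[] args) {
        System.out.println(lateElements(0, 10, 12, 14, 4, 16, 6, 17, 7, 18, 8));
    }
}
```

For the full input of both test steps this prints [7, 8]: each late value appears exactly once.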
Copyright notice
This article was written by [_lrs]. Please include a link to the original when reposting. Thanks.
https://blog.csdn.net/RenshenLi/article/details/124333471