Flink Study Notes (12): Allowed Lateness and Side Output
2022-04-23 17:26:00 【_lrs】
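Preface
When windowing on event time, events can arrive later than their timestamps suggest. A window is created when the first event that falls into it arrives, and it fires once the watermark passes the end of the window, that is, after the out-of-orderness delay configured on the watermark strategy has elapsed. Without further configuration the window is then discarded, and events that arrive afterwards for that window are no longer processed.
Allowed Lateness postpones the destruction of a window by a configurable amount of time. Side Output provides a side stream that collects the events that are still too late to be processed after that extra time.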
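I. Allowed Lateness
When both a watermark delay and Allowed Lateness are configured, the window first fires when the watermark reaches the end of the window. For the duration of the allowed lateness after that point, late events are still added to the window, and the window is destroyed only when the allowed lateness has also elapsed. Event-time windows use EventTimeTrigger by default, so each element that arrives within the allowed lateness triggers the window computation again.
1. Example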
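The timing rules above can be sketched as plain arithmetic. This is only an illustration, not Flink code: the class `LatenessTiming` and its method names are hypothetical. It assumes that the bounded-out-of-orderness generator emits `maxTimestamp - delay - 1` as the watermark and that a window `[start, end)` has the max timestamp `end - 1`, which matches my understanding of recent Flink versions. The numbers in `main` are a 10 ms tumbling window, a 2 ms watermark delay and 5 ms of allowed lateness.

```java
// Hypothetical helper, not part of Flink: models the window timing rules with plain arithmetic.
final class LatenessTiming {

    // Watermark emitted by a bounded-out-of-orderness generator (assumed: maxSeen - delay - 1).
    static long watermark(long maxSeenTimestamp, long outOfOrdernessMs) {
        return maxSeenTimestamp - outOfOrdernessMs - 1;
    }

    // A window [start, end) fires for the first time once the watermark reaches end - 1.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // The window is destroyed once the watermark reaches end - 1 + allowedLateness.
    static boolean isExpired(long windowEnd, long allowedLatenessMs, long watermark) {
        return watermark >= windowEnd - 1 + allowedLatenessMs;
    }

    public static void main(String[] args) {
        long windowEnd = 10, delay = 2, lateness = 5;
        for (long ts : new long[] {10, 12, 16, 17}) {
            long wm = watermark(ts, delay);
            System.out.println("max ts " + ts + " -> watermark " + wm
                    + ", fired=" + fires(windowEnd, wm)
                    + ", expired=" + isExpired(windowEnd, lateness, wm));
        }
    }
}
```

Under these assumptions the window `[0, 10)` fires once an element with timestamp 12 has been seen, and it expires once an element with timestamp 17 has been seen.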
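import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// plus the @Test annotation from whichever JUnit version the project uses

@Test
public void allowedLatenessTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
    // Each input line is a number that is also used as the event timestamp (in ms)
    source.map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks; tolerate 2 ms of out-of-orderness
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time window with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Keep the window alive for 5 ms of allowed lateness
            .allowedLateness(Time.milliseconds(5))
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element currently in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("element: " + next);
                    }
                    out.collect(last);
                }
            })
            .print("allowedLateness");
    env.execute("allowedLatenessTest");
}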
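2. Test
(1) Enter 0, 10, 12
When 12 is entered, the watermark reaches the end of the first window: the 10 ms window size plus the 2 ms watermark delay have been reached, so the data in the window [0, 10) is processed. At this point that window contains only 0, because 10 already belongs to the next window.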

(2) Enter 14, 4
The first window is computed again and prints 0 and 4.

(3) Enter 16, 6
The first window is computed once more and prints 0, 4 and 6.

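(4) Enter 17, 7
The first window is closed when 17 is entered: the 10 ms window size, the 2 ms watermark delay and the 5 ms allowedLateness have all been reached, so the 7 entered afterwards is no longer processed.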
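The four test steps can be replayed without a cluster. The sketch below is a hypothetical simulation and not Flink code: the class `LatenessWalkthrough` and the labels `ON_TIME`, `LATE_REFIRE` and `DROPPED` are made up for illustration. It assumes that the watermark has caught up with all earlier input before the next element arrives, which is reasonable for hand-typed socket input with periodic watermark generation, and that the watermark equals `maxTimestamp - 2 - 1`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Flink code: replays the test input against 10 ms tumbling windows.
final class LatenessWalkthrough {
    static final long WINDOW_SIZE = 10;
    static final long OUT_OF_ORDERNESS = 2;
    static final long ALLOWED_LATENESS = 5;

    // Decides what happens to an element given the watermark at the time it arrives.
    static String classify(long ts, long watermark) {
        long windowStart = ts - Math.floorMod(ts, WINDOW_SIZE);
        long windowMaxTs = windowStart + WINDOW_SIZE - 1;
        if (windowMaxTs + ALLOWED_LATENESS <= watermark) {
            return "DROPPED";      // window already destroyed
        }
        if (windowMaxTs <= watermark) {
            return "LATE_REFIRE";  // window already fired, element triggers it again
        }
        return "ON_TIME";          // window has not fired yet
    }

    // Returns one "timestamp:label" entry per input element, in input order.
    static List<String> replay(long... inputs) {
        List<String> result = new ArrayList<>();
        // Start value chosen so that the initial watermark is Long.MIN_VALUE without overflow.
        long maxTs = Long.MIN_VALUE + OUT_OF_ORDERNESS + 1;
        for (long ts : inputs) {
            long watermark = maxTs - OUT_OF_ORDERNESS - 1;
            result.add(ts + ":" + classify(ts, watermark));
            maxTs = Math.max(maxTs, ts);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(replay(0, 10, 12, 14, 4, 16, 6, 17, 7));
    }
}
```

With the input from the steps above, 4 and 6 are classified as `LATE_REFIRE` and 7 as `DROPPED`, which is consistent with the observed behaviour.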

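II. Side Output
Setting a watermark delay postpones the moment a window fires; the window computation runs once when that delay has elapsed.
Setting AllowedLateness postpones the moment a window is closed, and data arriving within the allowed lateness triggers the window computation again each time.
Once both delays have elapsed, the window is closed and data that arrives later for that window is no longer processed.
A side output offers a way to collect this data that would otherwise go unprocessed, much like the rejection policy of a Java thread pool that kicks in when both the threads and the queue are full.
1. Example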
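For readers who have not met the thread-pool side of the analogy, here is a minimal sketch using only `java.util.concurrent`. It is not Flink code and the class `RejectionAnalogy` is hypothetical. The pool has one worker and a one-slot queue, so any further task goes to the rejection handler, which collects it in a list much as a side output collects late elements.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the analogy: rejected tasks are collected instead of being lost.
final class RejectionAnalogy {

    // Submits taskCount blocking tasks to a pool with one worker and a one-slot queue
    // and returns how many of them were handed to the rejection handler.
    static int countRejected(int taskCount) {
        List<Runnable> rejected = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                (task, executor) -> rejected.add(task)); // the "side output" of the pool
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected.size();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(3));
    }
}
```

With three tasks, the first occupies the worker, the second fills the queue, and the third is rejected and collected.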
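import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
// plus the @Test annotation from whichever JUnit version the project uses

@Test
public void sideOutputTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Create the tag that identifies the side output for late data
    OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));
    // Despite its name, "source" holds the result stream of the window function
    SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
            .map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks; tolerate 2 ms of out-of-orderness
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time window with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Keep the window alive for 5 ms of allowed lateness
            .allowedLateness(Time.milliseconds(5))
            // Send elements that arrive after the window is destroyed to the side output
            .sideOutputLateData(delayStream)
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element currently in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("element: " + next);
                    }
                    out.collect(last);
                }
            });
    source.print("processed data");
    // Read the late elements back from the side output
    DataStream<Long> sideOutput = source.getSideOutput(delayStream);
    sideOutput.print("unprocessed data");
    env.execute("sideOutputTest");
}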
2. Test
(1) Follow the same steps as above. The element 7, which was not processed, ends up in the side output stream.

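(2) Then enter 18, 8
Elements that were already diverted are not emitted to the side output again; only the new late element 8 is added.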

Copyright notice
This article was written by [_lrs]. Please include a link to the original when reposting. Thank you.
https://blog.csdn.net/RenshenLi/article/details/124333471