Flink Study Notes (12): Allowed Lateness and Side Output
2022-04-23 17:26:00 【_lrs】
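Preface
With event-time windows, events may arrive late. A window is created when its first event arrives, and it fires once the watermark passes the end of the window; the out-of-orderness bound configured on the watermark strategy delays that moment. By default the window is then closed, and any later event that belongs to it is not processed.
Allowed Lateness postpones the destruction of the window for a configured period, so late events that arrive within that period are still processed. Side Output is a separate output stream that collects the events that arrive even later and would otherwise be discarded.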
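I. Allowed Lateness
When both a watermark strategy and Allowed Lateness are configured, the window fires for the first time when the watermark reaches the end of the window, but its state is kept for the additional allowed lateness. Events that arrive during that period are still added to the window, and the window is destroyed only when the watermark passes the end of the window plus the allowed lateness. Event-time windows use the EventTimeTrigger trigger, with which every element that arrives within the allowed lateness fires the window computation again.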
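The timing described above can be worked out by hand. The following is a minimal plain-Java sketch of that arithmetic, not Flink code: the class and method names are hypothetical, and it assumes non-negative timestamps, a zero window offset, and the behaviour of Flink's built-in bounded-out-of-orderness watermark, which trails the largest timestamp seen by the bound plus 1 ms.

```java
// Simplified model of the event-time arithmetic; the names here are hypothetical, not Flink classes.
class WindowTiming {
    /** Start of the tumbling window that contains the timestamp (non-negative timestamps, zero offset). */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Largest timestamp that still belongs to the window [start, start + size). */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** Watermark implied by the largest timestamp seen so far and the out-of-orderness bound. */
    static long watermark(long maxSeen, long outOfOrderness) {
        return maxSeen - outOfOrderness - 1;
    }

    /** Smallest event timestamp whose watermark reaches the given target. */
    static long timestampNeededFor(long targetWatermark, long outOfOrderness) {
        return targetWatermark + outOfOrderness + 1;
    }

    public static void main(String[] args) {
        long size = 10, bound = 2, lateness = 5;   // the values used in this article, in milliseconds
        long start = windowStart(4, size);
        long fireAt = maxTimestamp(start, size);   // the window fires once watermark >= fireAt
        long cleanupAt = fireAt + lateness;        // the window is destroyed once watermark >= cleanupAt
        System.out.println("window start: " + start);
        System.out.println("first firing needs an event with timestamp >= "
                + timestampNeededFor(fireAt, bound));
        System.out.println("cleanup needs an event with timestamp >= "
                + timestampNeededFor(cleanupAt, bound));
    }
}
```

With a 10 ms window, a 2 ms bound and 5 ms of allowed lateness, the window [0, 10) first fires when an event with timestamp 12 or later arrives and is destroyed when an event with timestamp 17 or later arrives.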
1. Example
@Test
public void allowedLatenessTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
    source.map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    // Each input line is a number that also serves as the event timestamp (ms)
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks, tolerating 2 ms of out-of-orderness
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2))
                    .withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time window with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Keep the window open for another 5 ms of allowed lateness
            .allowedLateness(Time.milliseconds(5))
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element currently in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("element: " + next);
                    }
                    out.collect(last);
                }
            })
            .print("allowedLateness");
    env.execute("allowedLatenessTest");
}
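2. Test
(1) Input 0, 10, 12
When 12 is entered, the watermark reaches the end of the first window (the 10 ms window size plus the 2 ms watermark bound), so the data in the window [0, 10) is processed.
(2) Input 14, 4
The first window is computed again and prints 0 and 4.
(3) Input 16, 6
The first window is computed once more and prints 0, 4 and 6.
(4) Input 17, 7
The first window is closed when 17 is entered, because the 10 ms window size, the 2 ms watermark bound and the 5 ms allowedLateness have all elapsed. The input 7 is therefore no longer processed.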
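The test sequence can be replayed with a small toy model. This is only a sketch in plain Java, not Flink's window operator: the class name is hypothetical, and it assumes that the watermark advances immediately after every element. Real Flink emits watermarks periodically, so the inputs must be typed slowly enough for each watermark to be generated before the next element.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy replay of the test inputs; a simplified model, not Flink's window operator.
class LatenessReplay {
    static final long SIZE = 10, BOUND = 2, LATENESS = 5; // milliseconds, as in the example

    static String label(long start) {
        return "[" + start + "," + (start + SIZE) + ")";
    }

    /** Returns one log line per window firing or dropped element (non-negative timestamps only). */
    static List<String> replay(long... inputs) {
        List<String> log = new ArrayList<>();
        Map<Long, List<Long>> windows = new TreeMap<>(); // window start -> buffered elements
        Set<Long> fired = new HashSet<>();                // windows that have already fired once
        long watermark = Long.MIN_VALUE;

        for (long ts : inputs) {
            long start = ts - (ts % SIZE);
            long maxTs = start + SIZE - 1;
            if (maxTs + LATENESS <= watermark) {
                log.add("drop " + ts);                    // window already destroyed
            } else {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
                if (maxTs <= watermark) {                 // late but within the allowed lateness
                    log.add("re-fire " + label(start) + " " + windows.get(start));
                }
            }
            // Assumption: the watermark is updated right after each element.
            watermark = Math.max(watermark, ts - BOUND - 1);

            Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<Long>> e = it.next();
                long wMax = e.getKey() + SIZE - 1;
                if (wMax <= watermark && fired.add(e.getKey())) {
                    log.add("fire " + label(e.getKey()) + " " + e.getValue());
                }
                if (wMax + LATENESS <= watermark) {
                    it.remove();                          // allowed lateness elapsed: destroy the window
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        replay(0, 10, 12, 14, 4, 16, 6, 17, 7).forEach(System.out::println);
    }
}
```

Under these assumptions the replay logs one firing of [0,10) with 0, two re-firings with 0, 4 and 0, 4, 6, and finally a drop of 7, which matches the four steps above.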
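II. Side Output
The watermark bound delays the moment a window fires: the window computation runs once when that delay elapses.
AllowedLateness delays the moment the window is closed, and every element that arrives within the allowed lateness fires the window computation again.
Once both delays have elapsed, the window is closed and later elements that belong to it are no longer processed.
A side output offers a way to collect the elements that are no longer processed, similar to the rejection policy of a Java thread pool that is triggered when both the threads and the queue are full.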
1. Example
@Test
public void sideOutputTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Create the tag that identifies the side output
    OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));
    SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
            .map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    // Each input line is a number that also serves as the event timestamp (ms)
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks, tolerating 2 ms of out-of-orderness
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2))
                    .withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time window with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Keep the window open for another 5 ms of allowed lateness
            .allowedLateness(Time.milliseconds(5))
            // Send elements that arrive after the allowed lateness to the side output
            .sideOutputLateData(delayStream)
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element currently in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("element: " + next);
                    }
                    out.collect(last);
                }
            });
    // Main output: results of the window computation
    source.print("processed data");
    // Side output: elements that were too late for their window
    DataStream<Long> sideOutput = source.getSideOutput(delayStream);
    sideOutput.print("unprocessed data");
    env.execute("sideOutputTest");
}
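2. Test
(1) Follow the same test sequence as above. The unprocessed element 7 now ends up in the side output.
(2) Then input 18, 8
8 is also too late for the first window and is sent to the side output. Unprocessed elements are not emitted to the side output repeatedly: 7 does not appear a second time.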
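The routing rule behind this result can be sketched in a few lines of plain Java. This is a simplified model rather than Flink's implementation: the class and method names are hypothetical, timestamps are assumed to be non-negative, and the watermark is assumed to advance immediately after every element.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of which elements end up in the late-data side output; not Flink code.
class SideOutputReplay {
    static final long SIZE = 10, BOUND = 2, LATENESS = 5; // milliseconds, as in the example

    /** Returns the elements that would be routed to the side output, in arrival order. */
    static List<Long> lateElements(long... inputs) {
        List<Long> sideOutput = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : inputs) {
            long windowStart = ts - (ts % SIZE);
            long cleanupTime = windowStart + SIZE - 1 + LATENESS;
            if (cleanupTime <= watermark) {
                sideOutput.add(ts);   // window already destroyed: emitted to the side output once
            }
            // Assumption: the watermark is updated right after each element.
            watermark = Math.max(watermark, ts - BOUND - 1);
        }
        return sideOutput;
    }

    public static void main(String[] args) {
        System.out.println(lateElements(0, 10, 12, 14, 4, 16, 6, 17, 7, 18, 8));
    }
}
```

For the full input sequence of this test the model yields 7 and 8, each exactly once, since a late element is forwarded when it arrives and is never buffered for a later re-firing.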
Copyright notice
This article was written by [_lrs]. Please include a link to the original when reposting. Thank you.
https://blog.csdn.net/RenshenLi/article/details/124333471