Allowed Lateness and Side Output
2022-04-23 17:27:00 【_ lrs】
Preface
When windows operate on event time, events may arrive late. The first event that falls into a window opens that window. The previous window closes once event time passes its end or, if a watermark delay is configured, once that delay has elapsed. After a window has closed, later events that belong to it are no longer processed.
Allowed Lateness postpones the destruction of a window by a configured period. Side Output, a side output stream, collects the events that are still unprocessed after that period.
I. Allowed Lateness
If both a watermark and Allowed Lateness are set, the window first fires when the watermark reaches the end of the window. The window is then kept alive for the additional Allowed Lateness period: events that arrive within this period are still added to the window, and the window is destroyed when the period ends. Event-time windows use EventTimeTrigger, so each element that arrives within the Allowed Lateness period triggers the window computation again.
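The timing described above can be worked out with a little arithmetic. The following is a plain-Java sketch, not Flink code; the class and method names are made up for illustration. It assumes that a bounded-out-of-orderness watermark equals the largest timestamp seen minus the bound minus 1 ms, that a window [start, end) first fires when the watermark reaches end - 1, and that it is destroyed when the watermark reaches end - 1 plus the allowed lateness. The numbers (10 ms window, 2 ms bound, 5 ms lateness) are the ones used in the example of this section.

```java
/** Plain-Java arithmetic only; no Flink classes are used. */
final class LatenessThresholds {

    // Watermark after the largest timestamp seen so far (assumed formula: max - bound - 1).
    static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    // Smallest event timestamp that pushes the watermark up to targetWatermark.
    static long timestampNeededFor(long targetWatermark, long outOfOrdernessMs) {
        return targetWatermark + outOfOrdernessMs + 1;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 10 - 1; // last timestamp inside window [0, 10)
        long outOfOrdernessMs = 2;
        long allowedLatenessMs = 5;

        long firstFiring = timestampNeededFor(windowMaxTimestamp, outOfOrdernessMs);
        long cleanup = timestampNeededFor(windowMaxTimestamp + allowedLatenessMs, outOfOrdernessMs);

        System.out.println("window first fires when timestamp " + firstFiring + " arrives"); // 12
        System.out.println("window is destroyed when timestamp " + cleanup + " arrives");    // 17
    }
}
```

Under these assumptions the first window fires when an element with timestamp 12 arrives and is destroyed when an element with timestamp 17 arrives.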
1. Example
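import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.junit.Test; // assumes JUnit 4

@Test
public void allowedLatenessTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
    source.map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    // Each input line is a number that is also used as the event timestamp
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks, tolerating 2 ms of out-of-orderness
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time window with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Keep the window for another 5 ms of allowed lateness
            .allowedLateness(Time.milliseconds(5))
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("Element: " + next);
                    }
                    out.collect(last);
                }
            })
            .print("allowedLateness");
    env.execute("allowedLatenessTest");
}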
2. Test
(1) Input 0, 10, 12
When 12 is entered, the watermark reaches the end of the first window, that is, event time has passed the 10 ms window size plus the 2 ms watermark delay. The elements between 0 and 10 are now processed.
(2) Input 14, 4
The first window is recomputed and prints 0 and 4.
(3) Input 16, 6
The first window is recomputed again and prints 0, 4 and 6.
(4) Input 17, 7
The first window closes when 17 is entered: event time has now passed the 10 ms window size plus the 2 ms watermark delay plus the 5 ms allowedLateness. The 7 entered afterwards is no longer processed.
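The test sequence can be replayed without a cluster. The sketch below is a plain-Java simulation, not Flink code; all names are hypothetical. It tracks only the first window [0, 10) and assumes the same rules as the job above (2 ms out-of-orderness, 5 ms allowed lateness, watermark = largest timestamp - 2 - 1). As a simplification, the watermark advances immediately after every element, whereas Flink emits watermarks periodically.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates the first window [0, 10) of the test; no Flink classes are used. */
final class AllowedLatenessReplay {
    static final long WINDOW_MAX_TS = 9;     // last timestamp inside [0, 10)
    static final long OUT_OF_ORDERNESS = 2;  // ms
    static final long ALLOWED_LATENESS = 5;  // ms

    static List<String> replay(long... timestamps) {
        List<String> log = new ArrayList<>();
        List<Long> window = new ArrayList<>(); // contents of window [0, 10)
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        boolean fired = false;

        for (long ts : timestamps) {
            if (ts <= WINDOW_MAX_TS) {
                if (WINDOW_MAX_TS + ALLOWED_LATENESS <= watermark) {
                    log.add("drop " + ts); // window already destroyed
                } else {
                    window.add(ts);
                    if (WINDOW_MAX_TS <= watermark) {
                        log.add("late " + ts + " -> " + window); // window fires again
                    }
                }
            }
            // Advance the watermark after the element has been handled.
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - OUT_OF_ORDERNESS - 1;
            if (!fired && watermark >= WINDOW_MAX_TS && !window.isEmpty()) {
                fired = true;
                log.add("fire on " + ts + " -> " + window); // first firing
            }
        }
        return log;
    }

    public static void main(String[] args) {
        replay(0, 10, 12, 14, 4, 16, 6, 17, 7).forEach(System.out::println);
    }
}
```

For the inputs of steps (1) to (4) the simulation logs a first firing on 12 with [0], re-firings for 4 and 6 with [0, 4] and [0, 4, 6], and finally drops 7.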
II. Side Output
Setting a watermark delays the moment at which the window fires: the window computation runs once when the watermark delay has elapsed;
Setting AllowedLateness delays the moment at which the window closes, and each element that arrives during this period triggers the window computation again;
Once both delays have elapsed, the window closes, and later elements that belong to it are no longer processed.
The side output stream provides a strategy for this case: it collects the elements that are no longer processed, much like the rejection policy that a Java thread pool triggers when its threads and queue are full.
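For readers who do not have the thread pool comparison in mind, here is a small plain-Java sketch of it. It is only an illustration of the analogy, not part of the Flink job, and the class and task names are hypothetical. A pool with one thread and one queue slot receives three blocking tasks; the third has nowhere to go, so the rejection handler collects it instead of silently losing it, in the same way that a side output collects late elements.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionAnalogy {

    /** A task that carries a label and blocks until the gate opens. */
    static final class LabeledTask implements Runnable {
        final String label;
        final CountDownLatch gate;

        LabeledTask(String label, CountDownLatch gate) {
            this.label = label;
            this.gate = gate;
        }

        @Override
        public void run() {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Submits tasks a, b, c to a pool with one thread and one queue slot; returns the rejected labels. */
    static List<String> rejectedLabels() {
        List<String> rejected = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
                // Rejection handler: collect the task instead of throwing.
                (task, executor) -> rejected.add(((LabeledTask) task).label));

        for (String label : new String[] {"a", "b", "c"}) {
            pool.execute(new LabeledTask(label, gate)); // a runs, b is queued, c is rejected
        }

        gate.countDown(); // let the running and queued tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedLabels());
    }
}
```

Task a occupies the only thread, task b occupies the only queue slot, and task c is handed to the handler.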
1. Example
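import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Test; // assumes JUnit 4

@Test
public void sideOutputTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Create the tag that identifies the side output for late data
    OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));
    SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
            .map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    // Each input line is a number that is also used as the event timestamp
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks, tolerating 2 ms of out-of-orderness
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling event-time window with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Keep the window for another 5 ms of allowed lateness
            .allowedLateness(Time.milliseconds(5))
            // Route elements that arrive after the window is destroyed to the side output
            .sideOutputLateData(delayStream)
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("Element: " + next);
                    }
                    out.collect(last);
                }
            });
    source.print("Processed data");
    // Read the late elements back from the side output
    DataStream<Long> sideOutput = source.getSideOutput(delayStream);
    sideOutput.print("Unprocessed data");
    env.execute("sideOutputTest");
}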
2. Test
(1) Repeat the test procedure above. The unprocessed element 7 now ends up in the side output stream.
(2) Input 18, 8
The late element 8 also goes to the side output. Each late element is emitted there only once: the earlier 7 is not emitted again.
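The routing decision behind this test can be sketched in plain Java. This is a simulation with hypothetical names, not Flink code. It assumes 10 ms tumbling windows, a watermark of largest timestamp - 2 - 1, and that an element counts as late when the last timestamp of its window plus the 5 ms allowed lateness is less than or equal to the current watermark. As a simplification, the watermark advances right after every element.

```java
import java.util.ArrayList;
import java.util.List;

/** Decides which elements would be routed to the side output; no Flink classes are used. */
final class SideOutputReplay {
    static final long WINDOW_SIZE = 10;      // ms, tumbling
    static final long OUT_OF_ORDERNESS = 2;  // ms
    static final long ALLOWED_LATENESS = 5;  // ms

    // Last timestamp of the tumbling window that contains ts (ts is assumed non-negative).
    static long windowMaxTimestamp(long ts) {
        return ts - (ts % WINDOW_SIZE) + WINDOW_SIZE - 1;
    }

    static List<Long> lateElements(long... timestamps) {
        List<Long> sideOutput = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Late: the element's window has already been destroyed.
            if (windowMaxTimestamp(ts) + ALLOWED_LATENESS <= watermark) {
                sideOutput.add(ts); // each late element is added exactly once
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - OUT_OF_ORDERNESS - 1;
        }
        return sideOutput;
    }

    public static void main(String[] args) {
        System.out.println(lateElements(0, 10, 12, 14, 4, 16, 6, 17, 7, 18, 8)); // [7, 8]
    }
}
```

Elements 4 and 6 still reach the window because it has not been destroyed yet, while 7 and 8 arrive afterwards and each appears once in the side output.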