Allowed Lateness and Side Output
2022-04-23 17:27:00 【_ lrs】
Preface
When windows are computed on event time, events may arrive late or out of order. When the first event belonging to a new window arrives, that window is opened. The previous window is closed either immediately or, if a watermark delay is configured, once that delay has elapsed. After a window is closed, later events that belong to it are no longer processed.
Allowed lateness postpones the destruction of a window by a configured period. Side output, as a separate output stream, collects the events that are still unprocessed after that period.
I. Allowed Lateness
If both a watermark and allowed lateness are set, the window first fires when the watermark reaches the end of the window. The allowed lateness is then added on top: events that arrive within this extra period are still added to the window, and the window is destroyed only when the allowed lateness has elapsed. Event-time windows use the EventTimeTrigger, so every element that arrives within the allowed lateness triggers the window computation again.
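The timing described above can be checked with a little arithmetic. The following is a minimal plain-Java sketch, not Flink code. It assumes a 10 ms tumbling window, a 2 ms watermark delay and 5 ms of allowed lateness. It also assumes that a bounded-out-of-orderness watermark trails the largest timestamp seen by the delay plus 1 ms, and that a window [start, end) fires once the watermark reaches end - 1. The class and method names are hypothetical.

```java
// Sketch only: mirrors the arithmetic of the window lifecycle, not Flink's classes.
class LatenessThresholds {

    // Assumption: the watermark trails the largest timestamp seen by (delay + 1) ms.
    static long watermark(long maxTimestampSeen, long delayMs) {
        return maxTimestampSeen - delayMs - 1;
    }

    // Last timestamp that still belongs to the window [start, end).
    static long windowMaxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Smallest event timestamp whose watermark makes the window fire for the first time.
    static long firstFiringTimestamp(long windowEnd, long delayMs) {
        return windowMaxTimestamp(windowEnd) + delayMs + 1;
    }

    // Smallest event timestamp whose watermark causes the window to be destroyed.
    static long cleanupTimestamp(long windowEnd, long delayMs, long allowedLatenessMs) {
        return firstFiringTimestamp(windowEnd, delayMs) + allowedLatenessMs;
    }

    public static void main(String[] args) {
        long windowEnd = 10, delay = 2, lateness = 5;
        // prints "first firing at timestamp >= 12"
        System.out.println("first firing at timestamp >= " + firstFiringTimestamp(windowEnd, delay));
        // prints "destroyed at timestamp >= 17"
        System.out.println("destroyed at timestamp >= " + cleanupTimestamp(windowEnd, delay, lateness));
    }
}
```

Under these assumptions, the window [0, 10) first fires when an element with timestamp 12 or greater arrives, and it is destroyed when an element with timestamp 17 or greater arrives.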
1. Example
@Test
public void allowedLatenessTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
    source.map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks; the element value is its timestamp, and data may be up to 2 ms out of order
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling window based on event time, with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Allow data to arrive up to 5 ms late
            .allowedLateness(Time.milliseconds(5))
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element currently in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("Element: " + next);
                    }
                    out.collect(last);
                }
            })
            .print("allowedLateness");
    env.execute("allowedLatenessTest");
}
2. Test
(1) Input 0, 10, 12
When 12 is entered, the watermark reaches the end of the first window: the 10 ms window size plus the 2 ms watermark delay has been reached. The first window [0, 10) now fires and processes its data, which at this point is only the element 0.

(2) Input 14, 4
The first window is recalculated and prints 0 and 4.

(3) Input 16, 6
The first window is recalculated again and prints 0, 4 and 6.

(4) Input 17, 7
The first window is closed when 17 is entered: at that point the 10 ms window size, the 2 ms watermark delay and the 5 ms allowedLateness have all been reached. The following input 7 is therefore no longer processed.
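The four test steps can be replayed without a cluster. The sketch below is a plain-Java simulation of the first window only, not Flink itself. It assumes that the watermark advances right after every element (in Flink it is emitted periodically) and that it equals the largest timestamp seen minus the 2 ms delay minus 1 ms. The class `AllowedLatenessReplay` and its log messages are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: replays the inputs of the test against the first window [0, 10).
class AllowedLatenessReplay {
    static final long WINDOW_END = 10; // first window is [0, 10)
    static final long DELAY = 2;       // watermark delay in ms
    static final long LATENESS = 5;    // allowed lateness in ms

    static List<String> replay(long[] inputs) {
        List<String> log = new ArrayList<>();
        List<Long> window = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        boolean fired = false;
        for (long ts : inputs) {
            if (ts < WINDOW_END) {
                // The window is gone once the watermark passes its end plus the allowed lateness.
                boolean closed = watermark >= WINDOW_END - 1 + LATENESS;
                if (closed) {
                    log.add(ts + " dropped");
                } else {
                    window.add(ts);
                    // A late element inside the allowed lateness fires the window again.
                    if (watermark >= WINDOW_END - 1) {
                        fired = true;
                        log.add("refire " + window);
                    }
                }
            }
            // Assumption: the watermark is updated after each element.
            watermark = Math.max(watermark, ts - DELAY - 1);
            if (!fired && !window.isEmpty() && watermark >= WINDOW_END - 1) {
                fired = true;
                log.add("fire " + window);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] inputs = {0, 10, 12, 14, 4, 16, 6, 17, 7};
        // prints: fire [0], refire [0, 4], refire [0, 4, 6], 7 dropped (one per line)
        replay(inputs).forEach(System.out::println);
    }
}
```

The log matches the behaviour observed in the test: one regular firing at 12, two recalculations for 4 and 6, and the element 7 discarded after 17 has closed the window.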

II. Side Output
Setting a watermark delays the trigger time of the window: the window operation is triggered once when the delay has elapsed.
Setting allowedLateness delays the closing time of the window: data that arrives within this period triggers the window operation again, possibly many times.
When both delays have elapsed, the window closes and later data belonging to it is no longer processed.
The side output stream provides a strategy for collecting the data that is no longer processed. It is similar to the rejection policy that a Java thread pool triggers when its threads and its queue are both full.
1. Example
@Test
public void sideOutputTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
            .setParallelism(1);
    // Create the tag that identifies the side output for late data
    OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));
    SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
            .map(new MapFunction<String, Long>() {
                @Override
                public Long map(String value) throws Exception {
                    return Long.parseLong(value);
                }
            })
            // Assign timestamps and watermarks; the element value is its timestamp, and data may be up to 2 ms out of order
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
            // Tumbling window based on event time, with a size of 10 ms
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
            // Allow data to arrive up to 5 ms late
            .allowedLateness(Time.milliseconds(5))
            // Send data that arrives after the allowed lateness to the side output
            .sideOutputLateData(delayStream)
            .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                    // Print every element currently in the window and emit the last one
                    Iterator<Long> it = elements.iterator();
                    Long last = null;
                    while (it.hasNext()) {
                        Long next = it.next();
                        last = next;
                        System.out.println("Element: " + next);
                    }
                    out.collect(last);
                }
            });
    // Main stream: results of the window operation
    source.print("Processed data");
    // Side stream: late data that the window no longer processes
    DataStream<Long> sideOutput = source.getSideOutput(delayStream);
    sideOutput.print("Unprocessed data");
    env.execute("sideOutputTest");
}
2. Test
(1) The test procedure is the same as above. The unprocessed element 7 ends up in the side output stream.

(2) Input 18, 8
Data that was already sent to the side output stream is not emitted to it again.
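The routing of late data can be sketched in plain Java as well. This is a simulation, not Flink: it assumes the same 10 ms window, 2 ms watermark delay and 5 ms allowed lateness, and a watermark that advances after every element. Elements that arrive for the first window after it has been destroyed go to a separate list that stands in for the side output. The class `SideOutputReplay` and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: routes elements of the first window [0, 10) either to the window or to a side list.
class SideOutputReplay {
    final List<Long> window = new ArrayList<>();     // elements accepted by the window
    final List<Long> sideOutput = new ArrayList<>(); // elements that arrived too late
    long watermark = Long.MIN_VALUE;

    void accept(long ts) {
        if (ts < 10) {
            // Window end - 1 plus 5 ms of allowed lateness: after that the window no longer exists.
            if (watermark >= 9 + 5) {
                sideOutput.add(ts);
            } else {
                window.add(ts);
            }
        }
        // Assumption: watermark = largest timestamp seen - 2 ms delay - 1 ms.
        watermark = Math.max(watermark, ts - 2 - 1);
    }

    public static void main(String[] args) {
        SideOutputReplay replay = new SideOutputReplay();
        for (long ts : new long[]{0, 10, 12, 14, 4, 16, 6, 17, 7, 18, 8}) {
            replay.accept(ts);
        }
        System.out.println("window: " + replay.window);          // prints "window: [0, 4, 6]"
        System.out.println("side output: " + replay.sideOutput); // prints "side output: [7, 8]"
    }
}
```

In this sketch each late element reaches the side list exactly once, at the moment it arrives; nothing that was routed earlier is emitted again.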

Copyright notice
This article was written by [_ lrs]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204231726366501.html