
Allowed Lateness and Side Output

2022-04-23 17:27:00 _ lrs


Preface

When windows are computed on event time, events may arrive late. A window is created when the first event that belongs to it arrives. It fires when the watermark reaches the end of the window; the watermark already includes the out-of-orderness delay configured in the WatermarkStrategy. By default the window's state is discarded at that point, so any event for that window that arrives afterwards is not processed.

Allowed Lateness postpones the destruction of a window by a configurable amount of time. Side Output provides a separate stream that collects the events that still arrive too late after that extra delay.
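To make "the window an event belongs to" concrete, here is a small plain-Java sketch (not Flink code) of how a tumbling event-time window is chosen for a timestamp. It reproduces, for offset 0, the formula Flink uses in `TimeWindow.getWindowStartWithOffset`. The class and method names are illustrative, and the sketch assumes non-negative millisecond timestamps.

```java
/**
 * Illustrative sketch: which tumbling event-time window does a timestamp fall into?
 * Mirrors the start-of-window formula Flink applies with offset 0
 * (assumption: non-negative timestamps in milliseconds).
 */
public class TumblingWindowAssignment {

    /** Start of the tumbling window of length `size` that contains `timestamp`. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    public static void main(String[] args) {
        long size = 10; // 10 ms windows, as in the examples in this post
        for (long ts : new long[] {0, 10, 12, 4}) {
            long start = windowStart(ts, size);
            // A window covers [start, start + size): the end is exclusive
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

With 10 ms windows, 0 and 4 land in [0, 10), while 10 and 12 land in [10, 20).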

I. Allowed Lateness

When both a watermark strategy and Allowed Lateness are set, the window first fires when the watermark reaches the end of the window. Its state is then kept for the additional Allowed Lateness period. Events that belong to the window and arrive during that period are still added to it. The window is destroyed only once the watermark reaches the end of the window plus the allowed lateness. Event-time windows use EventTimeTrigger by default, which fires the window again for every element that arrives within the allowed lateness.
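The rules above can be written down as plain arithmetic. The sketch below is a simplified model, not Flink's implementation, and it makes these assumptions:

- The watermark comes from `WatermarkStrategy.forBoundedOutOfOrderness`, which emits the largest timestamp seen minus the out-of-orderness minus 1.
- The window is a tumbling window `[start, end)`.
- All times are in milliseconds.

`LatenessRules` and its methods are hypothetical names.

```java
/**
 * Hypothetical model of the event-time rules behind allowedLateness.
 * Assumes a bounded-out-of-orderness watermark and a tumbling window [start, end).
 * Times are in milliseconds. Illustration only, not Flink code.
 */
public class LatenessRules {

    /** Watermark emitted by a bounded-out-of-orderness generator. */
    static long watermark(long maxTimestampSeen, long outOfOrderness) {
        return maxTimestampSeen - outOfOrderness - 1;
    }

    /** The window first fires once the watermark reaches its last timestamp (end - 1). */
    static boolean hasFired(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** The window state is discarded once the watermark reaches (end - 1) + allowedLateness. */
    static boolean isPurged(long windowEnd, long allowedLateness, long watermark) {
        return windowEnd - 1 + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long end = 10, outOfOrderness = 2, lateness = 5; // window [0, 10), 2 ms, 5 ms
        for (long maxSeen : new long[] {10, 12, 16, 17}) {
            long wm = watermark(maxSeen, outOfOrderness);
            System.out.printf("max=%d watermark=%d fired=%b purged=%b%n",
                    maxSeen, wm, hasFired(end, wm), isPurged(end, lateness, wm));
        }
    }
}
```

For the window [0, 10) the model gives the following:

- It fires once the largest timestamp seen reaches 12 (watermark 9).
- It keeps accepting late elements while the watermark stays below 14.
- It is purged once a timestamp of 17 has been seen.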

1. Example

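    import java.time.Duration;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.junit.Test; // JUnit 4 assumed; use org.junit.jupiter.api.Test for JUnit 5

    @Test
    public void allowedLatenessTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Each line read from the socket is one event; its value is the event timestamp in ms
        DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
        source.map(new MapFunction<String, Long>() {
            @Override
            public Long map(String value) throws Exception {
                return Long.parseLong(value.trim());
            }
        })
                // Use the value itself as the timestamp and tolerate 2 ms of out-of-orderness
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element))
                // Tumbling event-time windows of 10 ms
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
                // Keep each window for another 5 ms of event time after it fires
                .allowedLateness(Time.milliseconds(5))
                .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                        // Print every element currently in the window and emit the last one
                        Long last = null;
                        for (Long next : elements) {
                            last = next;
                            System.out.println("Elements: " + next);
                        }
                        out.collect(last);
                    }
                })
                .print("allowedLateness");
        env.execute("allowedLatenessTest");
    }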

2. Test

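(1) Input 0, 10, 12 (one value per line)

When 12 is entered, the watermark becomes 12 - 2 - 1 = 9: the largest timestamp minus the 2 ms out-of-orderness minus 1. That is the last timestamp of the window [0, 10), so the window fires and processes the data between 0 and 10. Here that is only 0; the value 10 belongs to the next window, [10, 20).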

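(2) Input 14, 4

The watermark is now 14 - 2 - 1 = 11. That is past the end of the first window but still below its cleanup time of 9 + 5 = 14, so 4 is added to the first window. The window is computed again and prints 0 and 4.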
(3) Input 16, 6

The watermark is now 13, still below 14, so the first window is computed again and prints 0, 4 and 6.

(4) Input 17, 7

Entering 17 moves the watermark to 17 - 2 - 1 = 14. That reaches the end of the first window plus the 5 ms allowed lateness (10 ms window, 2 ms out-of-orderness and 5 ms lateness add up to 17), so the first window is closed. The 7 entered next is no longer processed.

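The four steps can be replayed without a running cluster. This is a hypothetical, simplified model of a single tumbling-window operator with allowed lateness, using the same parameters as the test: 10 ms windows, 2 ms out-of-orderness and 5 ms allowed lateness. It assumes the watermark advances right after each record. In Flink, watermarks are emitted periodically, which should lead to the same result when values are typed by hand one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical replay of the allowedLateness test with a simplified window model.
 * Assumption: the watermark (max seen - 2 - 1) is updated after every record.
 */
public class AllowedLatenessReplay {
    static final long SIZE = 10, OUT_OF_ORDERNESS = 2, LATENESS = 5;

    final Map<Long, List<Long>> windows = new TreeMap<>(); // window start -> contents
    final List<String> log = new ArrayList<>();
    long watermark = Long.MIN_VALUE;
    long maxSeen = Long.MIN_VALUE;

    void onElement(long ts) {
        long start = ts - (ts + SIZE) % SIZE;   // tumbling window holding ts
        long lastTs = start + SIZE - 1;         // last timestamp inside the window
        if (lastTs + LATENESS <= watermark) {
            log.add("dropped " + ts);           // window already cleaned up
        } else {
            List<Long> contents = windows.computeIfAbsent(start, k -> new ArrayList<>());
            contents.add(ts);
            if (lastTs <= watermark) {
                log.add("fire " + contents);    // late but within allowed lateness: fire again
            }
        }
        maxSeen = Math.max(maxSeen, ts);
        advanceWatermark(maxSeen - OUT_OF_ORDERNESS - 1);
    }

    void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) return;
        for (Map.Entry<Long, List<Long>> e : windows.entrySet()) {
            long lastTs = e.getKey() + SIZE - 1;
            if (watermark < lastTs && lastTs <= newWatermark) {
                log.add("fire " + e.getValue()); // on-time firing when the watermark passes the window
            }
        }
        watermark = newWatermark;
        // Drop windows whose end plus allowed lateness has been reached
        windows.keySet().removeIf(start -> start + SIZE - 1 + LATENESS <= watermark);
    }

    public static void main(String[] args) {
        AllowedLatenessReplay replay = new AllowedLatenessReplay();
        for (long ts : new long[] {0, 10, 12, 14, 4, 16, 6, 17, 7}) {
            replay.onElement(ts);
        }
        replay.log.forEach(System.out::println);
    }
}
```

Running `main` should print `fire [0]`, `fire [0, 4]`, `fire [0, 4, 6]` and `dropped 7`, one per line, mirroring the four test steps.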

II. Side Output

Setting a watermark delays the moment the window fires; the window is computed when the watermark arrives.
Setting Allowed Lateness delays the moment the window is closed, and data arriving during that extra time fires the window again, possibly many times.
Once both delays have passed, the window is closed and later data for it is no longer processed.
The side output stream provides a way to collect this otherwise discarded data. This is similar to the rejection policy a Java thread pool invokes when both its threads and its queue are full.
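To illustrate the analogy, here is a small sketch using the standard `java.util.concurrent.ThreadPoolExecutor`. With one worker thread and a one-slot queue, a third task is handed to the `RejectedExecutionHandler`. The handler here just records the task, much as a late record is handed to the side output instead of being silently lost. The task names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Thread-pool analogy: tasks that cannot be accepted are collected by the rejection handler. */
public class RejectionAnalogy {

    static List<String> run() {
        List<String> rejected = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        // 1 worker thread, queue capacity 1; the handler plays the role of the "side output"
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                (task, executor) -> rejected.add(task.toString()));
        for (String name : new String[] {"task-1", "task-2", "task-3"}) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        release.await(); // keep the worker busy so the queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override
                public String toString() {
                    return name;
                }
            });
        }
        release.countDown(); // let task-1 and the queued task-2 finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + run());
    }
}
```

task-1 occupies the only thread and task-2 fills the queue, so only task-3 reaches the handler.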

1. Example

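    import java.time.Duration;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    import org.junit.Test; // JUnit 4 assumed; use org.junit.jupiter.api.Test for JUnit 5

    @Test
    public void sideOutputTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        // Tag identifying the side output that receives records arriving after the allowed lateness
        OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));

        SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Long>() {
                    @Override
                    public Long map(String value) throws Exception {
                        return Long.parseLong(value.trim());
                    }
                })
                // Use the value itself as the timestamp and tolerate 2 ms of out-of-orderness
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element))
                // Tumbling event-time windows of 10 ms
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
                // Keep each window for another 5 ms of event time after it fires
                .allowedLateness(Time.milliseconds(5))
                // Route records that are too late even for the allowed lateness to the side output
                .sideOutputLateData(delayStream)
                .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                        // Print every element currently in the window and emit the last one
                        Long last = null;
                        for (Long next : elements) {
                            last = next;
                            System.out.println("Elements: " + next);
                        }
                        out.collect(last);
                    }
                });

        // Results of the window function (on-time and late firings)
        source.print("processed");
        // Records that arrived after the window was closed
        DataStream<Long> sideOutput = source.getSideOutput(delayStream);
        sideOutput.print("unprocessed");
        env.execute("sideOutputTest");
    }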

2. Test

(1) Repeat the inputs of the previous test. The unprocessed value 7 now ends up in the side output stream.

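(2) Input 18, 8

The watermark is now 15, past the first window's cleanup time of 14, so 8 also goes to the side output. The value 7 is not emitted again: a late record is sent to the side output only once, when it arrives.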
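The routing decision can be modelled in the same spirit. This is a simplified, hypothetical model rather than Flink code. A record goes to the side output if the window it belongs to has already passed its cleanup time, that is, if window end - 1 + allowed lateness <= watermark. As far as we understand, this is the condition Flink's window operator checks before emitting late data. The record is appended once, at arrival, which is why 7 does not appear again when 8 is entered. The model assumes 10 ms tumbling windows, 2 ms out-of-orderness, 5 ms allowed lateness, and a watermark updated after every record.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of sideOutputLateData routing for tumbling windows.
 * Assumption: watermark = max timestamp seen - 2 - 1, updated after every record.
 */
public class SideOutputReplay {
    static final long SIZE = 10, OUT_OF_ORDERNESS = 2, LATENESS = 5;

    final List<Long> accepted = new ArrayList<>();   // records still handled by their window
    final List<Long> sideOutput = new ArrayList<>(); // records whose window was already cleaned up
    long maxSeen = Long.MIN_VALUE;
    long watermark = Long.MIN_VALUE;

    void onElement(long ts) {
        long windowEnd = ts - (ts + SIZE) % SIZE + SIZE; // exclusive end of the window holding ts
        if (windowEnd - 1 + LATENESS <= watermark) {
            sideOutput.add(ts); // routed once, at arrival; never re-emitted later
        } else {
            accepted.add(ts);   // on time, or late but within the allowed lateness
        }
        maxSeen = Math.max(maxSeen, ts);
        watermark = Math.max(watermark, maxSeen - OUT_OF_ORDERNESS - 1);
    }

    public static void main(String[] args) {
        SideOutputReplay replay = new SideOutputReplay();
        for (long ts : new long[] {0, 10, 12, 14, 4, 16, 6, 17, 7, 18, 8}) {
            replay.onElement(ts);
        }
        System.out.println("accepted:    " + replay.accepted);
        System.out.println("side output: " + replay.sideOutput);
    }
}
```

Replaying both tests, only 7 and 8 reach the side output, each exactly once; 4 and 6 are still accepted because their window has not yet been cleaned up.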

Copyright notice
This article was written by [_ lrs]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231726366501.html