当前位置:网站首页>flink 学习(十二)Allowed Lateness和 Side Output

flink 学习(十二)Allowed Lateness和 Side Output

2022-04-23 17:26:00 _lrs


前言

        在使用事件时间进行窗口操作时,事件达到的时机可能会出现延迟的情况。某个窗口的第一个事件到来时,会开启新的窗口,上一个窗口会在关闭或者在WaterMaker设置的延迟到达时关闭,窗口关闭后,后续处于这个窗口的事件将不会被处理。
        而Allowed Lateness可以延迟销毁窗口,一定时间内延迟销毁窗口。Side Output作为侧输出流,能够收集延迟后仍然没有被处理的事件。

一、Allowed Lateness

        如果设置了WaterMaker和Allowed Lateness,在某个窗口WaterMaker到来的时候,加上Allowed Lateness允许的延迟,延迟时间内的事件仍然会添加到窗口中,在Allowed Lateness延迟到来的时候才会销毁窗口。事件时间使用EventTimeTrigger触发器,Allowed Lateness延迟范围内的元素会再次触发窗口操作。

1.示例

    @Test
    public void allowedLatenessTest() throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("172.16.10.159", 8888);
        source.map(new MapFunction<String, Long>() {
    
            @Override
            public Long map(String value) throws Exception {
    
                return Long.parseLong(value);
            }
        })
                //设置时间戳和水印,允许数据2s延迟
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
                //基于事件时间的滚动窗口,时间间隔10毫秒
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
                //设置允许数据5s延迟
                .allowedLateness(Time.milliseconds(5))
                .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
    
                    @Override
                    public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
    
                        Iterator<Long> it = elements.iterator();

                        Long last = null;
                        while (it.hasNext()) {
    
                            Long next = it.next();
                            last = next;
                            System.out.println("元素: " + next);
                        }
                        out.collect(last);
                    }
                })
                .print("allowedLateness")
        ;
        env.execute("allowedLatenessTest");
    }

2.测试

(1)输入数据0,10,12

当输入数据12的时候,到达水位线,即到达时间间隔10s和WaterMaker2s,此时处理0~10之间的数据

在这里插入图片描述
(2)输入14,4

第一个窗口会重新计算,打印出0和4
在这里插入图片描述
(3)输入16,6

第一个窗口同样会重新计算,打印出0、4和6
在这里插入图片描述

(3)输入17,7

第一个窗口会在输入17时关闭,这个时候到达了时间间隔 10s 和WaterMaker 2s 以及设置的allowedLateness 5s,则输入的7不再处理。

在这里插入图片描述

二、Side Output

设置WaterMaker延迟了窗口的触发时机,延迟到来时触发一次窗口操作;
设置AllowedLateness延迟了窗口的关闭时机,并且延迟返回内的数据会重新多次触发窗口的操作;
在这双重延迟时间都到来时,窗口会关闭,后续来的该窗口内的数据不会再被处理。
侧输出流提供了这样一种策略,能够收集不再被处理的数据,类似于java线程池线程和队列都满了的情况下触发拒绝策略。

1.示例

    @Test
    public void sideOutputTest() throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);
        //创建tag
        OutputTag<Long> delayStream = new OutputTag<Long>("delayStream", TypeInformation.of(Long.class));

        SingleOutputStreamOperator<Long> source = env.socketTextStream("172.16.10.159", 8888)
                .map(new MapFunction<String, Long>() {
    
                    @Override
                    public Long map(String value) throws Exception {
    
                        return Long.parseLong(value);
                    }
                })
                //设置时间戳和水印,允许数据2s延迟
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofMillis(2)).withTimestampAssigner((element, recordTimestamp) -> element))
                //基于事件时间的滚动窗口,时间间隔10毫秒
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
                //设置允许数据5s延迟
                .allowedLateness(Time.milliseconds(5))
                //设置侧输出流
                .sideOutputLateData(delayStream)
                .process(new ProcessAllWindowFunction<Long, Long, TimeWindow>() {
    
                    @Override
                    public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
    
                        Iterator<Long> it = elements.iterator();

                        Long last = null;
                        while (it.hasNext()) {
    
                            Long next = it.next();
                            last = next;
                            System.out.println("元素: " + next);
                        }
                        out.collect(last);
                    }
                });

        source.print("处理的数据");
        DataStream<Long> sideOutput = source.getSideOutput(delayStream);
        sideOutput.print("未被处理的数据");
        env.execute("sideOutputTest");
    }

2.测试

(1)测试流程跟上面一样,可以看出,没有被处理的数据 7 被存放在了侧数据流

在这里插入图片描述
(2)再输入18,8

没有被处理的数据不会重复输入到侧输出流
在这里插入图片描述

版权声明
本文为[_lrs]所创,转载请带上原文链接,感谢
https://blog.csdn.net/RenshenLi/article/details/124333471