ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink 窗口API & 窗口分配器

2022-06-23 08:32:13  阅读:140  来源: 互联网

标签:窗口 keyBy window API 分配器 Time data


1、窗口API

1,1、按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。

按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...)
      .window(...)

非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

1.2、代码中窗口API的调用

有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

tream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做keyBy,后面调用.window()时直接换成.windowAll()就可以了。

2、窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。之前的介绍中我们知道,窗口分配数据的规则,其实就对应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

2.1、时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。在较早的版本中,可以直接调用.timeWindow()来定义时间窗口;这种方式非常简洁,但使用事件时间语义时需要另外声明,程序员往往因为忘记这点而导致运行结果错误。所以在1.12版本之后,这种方式已经被弃用了,标准的声明方式就是直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了。

环境准备

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置全局并行度
        env.setParallelism(1);
        //设置水位线生成间隔
        env.getConfig().setAutoWatermarkInterval(100);
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //无序水位线:延迟2s
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            //指定水位线的字段:这里从 timestamp 读取时间信息
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

滚动处理时间窗口

        /**
         * 滚动时间窗口
         * 窗口大小 10 min ,偏移 0
         */
        eventStream.map(data -> Tuple2.of(data.user, 1L)) 
                .keyBy(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }) ;
        /**
         * 滚动时间窗口
         * 窗口大小 10 min ,偏移 2 min
         */
        WindowedStream<Event, String, TimeWindow> window2 =
                eventStream.keyBy(data -> data.user)
                        .window(TumblingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)));

偏移量说明:我们知道,不同国家分布在不同的时区。标准时间戳其实就是1970年1月1日0时0分0秒0毫秒开始计算的一个毫秒数,而这个时间是以UTC时间,也就是0时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是UTC+8,跟UTC有8小时的时差。我们定义1天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天0点开启窗口,这时是北京时间早上8点。那怎样得到北京时间每天0点开启的滚动窗口呢?只要设置-8小时的偏移量就可以了:

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

滑动处理时间窗口

        /**
         * 滑动事件时间窗口
         * 窗口大小 30 min,步长 5 min 无偏移量
         * window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)))
         */
        WindowedStream<Event, String, TimeWindow> window1 = eventStream.keyBy(data -> data.user)
                .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)));
         /**
         * 滑动事件时间窗口
         * 窗口大小 30 min,步长 5 min 偏移量 -8h
         * SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(8)
         */
        WindowedStream<Event, String, TimeWindow> window11 = eventStream.keyBy(data -> data.user)
                .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(-8)));

其它一些窗口分配器

   //事件时间会话窗口:窗口大小 5s
        WindowedStream<Event, String, TimeWindow> window3 =
                eventStream.keyBy(data -> data.user)
                        .window(EventTimeSessionWindows.withGap(Time.seconds(5)));
        //滑动计数窗口 窗口大小 10  :滑动步长 2
        WindowedStream<Event, String, GlobalWindow> eventStringGlobalWindowWindowedStream =
                eventStream.keyBy(data -> data.user)
                        .countWindow(10, 2);

标签:窗口,keyBy,window,API,分配器,Time,data
来源: https://www.cnblogs.com/wdh01/p/16391678.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有