ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

增量聚合和全窗口函数的结合使用

2022-07-04 13:02:58  阅读:169  来源: 互联网

标签:窗口 函数 url Long 增量 聚合 public


增量聚合和全窗口函数的对比

已经了解了Window API中两类窗口函数的用法,下面先来做个简单的总结。增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

 // ReduceFunction与WindowFunction结合
 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) 
 // ReduceFunction与ProcessWindowFunction结合
 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T>  reduceFunction,  ProcessWindowFunction<T,  R,  K,  W> function)
 // AggregateFunction与WindowFunction结合
 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,  ACC,  V>  aggFunction,  WindowFunction<V,  R,  K,  W> windowFunction)
 // AggregateFunction与ProcessWindowFunction结合
 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction,ProcessWindowFunction<V, R, K, W> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。下面举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的url,前提是得到每个链接的“热门度”。一般情况下,可以用url的浏览量(点击量)表示热门度。我们这里统计10秒钟的url浏览量,每5秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。

准备包装类

public class UrlViewCount {
    public String url;
    public Long cnt;
    public Long start;
    public Long end;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long cnt, Long start, Long end) {
        this.url = url;
        this.cnt = cnt;
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", cnt=" + cnt +
                ", TimestampStart=" + new Timestamp(start) +
                ", TimestampEnd=" + new Timestamp(end) +
                '}';
    }
}

实现逻辑

/**
 * 使用增量聚合函数计算 PV
 * 使用全窗口函数包装数据
 */
public class UVCountExample0627 {
    public static void main(String[] args) throws Exception {
        //获取执行环境&设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //获取数据&提取时间戳
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
        );
        //按 url key
        eventSingleOutputStreamOperator.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //AggregateFunction 进行增量聚聚合
                .aggregate(new AggregateFunction<Event, Long, Long>() {
                               //增量聚合逻辑
                               @Override
                               public Long createAccumulator() {
                                   //初始值
                                   return 0L;
                               }

                               @Override
                               public Long add(Event value, Long accumulator) {
                                   //累加规则
                                   return accumulator + 1L;
                               }

                               @Override
                               public Long getResult(Long accumulator) {
                                   //获取结果
                                   return accumulator;
                               }

                               @Override
                               public Long merge(Long a, Long b) {
                                   //合并
                                   return a + b;
                               }
                           }, //使用全窗口函数包装数据
                        new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>() {
                            /**
                             * new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>
                             *     Long 输入数据类型
                             *     UrlViewCount 输出黄数据类型
                             *     String key 类型:data -> data.url ;url String
                             *     TimeWindow 需要一个这样的变量
                             */

                            @Override
                            public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
                                /**
                                 * String url key
                                 * Context context 上下文
                                 *  Iterable<Long> elements 数据
                                 *  Collector<UrlViewCount> out 搜集器
                                 */
                                long end = context.window().getEnd();
                                long start = context.window().getStart();
                                Long next = elements.iterator().next();
                                out.collect(new UrlViewCount(url, next, start, end));
                            }
                        }).print();

        //执行
        env.execute();
    }
}

代码中用一个AggregateFunction来实现增量聚合,每来一个数据就计数加一;得到的结果交给ProcessWindowFunction,结合窗口信息包装成我们想要的UrlViewCount,最终输出统计结果。注:ProcessWindowFunction是处理函数中的一种,这里只用它来将增量聚合函数的输出结果包裹一层窗口信息。窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。

标签:窗口,函数,url,Long,增量,聚合,public
来源: https://www.cnblogs.com/wdh01/p/16435000.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有