ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink实战-模拟简易双11实时统计大屏

2021-06-01 20:30:49  阅读:183  来源: 互联网

标签:11 category 22 55 flink CategoryPojo dateTime 大屏 34


背景

在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。除了这个,还有一些其他场景的应用,比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等,其实做法都是类似的。

今天我们就做一个最简单的模拟电商统计大屏的小例子,我们抽取一下最简单的需求。

  • 实时计算出当天零点截止到当前时间的销售总额
  • 计算出各个分类的销售top3
  • 每秒钟更新一次统计结果

实例讲解

构造数据

首先我们通过自定义source 模拟订单的生成,生成了一个Tuple2,第一个元素是分类,第二个元素表示这个分类下产生的订单金额,金额我们通过随机生成.

 /**
  * 模拟生成某一个分类下的订单生成
  */
 public static class MySource implements SourceFunction<Tuple2<String,Double>>{

  private volatile boolean isRunning = true;
  private Random random = new Random();
  String category[] = {
    "女装", "男装",
    "图书", "家电",
    "洗护", "美妆",
    "运动", "游戏",
    "户外", "家具",
    "乐器", "办公"
  };

  @Override
  public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{
   while (isRunning){
    Thread.sleep(10);
    //某一个分类
    String c = category[(int) (Math.random() * (category.length - 1))];
    //某一个分类下产生了price的成交订单
    double price = random.nextDouble() * 100;
    ctx.collect(Tuple2.of(c, price));
   }
  }

  @Override
  public void cancel(){
   isRunning = false;
  }
 }

复制代码

构造统计结果类

 public static class CategoryPojo{
  //  分类名称
  private String category;
  //  改分类总销售额
  private double totalPrice;
  //      截止到当前时间的时间
  private String dateTime;
  
     getter and setter ........
 }

复制代码

定义窗口和触发器


 DataStream<CategoryPojo> result = dataStream.keyBy(0)
                                              .window(TumblingProcessingTimeWindows.of(Time.days(
                                                1), Time.hours(-8)))
                                              .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
                                                1)))
                                              .aggregate(
                                                new PriceAggregate(),
                                                new WindowResult()
                                              );

复制代码

首先我们定义一个窗口期是一天的滚动窗口,然后设置一个1秒钟的触发器,之后进行聚合计算.

集合计算

 private static class PriceAggregate
   implements AggregateFunction<Tuple2<String,Double>,Double,Double>{

  @Override
  public Double createAccumulator(){
   return 0D;
  }

  @Override
  public Double add(Tuple2<String,Double> value, Double accumulator){
   return accumulator + value.f1;
  }

  @Override
  public Double getResult(Double accumulator){
   return accumulator;
  }

  @Override
  public Double merge(Double a, Double b){
   return a + b;
  }
 }

复制代码

聚合计算也比较简单,其实就是对price的简单sum操作

收集窗口结果数据


 private static class WindowResult
   implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{
  SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  @Override
  public void apply(
    Tuple key,
    TimeWindow window,
    Iterable<Double> input,
    Collector<CategoryPojo> out) throws Exception{
   CategoryPojo categoryPojo = new CategoryPojo();
   categoryPojo.setCategory(((Tuple1<String>) key).f0);

   BigDecimal bg = new BigDecimal(input.iterator().next());
   double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
   categoryPojo.setTotalPrice(p);
   categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
   out.collect(categoryPojo);
  }
 }

复制代码

我们最聚合的结果进行简单的封装,封装成CategoryPojo类以便后续处理

使用聚合窗口的结果


result.keyBy("dateTime")
        .window(TumblingProcessingTimeWindows.of(Time.seconds(
          1)))
        .process(new WindowResultProcess());

复制代码

接下来我们要使用上面聚合的结果,所以我们使用上面的window聚合结果流又定义了时间是1秒的滚动窗口.

如何使用窗口的结果,可以参考flink的官网[1]

结果统计

接下来我们做最后的结果统计,在这里,我们会把各个分类的总价加起来,就是全站的总销量金额,然后我们同时使用优先级队列计算出分类销售的Top3,打印出结果,在生产过程中我们可以把这个结果数据发到hbase或者redis等外部存储,以供前端的实时页面展示。


 private static class WindowResultProcess
   extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{

  @Override
  public void process(

    Tuple tuple,
    Context context,
    Iterable<CategoryPojo> elements,
    Collector<Object> out) throws Exception{
   String date = ((Tuple1<String>) tuple).f0;

   Queue<CategoryPojo> queue = new PriorityQueue<>(
     3,
     (o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
   double price = 0D;
   Iterator<CategoryPojo> iterator = elements.iterator();
   int s = 0;
   while (iterator.hasNext()){
    CategoryPojo categoryPojo = iterator.next();
    if (queue.size() < 3){
     queue.add(categoryPojo);
    } else {
     CategoryPojo tmp = queue.peek();
     if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){
      queue.poll();
      queue.add(categoryPojo);
     }
    }
    price += categoryPojo.getTotalPrice();
   }

   List<String> list = queue.stream()
                            .sorted((o1, o2)->o1.getTotalPrice() <=
                                              o2.getTotalPrice() ? 1 : -1)
                            .map(f->"(分类:" + f.getCategory() + " 销售额:" +
                                    f.getTotalPrice() + ")")

                            .collect(
                              Collectors.toList());
   System.out.println("时间 : " + date + "  总价 : " + price + " top3 " +
                      StringUtils.join(list, ","));
   System.out.println("-------------");
  }

 }


复制代码

示例运行结果


3> CategoryPojo{category='户外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='游戏', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='洗护', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='运动', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='乐器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='图书', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='家电', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='男装', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='女装', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='美妆', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
时间 : 2020-06-13 22:55:34  总价 : 9617.050000000001 top3 (分类:家电 销售额:1225.34),(分类:女装 销售额:1018.88),(分类:图书 销售额:1010.12)

复制代码

完整的代码请参考

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java


文章来源:https://juejin.cn/post/6844904192180486158
 

标签:11,category,22,55,flink,CategoryPojo,dateTime,大屏,34
来源: https://blog.csdn.net/wwd0501/article/details/117451220

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有