ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink基石----Window

2022-03-21 22:02:41  阅读:153  来源: 互联网

标签:flink 窗口 String val Flink ---- Window env


Flink基石----Window

目录

Flink中的Window包含三部分:

1、Time Window----时间窗口

2、Session Window----会话窗口(待没有数据的时候开始计算)

3、Count Window----统计窗口(每n条数据计算一次)

image

一、Time Window----时间窗口

时间窗口包含四部分:

TumblingProcessingTimeWindows:滚动的处理时间窗口
TumblingEventTimeWindows:滚动的事件时间窗口(需要设置时间字段和水位线)
SlidingProcessingTimeWindows: 滑动的处理时间窗口(滑动窗口需要指定窗口大小和滑动时间)
SlidingEventTimeWindows:滑动的事件时间窗口(滑动窗口需要指定窗口大小和滑动时间)
滚动:两个时间窗口之间没有交叉;  滑动:两个时间窗口之间有交叉
1、TumblingProcessingTimeWindows----滚动的处理时间窗口
package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //读取socket数据
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //拆分、转成kv格式
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

      /**
      * 滚动的处理时间窗口
      * .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      * 简写:
      *    .timeWindow(Time.seconds(5))
      */
    //将单词分组,添加时间、并统计数量,打印
    kvDS.keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}
2、TumblingEventTimeWindows----滚动的事件时间窗口

滚动的事件时间窗口:需要设置时间字段和水位线

package com.shujia.flink.window

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //读取socket数据
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //拆分、转成kv格式
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    //设置时间字段, 水位线默认等于最新数据的时间戳,水位线只增加不减少
    val assDS: DataStream[(String, Int)] = kvDS.assignTimestampsAndWatermarks(
      //执行水位线前移的时间
      new BoundedOutOfOrdernessTimestampExtractor[(String, Int)](Time.seconds(5)) {
        //指定时间戳字段
        override def extractTimestamp(element: (String, Int)): Int = element._2
      }
    )

    //将单词分组,添加时间、并统计数量,打印
    kvDS.keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))//上面那一行是本行的简写
      .sum(1)
      .print()

    env.execute()

  }
}
3、SlidingProcessingTimeWindows:----滑动的处理时间窗口

滑动窗口需要指定窗口大小和滑动时间

package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //读取socket数据
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //拆分、转成kv格式
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    //将单词分组,添加时间、并统计数量,打印
    kvDS.keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}

二、Session Window----会话窗口

待没有数据的时候开始计算,将前面的数据放到一个窗口中进行计算,每一个key是独立计时的

会话窗口包含两种:

ProcessingTimeSessionWindows: 处理时间的会话窗口
EventTimeSessionWindows: 事件时间的会话窗口(需要设置时间字段和水位线)
1、ProcessingTimeSessionWindows---- 处理时间的会话窗口
package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //读取socket数据
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //拆分、转成kv格式
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    //将单词分组,添加时间、并统计数量,打印
    kvDS.keyBy(_._1)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      //当间隔5秒后,没有数据传入,那么开始计算
      .sum(1)
      .print()

    env.execute()
  }
}
2、EventTimeSessionWindows: 事件时间的会话窗口

需要设置时间字段和水位线

package com.shujia.flink.window

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object Demo2SessionWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //当数据量比较小时,将并行度设置为1
    env.setParallelism(1)
	//设置时间模式为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //设置水位线和时间字段
    val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
      //执行水位线前移的时间
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        //指定时间戳字段
        override def extractTimestamp(element: (String, Long)): Long = element._2
      }
    )

    assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}

三、Count Window----统计窗口

package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._

object Demo3CountWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    /**
      * 滚动的统计窗口
      * 滑动的统计窗口
      *
      */
    kvDS
      .keyBy(_._1)
      .countWindow(10)//滚动的统计窗口---每隔10条数据计算一次
      .countWindow(10, 2) //每隔两条数据将最近的10条数据放到一个窗口中进行计算
      .sum(1)
      .print()

    env.execute()
  }
}

标签:flink,窗口,String,val,Flink,----,Window,env
来源: https://www.cnblogs.com/saowei/p/16036870.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有