ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink 基石、Flink Time、事件时间、Watermark水位线

2022-03-20 22:02:06  阅读:167  来源: 互联网

标签:String val Watermark Flink 001 时间 Time


Flink 基石、Flink Time、事件时间、Watermark水位线

目录

img

img

事件时间

代码示例

package com.shujia.flink.core

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5EventTIme {
  def main(args: Array[String]): Unit = {

    /*
    用户id,事件时间
    001,1647676561000
    001,1647676562000
    001,1647676563000
    001,1647676565000
    001,1647676564000
    001,1647676566000
    001,1647676567000
    001,1647676568000
    001,1647676569000
    001,1647676570000
    001,1647676575000
     */

    /**
      * 使用事件时间划分窗口
      * 1、设置事件模式为事件时间
      * 2、指定时间字段
      */

    /**
      * 每隔5秒统计用户出现的次数
      *
      */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //这里需要将并行度设置为1
    //因为这里存在一个时间戳对齐的问题,多并行度的时候会对不齐
    //不会触发事件时间的计算
    env.setParallelism(1)

    //设置时间模式
    //默认是处理时间
    //TimeCharacteristic.EventTime -- 事件时间
    //TimeCharacteristic.IngestionTime -- 接收时间
    //TimeCharacteristic.ProcessingTime -- 处理时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //设置时间字段
    val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)

    /**
      * 事件时间窗口触发条件
      * 1、窗口内有数据
      * 2、最新数据的事件时间大于等于窗口的结束数据的时间
      * 但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?
      */

    val countDS: DataStream[(String, Int)] = assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

    env.execute()

  }
}

但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?

窗口如果被计算了,之后再来一条属于这个窗口的数据会丢数据

Watermark

水位线

img

package com.shujia.flink.core

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5EventTIme {
  def main(args: Array[String]): Unit = {

    /*
    001,1647676561000
    001,1647676562000
    001,1647676563000
    001,1647676565000
    001,1647676564000
    001,1647676566000
    001,1647676567000
    001,1647676568000
    001,1647676569000
    001,1647676570000
    001,1647676575000
     */

    /**
      * 使用事件事件划分窗口
      * 1、设置事件模式为事件时间
      * 2、指定时间字段
      */

    /**
      * 每隔5秒统计用户出现的次数
      *
      */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //设置时间模式
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //设置时间字段,水位线默认等于最新数据的时间戳,水位线只增加不减少
    //    val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)

    //设置水位线和时间字段
    val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
      //执行水位线前移的时间
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        //指定时间戳字段
        override def extractTimestamp(element: (String, Long)): Long = element._2
      }
    )

    /**
      * 事件时间窗口触发条件
      * 1、窗口内有数据
      * 2、最新数据的时间大于等于窗口的结束数据
      *
      */

    val countDS: DataStream[(String, Int)] = assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

    env.execute()

  }
}

学习一个新框架,会看官网很重要

标签:String,val,Watermark,Flink,001,时间,Time
来源: https://www.cnblogs.com/saowei/p/16032325.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有