Flink Project Examples

Computing the Top N Hottest Items

package hosItems_analysis

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

//Case class for the input data
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

//Case class for the window aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)


object hosItems {
  def main(args: Array[String]): Unit = {

    //Create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read data from a file
    val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")

    //Read data from Kafka (alternative source)
//    val properties = new Properties()
//    properties.setProperty("bootstrap.servers", "192.168.1.103:9092")
//    properties.setProperty("group.id", "consumer-group")
//    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//    properties.setProperty("auto.offset.reset", "latest")
//    val inputStream:DataStream[String]=env.addSource(new FlinkKafkaConsumer[String]("hotItems",new SimpleStringSchema(),properties))



    //Convert the data to the case class type and extract the timestamp to define watermarks
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    //Transform the data: keep only "pv" behavior, then window and aggregate the counts
    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv") //keep only "pv" behavior
      .keyBy("itemId")
      .timeWindow(Time.hours(1), Time.minutes(5)) //sliding window: 1 hour size, 5 minute slide
      .aggregate(new CountAgg, new ItemCountWindowResult())



    //Group the window aggregation results by window, then sort and emit the top N
    val resultStream:DataStream[String]=aggStream
        .keyBy("windowEnd")
        .process(new TopNHotItems(5))

    resultStream.print()
    //aggStream.print()
    env.execute("hot items job")
  }

}
//Custom pre-aggregation function: add 1 for every incoming record
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L //initialize the accumulator

  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

//Extension: a custom averaging aggregate function whose accumulator is (sum, count)
class AtvgAgg()extends AggregateFunction[UserBehavior,(Long,Int),Double] {
  override def createAccumulator(): (Long, Int) = (0L,0)

  override def add(value: UserBehavior, accumulator: (Long, Int)): (Long, Int) = (accumulator._1+value.timestamp,accumulator._2+1)

  override def getResult(accumulator: (Long, Int)): Double = accumulator._1/accumulator._2.toDouble

  override def merge(a: (Long, Int), b: (Long, Int)): (Long, Int) = (a._1+b._1,a._2+b._2)
}

//Custom window function: combine the count with window information into the result case class
class ItemCountWindowResult() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
    val windowEnd: Long = window.getEnd
    val count: Long = input.iterator.next()
    out.collect(ItemViewCount(itemId, windowEnd, count))
  }
}


//Custom KeyedProcessFunction
class TopNHotItems(n: Int) extends KeyedProcessFunction[Tuple,ItemViewCount,String]{

  //ListState to hold all count results of the current window
  lazy val itemCountListState:ListState[ItemViewCount]=getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list",classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, out: Collector[String]): Unit = {

    //Save every incoming record into state
    itemCountListState.add(value)
    //Register a timer that fires at windowEnd + 100
    ctx.timerService().registerEventTimeTimer(value.windowEnd+100)

  }

  //When the timer fires, read the data from state, sort it, and emit the result
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {

    val allItemCountList:ListBuffer[ItemViewCount]=ListBuffer()
    //First pull the data out of state into a ListBuffer
    import scala.collection.JavaConversions._
    for(itemCount<-itemCountListState.get()){
      allItemCountList+=itemCount
    }

    //Sort by count in descending order and take the top N
    val sortedItemCountList=allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)

    //Clear the state
    itemCountListState.clear()

    //Format the ranking information as a String for display
    val result:StringBuilder=new StringBuilder
    result.append("时间:").append(new Timestamp(timestamp-100)).append("\n")
    //Iterate over the sorted list and append the top-N entries
    for(i<- sortedItemCountList.indices){
      //Count information for the current item
      val currentItemCount = sortedItemCountList(i)
      result.append("Top").append(i+1).append(":")
        .append(" 商品ID=").append(currentItemCount.itemId)
        .append(" 访问量=").append(currentItemCount.count)
        .append("\n")
    }
    result.append("======================================================================================\n\n")
    //Throttle the output rate
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
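
For reference, the map at the top of the job assumes each line of UserBehavior.csv holds five comma-separated fields: userId, itemId, categoryId, behavior, timestamp (in seconds). A minimal standalone sketch of that parsing, placed in the same package so it can reuse the UserBehavior case class above; the sample values are invented for illustration:

object UserBehaviorLineCheck {
  def main(args: Array[String]): Unit = {
    //hypothetical sample line: userId,itemId,categoryId,behavior,timestamp(seconds)
    val sampleLine = "543462,1715,1464116,pv,1511658000"
    val fields = sampleLine.split(",")
    val behavior = UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
    //prints: UserBehavior(543462,1715,1464116,pv,1511658000)
    println(behavior)
  }
}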

Sending Data to Kafka

package hosItems_analysis

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerUtil {
  def main(args: Array[String]): Unit = {
    writeToKafkaWithTopic("hotItems")
  }
  def writeToKafkaWithTopic(topic: String):Unit={
    val properties=new Properties()
    properties.setProperty("bootstrap.servers", "192.168.1.103:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    //Create a KafkaProducer to send the data
    val producer=new KafkaProducer[String,String](properties)

    //Read data from the file and send it line by line
    val bufferedSource=io.Source.fromFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    for(line<-bufferedSource.getLines()){
      val record=new ProducerRecord[String,String](topic,line)
      producer.send(record)
    }
    producer.close()


  }
}

Total Site Page Views (PV)

package networkflow_analysis

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

case class PvCount(windowEnd: Long, count: Long)

object PageView {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read data from a file
    val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    //Assign keys, wrap into tuples, then window and aggregate
    val pvStream: DataStream[PvCount] = dataStream
      .filter(_.behavior == "pv")
      //.map(_ => ("pv", 1L)) //map成二元组(“pv”,count)
      .map(new MyMapper()) //custom mapper that spreads the keys evenly
      .keyBy(_._1)
      .timeWindow(Time.hours(1)) //1-hour tumbling window for the statistics
      .aggregate(new PvCountAgg(), new PvCountResult())

    //Combine the per-partition results
    val pvTotalStream: DataStream[PvCount] = pvStream
      .keyBy(_.windowEnd)
      //.process(new TotalPvCountResult)
      .sum("count")


    //    pvStream.print()
    pvTotalStream.print()
    env.execute("pvJob")

  }

}

class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

class PvCountResult() extends WindowFunction[Long, PvCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {

    out.collect(PvCount(window.getEnd, input.head))
  }
}

//Custom MapFunction that generates a random key
class MyMapper() extends MapFunction[UserBehavior, (String, Long)] {
  override def map(value: UserBehavior): (String, Long) = (Random.nextInt(8).toString, 1L)

}

//Custom ProcessFunction that combines the results per window
class TotalPvCountResult() extends KeyedProcessFunction[Long, PvCount, PvCount] {
  //State holding the running sum of all results so far
  lazy val totalCountState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-count", classOf[Long]))

  override def processElement(value: PvCount, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#Context, out: Collector[PvCount]): Unit = {

    val currentTotalCount: Long = totalCountState.value()
    //Add the new count value and update the state
    totalCountState.update(currentTotalCount + value.count)
    //Register a timer that fires at windowEnd + 1
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)


  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    //When the timer fires, the counts from all partitions have arrived; emit the total
    out.collect(PvCount(ctx.getCurrentKey, totalCountState.value))
    totalCountState.clear()
  }
}

Unique Visitors (UV)

package networkflow_analysis

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class UvCount(windowEnd:Long,count:Long)
object UniqueVisitor {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read data from a file
    val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    //Open a 1-hour window over the whole stream and aggregate
    val uvStream: DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.hours(1)) //1-hour tumbling window over the whole (non-keyed) stream
      //.apply(new UvCountResult())
        .aggregate(new UvCountAgg(),new UvCountResultWithIncreaseAgg())

    uvStream.print()
    env.execute("uvJob")
  }
}
//Custom all-window function
class UvCountResult extends AllWindowFunction[UserBehavior,UvCount,TimeWindow]{
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    //A Set to hold all user IDs; duplicates are removed automatically
    var idSet: Set[Long] =Set[Long]()
    //Add every record in the current window to the set
    for(userBehavior<-input){
      idSet+=userBehavior.userId
    }
    //The size of the set is the deduplicated UV value
    out.collect(UvCount(window.getEnd,idSet.size))
  }
}
//Custom incremental aggregate function using a Set as the accumulator
class UvCountAgg()extends AggregateFunction[UserBehavior,Set[Long],Long]{
  override def createAccumulator(): Set[Long] =  Set[Long]()

  override def add(value: UserBehavior, accumulator: Set[Long]): Set[Long] = accumulator+value.userId

  override def getResult(accumulator: Set[Long]): Long = accumulator.size

  override def merge(a: Set[Long], b: Set[Long]): Set[Long] = a++b
}

//Custom window function: add the window information and wrap into the result case class
class UvCountResultWithIncreaseAgg()extends AllWindowFunction[Long,UvCount,TimeWindow]{
  override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = {
    out.collect(UvCount(window.getEnd,input.head))
  }
}


UV with a Bloom Filter

package networkflow_analysis

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloomFilter {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read data from a file
    val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    //Assign a key, wrap into a tuple, then window and aggregate
    val uvStream: DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      .map(data => ("uv",data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1)) //1-hour tumbling window for the statistics
      .trigger(new MyTrigger)
      .process(new UvCountResultWithBloomFilter())

    uvStream.print()
    env.execute("uv job")

  }
}

//Custom trigger that fires a window computation for every incoming record
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
  //When a record arrives, fire the window computation and purge the window contents
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}

//Custom ProcessWindowFunction that checks each record against a bitmap stored in Redis
class UvCountResultWithBloomFilter() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  var jedis: Jedis = _
  var bloom: Bloom = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("localhost", 6379)
    //Bitmap of 2^30 bits, about 128 MB
    bloom=new Bloom(1<<30)
  }
  //For each record, use the Bloom filter to check whether the corresponding bit in the Redis bitmap is already set
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {

    //The bitmap is stored in Redis keyed by the current window's end timestamp (windowEnd -> bitmap)
    val storedKey=context.window.getEnd.toString

    //The UV count of each window is also kept in Redis, in a hash named countMap
    val countMap="countMap"
    //Fetch the current count first
    var count=0L
    if(jedis.hget(countMap,storedKey)!=null){
      count=jedis.hget(countMap,storedKey).toLong
    }
    //Take the userId, compute its hash, and check whether it is already in the bitmap
    val userId=elements.last._2.toString
    val offset=bloom.hash(userId,61)
    val isExit=jedis.getbit(storedKey,offset)
    //If it is not present, set the bit and increment the count; otherwise do nothing
    if(!isExit){
      jedis.setbit(storedKey,offset,true)
      jedis.hset(countMap,storedKey,(count+1).toString)
    }
  }
}

//A custom Bloom filter
class Bloom(size: Long) extends Serializable {
  //Size of the bitmap; should be a power of two
  private val cap = size

  //A simple hash function
  def hash(str: String, seed: Int): Long = {
    var result = 0
    for (i <- 0 until str.length) {
      result = result * seed + str.charAt(i)
    }

    //Return a value within the range of cap
    (cap - 1) & result

  }
}
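
A small standalone sketch (not part of the original job) showing how the Bloom.hash function above maps a userId string to a bit offset. The userId values are made up; the bitmap size and seed match those used in the window function:

object BloomHashDemo {
  def main(args: Array[String]): Unit = {
    val bloom = new Bloom(1 << 30)           //same bitmap size as in open()
    //hypothetical userIds; seed 61 as used in process()
    val offsetA = bloom.hash("543462", 61)
    val offsetB = bloom.hash("662867", 61)
    //both offsets fall in [0, 2^30), so they can be passed to jedis.setbit/getbit
    println(s"offsetA=$offsetA, offsetB=$offsetB")
  }
}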

Top N Pages by View Count

package networkflow_analysis

import java.sql.Timestamp
import java.{lang, util}
import java.text.SimpleDateFormat
import java.util.Map

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

//Case class for the input data
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

//Case class for the aggregation result
case class PageViewCount(url: String, windowEnd: Long, count: Long)

object NetworkFlowTopNPage {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")

    //val inputStream:DataStream[String]=env.socketTextStream("localhost",7777)
    val dataStream: DataStream[ApacheLogEvent] = inputStream
      .map(data => {
        val dataArray: Array[String] = data.split(" ")
        //Convert the time field to a timestamp
        val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(dataArray(3)).getTime
        ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      })

    //Window and aggregate
    val lateOutputTag = new OutputTag[ApacheLogEvent]("late data")
    val aggStream = dataStream
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(lateOutputTag)
      .aggregate(new PageCountAgg(), new PageCountWindowResult)

    val lateDataStream = aggStream.getSideOutput(lateOutputTag)


    //Sort the counts of each window and emit the top N
    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPage(3))

    //dataStream.print("data")
    //lateDataStream.print("late")
    //aggStream.print("agg")
    resultStream.print("result")
    env.execute("top n page job")
  }
}

//Custom pre-aggregation function
class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

//Custom WindowFunction: wrap the count into the result case class
class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {

    out.collect(PageViewCount(key, window.getEnd, input.head))

  }
}

//Custom process function
class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
  //MapState holding the aggregated count for every url
  lazy val pageCountMapState: MapState[String, Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pageCount_list", classOf[String], classOf[Long]))

  override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
    pageCountMapState.put(value.url, value.count)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000L)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 60 * 1000L)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {

    //The second timer (windowEnd + 1 minute) only cleans up the state once the allowed lateness has passed
    if (timestamp == ctx.getCurrentKey + 60 * 1000L) {
      pageCountMapState.clear()
      return
    }
    //Once all the data has arrived, read it from state, sort, and emit
    val allPageCountList: ListBuffer[(String, Long)] = ListBuffer()
    val iter: util.Iterator[Map.Entry[String, Long]] = pageCountMapState.entries().iterator()
    while (iter.hasNext) {
      val entry: Map.Entry[String, Long] = iter.next()
      allPageCountList += ((entry.getKey, entry.getValue))
    }
    val sortedPageCountList: ListBuffer[(String, Long)] = allPageCountList.sortWith(_._2 > _._2).take(n)

    val result: StringBuilder = new StringBuilder
    result.append("时间:").append(new Timestamp(timestamp)).append("\n")
    //Iterate over the sorted list and append the top-N entries
    for (i <- sortedPageCountList.indices) {
      //Count information for the current page
      val currentItemCount: (String,Long) = sortedPageCountList(i)
      result.append("Top").append(i + 1).append(":")
        .append(" 页面url=").append(currentItemCount._1)
        .append(" 访问量=").append(currentItemCount._2)
        .append("\n")
    }
    result.append("======================================================================================\n\n")
    //Throttle the output rate
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
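
For reference, the parser above splits each log line on spaces and uses field 0 (ip), field 1 (userId), field 3 (event time in dd/MM/yyyy:HH:mm:ss format), field 5 (method) and field 6 (url). A minimal sketch with an invented line that fits that layout, placed in the same package so it can reuse the ApacheLogEvent case class above:

import java.text.SimpleDateFormat

object ApacheLogLineCheck {
  def main(args: Array[String]): Unit = {
    val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    //hypothetical log line: ip userId - time zone method url
    val sampleLine = "83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013"
    val fields = sampleLine.split(" ")
    val timestamp = simpleDateFormat.parse(fields(3)).getTime
    //prints an ApacheLogEvent carrying the parsed millisecond timestamp
    println(ApacheLogEvent(fields(0), fields(1), timestamp, fields(5), fields(6)))
  }
}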

App Marketing Statistics (All Channels Combined)

package market_analysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AppMarketingTotal {
  def main(args: Array[String]): Unit = {
    val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream:DataStream[MarketUserBehavior]=env.addSource(new SimulateMarketEventSource)
      .assignAscendingTimestamps(_.timestamp)

    val resultStream:DataStream[MarketCount]=dataStream
      .filter(_.behavior!="UNINSTall")
      .map(_=>("total",1L))
      .keyBy(_._1)
      .timeWindow(Time.hours(1),Time.seconds(5))
      .aggregate(new MarketCountAgg(),new MarketCountResult())

    resultStream.print()
    env.execute("market total count job")
  }
}
//Custom pre-aggregation function
class MarketCountAgg()extends AggregateFunction[(String,Long),Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator+1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a+b
}
class MarketCountResult() extends WindowFunction[Long,MarketCount,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketCount]): Unit = {
    val windowStart:String=new Timestamp(window.getStart).toString
    val windowEnd:String=new Timestamp(window.getEnd).toString
    val count:Long=input.head
    out.collect(MarketCount(windowStart,windowEnd,"total","total",count))
  }
}


App Marketing Statistics by Channel

package market_analysis

import java.sql.Timestamp
import java.util.UUID

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

//Case class for the input data
case class MarketUserBehavior(userId:String,behavior:String,channel:String,timestamp:Long)
//Case class for the output statistics
case class MarketCount(windowStart:String,windowEnd:String,channel:String,behavior: String,count:Long)

//Custom source that simulates market event data
class SimulateMarketEventSource extends RichParallelSourceFunction[MarketUserBehavior]{
  //Flag indicating whether the source is still running
  var running:Boolean=true
  //Collections of user behaviors and marketing channels
  val behaviorSet:Seq[String]=Seq("CLICK","DOWNLOAD","INSTALL","UNINSTALL")
  val channelSet:Seq[String]=Seq("appStore","huaweiStore","weibo","wechat")
  //Random number generator
  val rand:Random=Random
  override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    //Maximum number of records to emit, to bound the amount of test data
    val maxCounts: Long =Long.MaxValue
    var count: Long =0L
    //Keep generating random data in a loop
    while(running&&count<maxCounts){
      val id: String =UUID.randomUUID().toString
      val behavior: String =behaviorSet(rand.nextInt(behaviorSet.size))
      val channel: String =channelSet(rand.nextInt(channelSet.size))
      val ts: Long =System.currentTimeMillis()
      ctx.collect(MarketUserBehavior(id,behavior,channel,ts))
      count+=1
      Thread.sleep(50L)
    }
  }
  override def cancel(): Unit = running = false
}
object AppMarketingByChannel {

  def main(args: Array[String]): Unit = {

    val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream:DataStream[MarketUserBehavior]=env.addSource(new SimulateMarketEventSource)
      .assignAscendingTimestamps(_.timestamp)
    val resultStream:DataStream[MarketCount]=dataStream
      .filter(_.behavior!="UNINSTALL")//过滤掉卸载行为
      .keyBy(data=>(data.channel,data.behavior))
      .timeWindow(Time.hours(1),Time.seconds(5))
      .process(new MarketCountByChannel()) //custom full-window function

    //dataStream.print()
    resultStream.print()
    env.execute("market count bu channel job")

  }

}
//Custom full-window ProcessWindowFunction
class MarketCountByChannel()extends ProcessWindowFunction[MarketUserBehavior,MarketCount,(String,String),TimeWindow]{

  override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketCount]): Unit = {
    val windowStart:String=new Timestamp(context.window.getStart).toString
    val windowEnd:String=new Timestamp(context.window.getEnd).toString
    val channel:String=key._1
    val behavior:String=key._2
    val count:Long=elements.size
    out.collect(MarketCount(windowStart,windowEnd,channel,behavior,count))
  }
}

Ad Click Statistics and Blacklist Filtering

package market_analysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.transformations.SideOutputTransformation
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class AdClickEvent(userId:Long,adId:Long,province: String,city:String,timestamp: Long)
case class AdCountByProvince(province:String,windowEnd:String,count:Long)
//Case class for the blacklist warning emitted to the side output
case class BlackListWarning(userId:Long,adId:Long,msg:String)
object AdAnalysisByProvince {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    //Read from file, convert to the case class, and extract timestamps to generate watermarks
    val resource=getClass.getResource("/AdClickLog.csv")
    val adLogStream:DataStream[AdClickEvent]=env.readTextFile(resource.getPath)
      .map(data=>{
        val dataArray=data.split(",")
        AdClickEvent(dataArray(0).toLong,dataArray(1).toLong,dataArray(2),dataArray(3),dataArray(4).toLong)
      }).assignAscendingTimestamps(_.timestamp*1000L)

    //Filter out click-fraud (excessive-click) behavior
    val filterBlackListStream:DataStream[AdClickEvent]=adLogStream
      .keyBy(data=>(data.userId,data.adId))
      .process(new FilterBlackList(100L))

    //Window and aggregate the counts by province (on the stream with blacklisted clicks filtered out)
    val adCountStream:DataStream[AdCountByProvince]=filterBlackListStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1),Time.seconds(5))
      .aggregate(new AdCountAgg(),new AdCountResult())



    adCountStream.print()
    filterBlackListStream.getSideOutput(new OutputTag[BlackListWarning]("blacklist")).print("blacklist")
    env.execute("ad analysis by province")
    //val addCountResult

  }
}
class AdCountAgg()extends AggregateFunction[AdClickEvent,Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(value: AdClickEvent, accumulator: Long): Long = accumulator+1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a+b
}
class AdCountResult()extends WindowFunction[Long,AdCountByProvince,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit = {
    val province: String =key
    val windowEnd: Long =window.getEnd
    val count: Long =input.head
    out.collect(AdCountByProvince(province,new Timestamp(windowEnd).toString,count))
  }
}

//Custom process function that checks whether a user's clicks on an ad have reached the limit
class FilterBlackList(maxClickCount: Long)extends KeyedProcessFunction[(Long,Long),AdClickEvent,AdClickEvent]{

  //State holding this user's click count for this ad
  lazy val countState:ValueState[Long]=getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
  //Flag indicating whether this user has already been reported to the blacklist
  lazy val isSentState:ValueState[Boolean]=getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent",classOf[Boolean]))

  override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {

    //Read the state
    val curCount: Long =countState.value()
    //For the first record, register a timer at midnight of the next day to clear the state
    if(curCount==0){
      val ts: Long =(ctx.timerService().currentProcessingTime()/(1000*60*60*24)+1)*(1000*60*60*24)
      ctx.timerService().registerProcessingTimeTimer(ts)
    }

    //If the count has reached the limit and no warning has been emitted yet, emit one
    if(curCount>=maxClickCount){
      if(!isSentState.value()){
        ctx.output(new OutputTag[BlackListWarning]("blacklist"),BlackListWarning(value.userId,value.adId,"click over "+maxClickCount+" times today"))
        isSentState.update(true)
      }
      return
    }

    //Increment the count
    countState.update(curCount+1)
    out.collect(value)

  }

  //The midnight timer fires: simply clear the state
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
    countState.clear()
    isSentState.clear()
  }
}
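
The timer registered in processElement fires at the start of the next day: dividing the current processing time by the number of milliseconds in a day truncates to the current day boundary (in UTC), and adding one day gives the next midnight. A quick standalone check of that arithmetic with a hypothetical timestamp:

object NextMidnightCheck {
  def main(args: Array[String]): Unit = {
    val dayMillis = 1000L * 60 * 60 * 24
    val now = 1627833772000L                      //hypothetical: 2021-08-01 16:02:52 UTC
    val nextMidnight = (now / dayMillis + 1) * dayMillis
    println(nextMidnight)                         //1627862400000 = 2021-08-02 00:00:00 UTC
  }
}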

Malicious Login Monitoring

package loginFail_detect

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

case class LoginEvent(userId:Long,ip:String,eventType:String,eventTime:Long)
case class Warning(userID:Long,firstFailTime:Long,lastFailTime:Long, warningMsg:String)
object LoginFail {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read from file, map to the case class, and assign timestamps and watermarks
    val resource=getClass.getResource("/LoginLog.csv")
    val loginEventStream:DataStream[LoginEvent]=env.readTextFile(resource.getPath)
      .map(data=>{
        val dataArray=data.split(",")
        LoginEvent(dataArray(0).toLong,dataArray(1),dataArray(2),dataArray(3).toLong)
     }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
      override def extractTimestamp(element: LoginEvent): Long = element.eventTime*1000L
    })

    //Use a ProcessFunction: if there are 2 consecutive login failures within 2 seconds, emit a warning
    val loginWarningStream:DataStream[Warning]=loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWarning(2))

    loginWarningStream.print()
    env.execute("login fail job")

  }
}
//Custom ProcessFunction
class LoginFailWarning(maxFailTimes: Int)extends KeyedProcessFunction[Long,LoginEvent,Warning]{
  //ListState holding the login-failure events
  lazy val loginFailListState:ListState[LoginEvent]=getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("saved-loginEvent",classOf[LoginEvent]))

  //ValueState holding the timestamp of the registered timer
  lazy  val timerTsState:ValueState[Long]=getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts",classOf[Long]))

  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#Context, out: Collector[Warning]): Unit = {
    //Check whether the current record is a login failure
    if(value.eventType=="fail"){
      //If it is a failure, add it to the list state; register a timer if none has been registered yet
      loginFailListState.add(value)
      if(timerTsState.value()==0){
        val ts=value.eventTime*1000L+2000L
        ctx.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
      }
    }else{
      //If it is a successful login, delete the timer and start over
      ctx.timerService().deleteEventTimeTimer(timerTsState.value())
      loginFailListState.clear()
      timerTsState.clear()
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#OnTimerContext, out: Collector[Warning]): Unit = {
    //When the 2-second timer fires, check how many failures are in the list state
    val allLoginFailList:ListBuffer[LoginEvent]=new ListBuffer[LoginEvent]
    val iter=loginFailListState.get().iterator()
    while (iter.hasNext){
      allLoginFailList+=iter.next()
    }
    if(allLoginFailList.length>=maxFailTimes){
      out.collect(Warning(ctx.getCurrentKey,allLoginFailList.head.eventTime,allLoginFailList.last.eventTime,"login fail in 2s for "+allLoginFailList.length+" times."))
    }
    //Clear the state
    loginFailListState.clear()
    timerTsState.clear()
  }
}

Malicious Login Monitoring with CEP

package loginFail_detect

import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object LoginFailWithCep {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read from file, map to the case class, and assign timestamps and watermarks
    val resource = getClass.getResource("/LoginLog.csv")
    val loginEventStream: DataStream[LoginEvent] = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
      override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L
    })


    //1. Define the pattern to match
    val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
      .begin[LoginEvent]("firstFail").where(_.eventType == "fail") //第一次登录失败
      .next("secondFail").where(_.eventType == "fail")
      .within(Time.seconds(2)) //the match must occur within 2 seconds

    //2. Apply the pattern to the keyed stream, producing a PatternStream
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    //3. Convert the detected event sequences into warning messages
    val loginFailStream:DataStream[Warning]=patternStream.select(new LoginFailDetect())

    //4. Print the output
    loginFailStream.print()
    env.execute("login fail job")
  }
}
//Custom PatternSelectFunction that wraps the detected consecutive login failures into a warning
class LoginFailDetect()extends PatternSelectFunction[LoginEvent,Warning]{
  override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
    //The map holds the matched events, keyed by the pattern names defined above
    val firstLoginFail=map.get("firstFail").get(0)
    val secondLoginFail=map.get("secondFail").get(0)
    Warning(firstLoginFail.userId,firstLoginFail.eventTime,secondLoginFail.eventTime,"login fail")
  }
}
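
The pattern above hard-codes exactly two failures. If the rule needs to cover n consecutive failures, Flink CEP's quantifiers can express that more compactly. A rough sketch, reusing the imports and LoginEvent case class from the program above, with 3 chosen here only as an example threshold:

    //sketch only: matches 3 strictly consecutive "fail" events for the same user within 2 seconds
    val loginFailPattern3: Pattern[LoginEvent, LoginEvent] = Pattern
      .begin[LoginEvent]("fails").where(_.eventType == "fail")
      .times(3).consecutive()
      .within(Time.seconds(2))
    //the matched events would then all be available in the select function under map.get("fails")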

Order Payment Timeout Detection (with CEP)

package order_detect

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
case class OrderEvent(orderId:Long,eventType:String,txId:String,eventTime:Long)
case class OrderResult(orderId:Long,resultMsg:String)
object OrderTimeout {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read data from the file
    val resource=getClass.getResource("/OrderLog.csv")
    val orderEventStream:DataStream[OrderEvent]=env.readTextFile(resource.getPath)
      .map(data=>{
        val array=data.split(",")
        OrderEvent(array(0).toLong,array(1),array(2),array(3).toLong)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
      override def extractTimestamp(element: OrderEvent): Long = element.eventTime*1000L
    })

    //1. Define the pattern of the event sequence to match
    val orderPayPattern=Pattern
      .begin[OrderEvent]("create").where(_.eventType=="create")//首先是订单的create事件
      .followedBy("pay").where(_.eventType=="pay")//后面来的是订单的pay事件
      .within(Time.minutes(15))

    //2. Apply the pattern to the stream keyed by orderId
    val patternStream: PatternStream[OrderEvent] =CEP.pattern(orderEventStream.keyBy(_.orderId),orderPayPattern)

    //3. Define a side-output tag to mark the timed-out orders
    val orderTimeoutOutputTag=new OutputTag[OrderResult]("order timeout")

    //4. Call select to extract matched and timed-out events and convert each to output
    val resultStream:DataStream[OrderResult]=patternStream
      .select(orderTimeoutOutputTag,new OrderTimeoutSelect(),new OrderPaySelect())

    //5. Print the output
    resultStream.print("payed")
    resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")

    env.execute("order timeout detect job")
  }
}
//Custom timeout handler
class OrderTimeoutSelect()extends PatternTimeoutFunction[OrderEvent,OrderResult]{
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    val timeoutOrderId=map.get("create").iterator().next().orderId
    OrderResult(timeoutOrderId,"timeout at " + l)
  }
}
//Custom handler for matched (paid) orders
class OrderPaySelect()extends PatternSelectFunction[OrderEvent,OrderResult]{
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val payOrderId=map.get("pay").get(0).orderId
    OrderResult(payOrderId,"payed successfully")
  }
}

Order Payment Timeout Detection (without CEP)

package order_detect

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object OrderTimeoutWithoutCep {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val resource = getClass.getResource("/OrderLog.csv")
    val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath)
      .map(data => {
        val array: Array[String] = data.split(",")
        OrderEvent(array(0).toLong, array(1), array(2), array(3).toLong)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(4)) {
      override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
    })

    //Custom process function for fine-grained control of the matching flow
    val orderResultStream: DataStream[OrderResult] = orderEventStream
      .keyBy(_.orderId)
      .process(new OrderPayMatchDetect())

    orderResultStream.print("payed")
    orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print()
    env.execute("order timeout")
  }
}

//Custom KeyedProcessFunction: the main stream emits successfully paid orders, the side output emits timeout warnings
class OrderPayMatchDetect() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
  //State: flags for whether the create and pay events have arrived, plus the timer timestamp
  lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))
  lazy val isCreatedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))
  val orderTimeoutOutputTag: OutputTag[OrderResult] = new OutputTag[OrderResult]("timeout")

  override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {

    //Read the current state first
    val isPayed = isPayedState.value()
    val isCreated = isCreatedState.value()
    val timerTs = timerTsState.value()
    //Branch on the type of the current event
    //Case 1: a create event arrives; check whether a pay has already arrived
    if (value.eventType == "create") {
      //Case 1.1: if the pay has already arrived, the match succeeds
      if (isPayed) {
        out.collect(OrderResult(value.orderId, "payed successfully"))
        isPayedState.clear()
        timerTsState.clear()
        ctx.timerService().deleteEventTimeTimer(timerTs)
      }
      //Case 1.2: if no pay yet, register a timer 15 minutes later and start waiting
      else {
        val ts: Long = value.eventTime * 1000L + 15 * 60 * 1000L
        ctx.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
        isCreatedState.update(true)
      }

    }
    //Case 2: a pay event arrives; check whether a create has already arrived
    else if (value.eventType == "pay") {
      //Case 2.1: the create already arrived, so the match succeeds; still check whether 15 minutes have passed
      if (isCreated) {
        //Case 2.1.1: not timed out, emit the normal result to the main stream
        if (value.eventTime * 1000 < timerTs) {
          out.collect(OrderResult(value.orderId, "payed successfully"))
        }
        //Case 2.1.2: already timed out, emit a timeout warning to the side output
        else {
          ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
        }
        isCreatedState.clear()
        timerTsState.clear()
        ctx.timerService().deleteEventTimeTimer(timerTs)
        //Either way something has been emitted, so clear the state
      }
      //Case 2.2: the create has not arrived; wait for the out-of-order create by registering a timer at the pay's timestamp
      else {
        val ts = value.eventTime * 1000L
        ctx.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
        isPayedState.update(true)
      }
    }
  }
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
    //The timer fired; determine which case applies
    if (isPayedState.value()) {
      //If pay arrived, the create never came; the data may have been lost
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "already payed but not found create log"))
    } else {
      //If no pay arrived, this is a real 15-minute timeout
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
    }
    isPayedState.clear()
    isCreatedState.clear()
    timerTsState.clear()
  }
}

Real-Time Payment Reconciliation

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

//Case class for the receipt (arrival-of-payment) data
case class ReceiptEvent(txId: String, payChannel: String, timeStamp: Long)

case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

case class OrderResult(orderId: Long, resultMsg: String)

object OrderPayTxMatch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val resource = getClass.getResource("/OrderLog.csv")
    val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath)
      .map(data => {
        val array: Array[String] = data.split(",")
        OrderEvent(array(0).toLong, array(1), array(2), array(3).toLong)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(4)) {
      override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
    }).filter(_.txId != "").keyBy(_.txId)

    val resource2 = getClass.getResource("/ReceiptLog.csv")
    val receiptEventStream: DataStream[ReceiptEvent] = env.readTextFile(resource2.getPath)
      .map(data => {
        val array: Array[String] = data.split(",")
        ReceiptEvent(array(0), array(1), array(2).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(4)) {
        override def extractTimestamp(element: ReceiptEvent): Long = element.timeStamp * 1000L
      })
      .keyBy(_.txId)

    //Connect the two streams and process them to match corresponding events
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .connect(receiptEventStream)
      .process(new OrderPayTxDetect())

    val unmatchedPays: OutputTag[OrderEvent] = new OutputTag[OrderEvent]("unmatched-pays")
    val unmatchedReceipts: OutputTag[ReceiptEvent] = new OutputTag[ReceiptEvent]("unmatched-receipts")

    resultStream.print("matched")
    resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
    resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")

    env.execute("order pay tx match job")
  }
}

//Custom CoProcessFunction that cross-checks records from the two streams
class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
  //Two ValueStates holding the pay event and the receipt event of the current transaction
  lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
  lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))
  val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

  override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    //A pay arrived; check whether the matching receipt has already arrived
    val receipt: ReceiptEvent = receiptState.value()
    if (receipt != null) {
      //If the receipt is already there, match them normally
      out.collect((pay, receipt))
      receiptState.clear()
    } else {
      //If the receipt has not arrived yet, store the pay in state and register a timer to wait 3 seconds
      payState.update(pay)
      ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L + 3000L)
    }
  }

  override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    val pay: OrderEvent = payState.value()
    if (pay != null) {
      //If the pay is already there, match them normally
      out.collect((pay, receipt))
      payState.clear()
    } else {
      //If the pay has not arrived yet, store the receipt in state and register a timer to wait 5 seconds
      receiptState.update(receipt)
      ctx.timerService().registerEventTimeTimer(receipt.timeStamp * 1000L + 5000L)
    }
  }

  //The timer fired; check which of pay / receipt is still present, i.e. unmatched
  override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {

    if (payState.value() != null) {
      ctx.output(unmatchedPays, payState.value())
    }
    if (receiptState.value() != null) {
      ctx.output(unmatchedReceipts, receiptState.value())
    }
    //Clear the state
    payState.clear()
    receiptState.clear()

  }
}
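
As an alternative sketch to the hand-written CoProcessFunction above, Flink's interval join can express "a pay and a receipt with the same txId whose timestamps are within a few seconds of each other" directly. The ±5 second bound below is an assumption, and unlike the version above this only emits matches, it does not report unmatched pays or receipts. The fragment would go in main, reusing the streams and imports already defined there:

    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction

    val joinedStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .keyBy(_.txId)
      .intervalJoin(receiptEventStream.keyBy(_.txId))
      .between(Time.seconds(-5), Time.seconds(5))
      .process(new ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
        override def processElement(pay: OrderEvent,
                                    receipt: ReceiptEvent,
                                    ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
                                    out: Collector[(OrderEvent, ReceiptEvent)]): Unit =
          out.collect((pay, receipt))
      })
    joinedStream.print("matched-by-interval-join")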

Source: https://blog.csdn.net/m0_56923407/article/details/119298958
