
Flink ProcessWindowFunction usage example

2021-01-18 17:02:11


Where it applies


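ProcessWindowFunction works on keyed (grouped) and windowed data streams. For every key and every window, Flink buffers all elements that fall into that window and, when the window fires, calls process() once, passing those elements as an Iterable together with a Context through which the window itself (and therefore its start and end) is available.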
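To make the buffering behavior concrete, here is a minimal in-memory model in plain Scala, assuming tumbling windows for simplicity. It is not Flink code: `Event`, `WindowSpan`, `tumbling` and `runProcessWindow` are hypothetical names used only to illustrate how elements are grouped by (key, window) and handed to a process function in one batch.

```scala
// Toy in-memory model of keyed windows + ProcessWindowFunction (NOT Flink code).
// Event, WindowSpan, tumbling and runProcessWindow are hypothetical names.
object ProcessWindowModel {
  final case class Event(key: String, ts: Long)

  // Half-open interval [start, end), like Flink's TimeWindow.
  final case class WindowSpan(start: Long, end: Long)

  // Tumbling assignment: each timestamp falls into exactly one window of `size` ms.
  def tumbling(size: Long)(ts: Long): Seq[WindowSpan] = {
    val start = ts - java.lang.Math.floorMod(ts, size)
    Seq(WindowSpan(start, start + size))
  }

  // Buffer every element per (key, window), then call `process` once per group with
  // the whole Iterable, the way ProcessWindowFunction.process receives its input.
  // Results are ordered by window start, then key, so the output is deterministic.
  def runProcessWindow[R](events: Seq[Event], assign: Long => Seq[WindowSpan])(
      process: (String, WindowSpan, Iterable[Event]) => R): Seq[R] = {
    val buffered: Map[(String, WindowSpan), Seq[Event]] =
      events
        .flatMap(e => assign(e.ts).map(w => ((e.key, w), e)))
        .groupBy(_._1)
        .map { case (k, pairs) => k -> pairs.map(_._2) }
    buffered.toSeq
      .sortBy { case ((key, w), _) => (w.start, key) }
      .map { case ((key, w), elems) => process(key, w, elems) }
  }
}
```

For example, with events at 1 s, 4 s and 6 s for key "a" and at 2 s for key "b", 5 s tumbling windows, and a process function returning `(key, window.start, elements.size)`, the result is ("a", 0, 2), ("b", 0, 1), ("a", 5000, 1): one call per (key, window), each seeing all of that window's elements at once.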

 

Code

package com.yy.Channel

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Input record: one user action (behavior) on a marketing channel
case class MarketUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

// Output record: number of events for one (channel, behavior) pair in one window
case class MarketViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)


// Custom test source: emits one random MarketUserBehavior every 50 ms until cancelled
class SimulatedSource() extends RichSourceFunction[MarketUserBehavior] {
    @volatile var running = true // cleared by cancel(), which Flink calls from a different thread
    val behaviorSet = Seq("view", "download", "install", "uninstall")
    val channelSet = Seq("appstore", "weibo", "wechat", "tieba")
    val rand = Random

    override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
        val maxCount = Long.MaxValue
        var count = 0L

        while (running && count < maxCount) {
            val id = java.util.UUID.randomUUID().toString
            val behavior = behaviorSet(rand.nextInt(behaviorSet.size))
            val channel = channelSet(rand.nextInt(channelSet.size))
            val ts = System.currentTimeMillis()

            ctx.collect(MarketUserBehavior(id, behavior, channel, ts))
            count += 1
            Thread.sleep(50L)
        }
    }

    override def cancel(): Unit = running = false
}

/**
 * Counts, for each (channel, behavior) group, how many events fall into each window
 */
object AppMarketByChannel {
    def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // deprecated since Flink 1.12, where event time is already the default
        val dataStream = env.addSource(new SimulatedSource)
            .assignAscendingTimestamps(_.timestamp)
        val resultStream = dataStream
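            .filter(_.behavior != "uninstall")          // ignore uninstall events
            .keyBy(x => (x.channel, x.behavior))         // key = (channel, behavior)
            .timeWindow(Time.days(1), Time.seconds(5))   // 1-day sliding window, slide 5 s (deprecated since Flink 1.12 in favor of window(SlidingEventTimeWindows.of(...)))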
            .process(new MarketCountByChannel())

        resultStream.print()


        env.execute("AppMarketByChannel") // job name
    }

}


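// Full-window function: receives every buffered element of one (channel, behavior) key
// for one window and emits a single MarketViewCount.
class MarketCountByChannel extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow] {
    override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketViewCount]): Unit = {
        val start = context.window.getStart.toString // window start, epoch millis (inclusive)
        val end = context.window.getEnd.toString     // window end, epoch millis (exclusive)
        val count = elements.size.toLong             // events of this key in this window
        out.collect(MarketViewCount(start, end, key._1, key._2, count)) // key._1 = channel, key._2 = behavior
    }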
}
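The job's sliding window is one day long and slides every 5 seconds, so each event belongs to 86,400 s / 5 s = 17,280 overlapping windows. Flink stores one copy of an element for each window it belongs to, and a ProcessWindowFunction needs all of them until the window fires, so this setup can build up a large amount of state. The plain-Scala sketch below reproduces the window-assignment arithmetic (assuming offset 0 and non-negative timestamps) so that count can be checked; `SlidingWindowMath`, `lastWindowStart` and `windowsFor` are hypothetical names, not Flink API.

```scala
// Sketch of sliding event-time window assignment arithmetic, offset assumed to be 0.
// SlidingWindowMath, lastWindowStart and windowsFor are hypothetical helpers, not Flink API.
object SlidingWindowMath {
  // Start of the most recent slide-aligned window that contains ts (valid for ts >= 0).
  def lastWindowStart(ts: Long, slide: Long): Long =
    ts - (ts + slide) % slide

  // Every [start, end) window of length `size`, advancing by `slide`, that contains ts,
  // listed from the latest start to the earliest.
  def windowsFor(ts: Long, size: Long, slide: Long): Seq[(Long, Long)] =
    Iterator
      .iterate(lastWindowStart(ts, slide))(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => (start, start + size))
      .toVector
}
```

For instance, `windowsFor(12000L, 10000L, 5000L)` gives [10000, 20000) and [5000, 15000), and with `size = 86400000` (1 day) and `slide = 5000` any timestamp yields 17,280 windows. If that state becomes a problem, the windowed stream's `aggregate` method also accepts an incremental AggregateFunction together with a ProcessWindowFunction, so each window keeps only a running count and the ProcessWindowFunction receives that single pre-aggregated value while still reaching the window through its Context.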

 

Source: https://blog.csdn.net/qq_35515661/article/details/112788506
