标签:String val flink 举例 ProcessWindowFunction behavior apache import
使用范围
ProcessWindowFunction 作用于先按 key 分组(keyed)、再划分窗口(windowed)的数据流上。
代码
package com.yy.Channel
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
/**
 * Input record: a single user action observed on a promotion channel.
 *
 * @param userId    identifier of the user who performed the action
 * @param behavior  kind of action, e.g. "view", "download", "install", "uninstall"
 * @param channel   channel the action came from, e.g. "appstore", "weibo"
 * @param timestamp time of the action in epoch milliseconds
 */
case class MarketUserBehavior(
    userId: String,
    behavior: String,
    channel: String,
    timestamp: Long
)
/**
 * Output record: the number of events seen for one (channel, behavior) pair in one window.
 *
 * @param windowStart start of the window, rendered as a string
 * @param windowEnd   end of the window, rendered as a string
 * @param channel     channel the counted events belong to
 * @param behavior    behavior the counted events belong to
 * @param count       number of events in the window for this (channel, behavior) pair
 */
case class MarketViewCount(
    windowStart: String,
    windowEnd: String,
    channel: String,
    behavior: String,
    count: Long
)
/**
 * Custom test data source that keeps emitting randomly generated
 * [[MarketUserBehavior]] records until it is cancelled.
 *
 * Every record carries a fresh random UUID as user id, a behavior and a channel
 * picked at random from `behaviorSet` / `channelSet`, and the current wall-clock
 * time (epoch milliseconds) as its timestamp. The source sleeps 50 ms between
 * two records.
 */
class SimulatedSource() extends RichSourceFunction[MarketUserBehavior] {
  // Cancellation flag: written by cancel() and polled by the run() loop. Flink's
  // SourceFunction contract recommends a volatile flag here, because cancel() is
  // normally invoked from a different thread than run(); without @volatile the
  // loop is not guaranteed to ever observe the update.
  @volatile var running = true

  // Candidate values the generator draws from.
  val behaviorSet = Seq("view", "download", "install", "uninstall")
  val channelSet = Seq("appstore", "weibo", "wechat", "tieba")
  val rand = Random

  /**
   * Emits one random record per iteration until the source is cancelled
   * (or, theoretically, until `Long.MaxValue` records have been produced).
   *
   * @param ctx source context used to emit the generated records
   */
  override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    val maxCount = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val id = java.util.UUID.randomUUID().toString
      val behavior = behaviorSet(rand.nextInt(behaviorSet.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()
      ctx.collect(MarketUserBehavior(id, behavior, channel, ts))
      count += 1
      // Throttle the generator to roughly 20 records per second.
      Thread.sleep(50L)
    }
  }

  /** Requests the emit loop in `run` to stop by clearing the `running` flag. */
  override def cancel(): Unit = running = false
}
/**
 * Job entry point: groups the events by (channel, behavior) and counts the
 * events of every group inside each window.
 *
 * Events with behavior "uninstall" are dropped before grouping. Windows are
 * sliding event-time windows, 1 day long, advancing every 5 seconds.
 */
object AppMarketByChannel {

  /**
   * Builds the pipeline, prints the per-window counts and runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Environment: single parallel instance, event-time semantics.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Source: simulated events whose own `timestamp` field is the (ascending) event time.
    val events = streamEnv
      .addSource(new SimulatedSource)
      .assignAscendingTimestamps(event => event.timestamp)

    // Drop uninstall events, then key by (channel, behavior).
    val withoutUninstalls = events.filter(event => event.behavior != "uninstall")
    val byChannelAndBehavior =
      withoutUninstalls.keyBy(event => (event.channel, event.behavior))

    // Count the events of every key in each sliding window.
    val counts = byChannelAndBehavior
      .timeWindow(Time.days(1), Time.seconds(5))
      .process(new MarketCountByChannel())

    counts.print()
    streamEnv.execute("任务名称")
  }
}
/**
 * Window function that turns all events of one (channel, behavior) key in one
 * window into a single [[MarketViewCount]].
 */
class MarketCountByChannel
  extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow]() {

  /**
   * Emits exactly one count record for the given key and window.
   *
   * @param key      the (channel, behavior) pair the window is keyed by
   * @param context  gives access to the window being evaluated
   * @param elements all events assigned to this key and window
   * @param out      collector receiving the resulting count record
   */
  override def process(
      key: (String, String),
      context: Context,
      elements: Iterable[MarketUserBehavior],
      out: Collector[MarketViewCount]): Unit = {
    val (channel, behavior) = key
    val window = context.window
    // Window bounds are emitted as the string form of the values returned by getStart/getEnd.
    val result = MarketViewCount(
      windowStart = window.getStart.toString,
      windowEnd = window.getEnd.toString,
      channel = channel,
      behavior = behavior,
      count = elements.size
    )
    out.collect(result)
  }
}
标签:String,val,flink,举例,ProcessWindowFunction,behavior,apache,import 来源: https://blog.csdn.net/qq_35515661/article/details/112788506
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。