ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

sparkstreaming转换算子--窗口函数

2022-09-02 13:01:06  阅读:188  来源: 互联网

标签:10 窗口 val -- sparkstreaming Seconds 算子 DStream ssc


window

  • 画图理解
  • 说明
    countByWindow 对每个滑动窗口的数据执行count操作
    reduceByWindow 对每个滑动窗口的数据执行reduce操作
    reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
    countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
    都需要传入两个核心参数
    windowDuration: Duration, 窗口时间长度--一般是batchSize(批次时间)的整数倍
    slideDuration: Duration: 滑动时间长度----一般是batchSize(批次时间)的整数倍
  • 案例
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * 滑动窗口算子
 *    每隔一段时间,对原始DStream中多个批次数据整合  成为新的DStream中一个批次数据
 *
 * Spark Streaming中,一个批次执行一次,不会积攒当前批次的数据。滑动窗口算子可以实现将多个批次数据积攒下来,然后再去做统一的运算
 *   窗口算子最为基础核心的算子 window 会给我们返回一个新的DStream,但是这个DStream包含多个未被处理的批次数据
 *      窗口函数中需要传递核心参数
 *      windowDuration: Duration,  窗口时间长度--一般是batchSize(批次时间)的整数倍
 *      slideDuration: Duration:  滑动时间长度----一般是batchSize(批次时间)的整数倍
 */object ByWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("transform3")
    val ssc = new StreamingContext(conf, Milliseconds(10000))

    val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
    // 窗口长度 30s  滑动间隔 10s  每个10s时间将DStream中前30秒的数据 整合为一个批次数据处理
    val ds1 = ds.window(Seconds(10), Seconds(10))
    ds1.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

/**
 * 使用window窗口函数算子 严格意义上只负责去对原始DStream进行窗口检测,形成窗口批次数据的DStream,如果我们要对窗口批次数据
 * 进行处理的话,还得需要对窗口批次数据的DStream使用转换算子和行动算子计算逻辑
 *
 * windows函数也有一些变种的窗口函数算子:既可以实现窗口批次数据的检测,也可以实现一些相关的计算功能
 *   countByWindow 对每个滑动窗口的数据执行count操作
 *   reduceByWindow 对每个滑动窗口的数据执行reduce操作
 *   reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
 *   countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
 */
object ByWindow2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("state02").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Milliseconds(10000))
    ssc.checkpoint("hdfs://node1:9000/sparkstreaming")

    val ds:DStream[String] = ssc.socketTextStream("node1", 44444)
    val ds1 = ds.map((_, 1))
    val ds2 = ds1.reduceByKeyAndWindow((a: Int, b: Int)=>(a+b), Seconds(10), Seconds(10))
    ds2.print()
    println(",,,,,,,,,,")
    val ds3: DStream[Long] = ds1.countByWindow(Seconds(10), Seconds(10))
    ds3.print()
    println(",,,,,,,,,,")
    val ds4 = ds1.reduceByWindow((a, b) => (a._1+b._1, 0), Seconds(10), Seconds(10))
    ds4.print()
    println(",,,,,,,,,,")
    // 需要设置检查点
    val ds5 = ds1.countByValueAndWindow(Seconds(10), Seconds(10))
    ds5.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

应用场景:黑名单

package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

/**
 * 黑名单统计
 *   有市场就有竞争,有竞争就少不来邪门外道
 *   A厂家投放了广告,广告每点击一次都是有记录的,但是不排初竞争对手的恶意点击
 *
 *   实时统计黑名单用户
 *   网站每隔3秒记录一批次用户的点击行为,记录的时候,认定如果在1分钟之内 用户点击次数超过10次 认定这个用户是一个黑名单用户
 *   需要把用户IP封掉
 *
 *   Spark Streaming去连接端口数据源:
 *     端口模拟用户的点击行为  发送数字 数字就代表某一个用户id
 */
object BlackUser {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("state01").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Milliseconds(3000))
    val ds:DStream[String] = ssc.socketTextStream("node1", 44444)

    val ds1:DStream[String] = ds.window(Minutes(1),Minutes(1))
    val ds2:DStream[(String,Int)] = ds1.map((_, 1)).reduceByKey(_ + _)
    //保留黑名单用户
    val ds3:DStream[(String,Int)] = ds2.filter(tuple=>{
      if(tuple._2>=10){
        true
      }else{
        false
      }
    })
    ds3.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

标签:10,窗口,val,--,sparkstreaming,Seconds,算子,DStream,ssc
来源: https://www.cnblogs.com/jsqup/p/16649421.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有