
Spark Streaming: a small example

2022-07-20 10:35:08  Source: Internet

Tags: code val example county streaming spark speed id card


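Spark Streaming

A real-time computation example

Data (vehicle-pass records from traffic checkpoints, one JSON object per line)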

{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117985031601010,"camera_id":"01214","orientation":"西南","road_id":34052057,"time":1614711914,"speed":45.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117984031601010,"camera_id":"01024","orientation":"西北","road_id":34052058,"time":1614711924,"speed":45.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117970031606010,"camera_id":"01022","orientation":"西北","road_id":34052059,"time":1614712022,"speed":75.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117956031625010,"camera_id":"01132","orientation":"西北","road_id":34052060,"time":1614712120,"speed":46.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117925031638010,"camera_id":"00202","orientation":"西北","road_id":34052061,"time":1614712218,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117902031651010,"camera_id":"01102","orientation":"西北","road_id":34052062,"time":1614712316,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117885031666010,"camera_id":"01221","orientation":"西北","road_id":34308114,"time":1614712414,"speed":48.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117855031704010,"camera_id":"00231","orientation":"西北","road_id":34308115,"time":1614712619,"speed":59.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117817031742010,"camera_id":"01130","orientation":"西北","road_id":34308116,"time":1614712824,"speed":52.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117784031777010,"camera_id":"00123","orientation":"西北","road_id":34308117,"time":1614713030,"speed":71.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117720031793010,"camera_id":"00132","orientation":"西北","road_id":34308118,"time":1614713235,"speed":65.5}
...
...
...
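In the job, step 1 parses each line with fastjson and keeps only the checkpoint ID (`card`) and the speed, paired with a count of 1. The sketch below mimics that map step in plain Scala so it runs without Spark or fastjson. The regex-based helper `toCardSpeed` is hypothetical and assumes, as in the sample records, that `card` and `speed` are unquoted numbers; it is not a general JSON parser.

```scala
// Plain-Scala stand-in for the fastjson map step (illustration only).
object ParseSketch {
  // Hypothetical patterns: assume "card" is an unquoted integer and
  // "speed" an unquoted decimal, exactly as in the sample records.
  private val CardRe  = """"card":(\d+)""".r
  private val SpeedRe = """"speed":([0-9.]+)""".r

  /** Turn one JSON line into (card, (speed, 1)), or None if a field is missing. */
  def toCardSpeed(line: String): Option[(Long, (Double, Int))] =
    for {
      c <- CardRe.findFirstMatchIn(line)
      s <- SpeedRe.findFirstMatchIn(line)
    } yield (c.group(1).toLong, (s.group(1).toDouble, 1))

  def main(args: Array[String]): Unit = {
    // First sample record from the data above
    val line = """{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}"""
    println(toCardSpeed(line)) // Some((117988031603010,(36.38,1)))
  }
}
```

The trailing `1` is the per-record vehicle count; carrying it alongside the speed is what lets the later reduce produce both a total speed and a number of vehicles.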

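Notes:

* Problems when saving data to a database
* 1. If you use foreach directly, a connection is created for every record; this is inefficient and puts too much pressure on the database.
* 2. If you create the connection outside the foreach operator (on the driver), the job fails: Spark must serialize the closure to send it to the executors, and a network connection cannot be serialized and sent over the network.
*
* The correct approach
* Use foreachPartition, which creates only one database connection per partition.
*
* RDD foreach vs. foreachPartition
* foreach: processes one record at a time
* foreachPartition: processes one partition's data at a time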
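The two problems in these notes can be reproduced without a cluster. In this plain-Scala sketch, `FakeConnection` is a hypothetical stand-in for a JDBC connection that counts how often it is opened, and `partitions` plays the role of an RDD split into three partitions. The last helper uses Java serialization, which Spark's default closure serializer relies on, to show why a connection created on the driver cannot be shipped to executors (in Spark this surfaces as a "Task not serializable" error caused by `NotSerializableException`).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object PartitionSketch {
  /** Hypothetical stand-in for a JDBC connection; like typical JDBC connections, it is not Serializable. */
  final class FakeConnection {
    FakeConnection.opened += 1        // count every connection that gets opened
    def insert(row: Int): Unit = ()   // pretend to write one row
    def close(): Unit = ()
  }
  object FakeConnection { var opened = 0 }

  // Ten records split into three "partitions", as an RDD would be split across tasks.
  val partitions: Seq[Seq[Int]] = Seq(1 to 4, 5 to 7, 8 to 10)

  /** Connection opened inside foreach: one connection per record. */
  def perRecord(): Int = {
    FakeConnection.opened = 0
    partitions.foreach(_.foreach { row =>
      val con = new FakeConnection
      con.insert(row)
      con.close()
    })
    FakeConnection.opened
  }

  /** Connection opened inside foreachPartition: one connection per partition. */
  def perPartition(): Int = {
    FakeConnection.opened = 0
    partitions.foreach { iter =>
      val con = new FakeConnection
      try iter.foreach(con.insert)
      finally con.close()
    }
    FakeConnection.opened
  }

  /** True if the object survives Java serialization, which shipping a closure requires. */
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(perRecord())                    // 10
    println(perPartition())                 // 3
    println(serializes(new FakeConnection)) // false
    println(serializes("plain data"))       // true
  }
}
```

Ten records cost ten connections with the per-record pattern but only three with the per-partition pattern; in a real job the saving grows with the number of records per partition.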

Implementation:

package com.shujia.spark.streaming
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object Demo8Card {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 1) // affects Spark SQL shuffles only; the DStream operations below do not use it
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))


    /**
     * Read the vehicle-pass records from the traffic checkpoints
     */
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * 1. Parse the JSON records
     *
     */
    val cardAndSpeedDS: DStream[(Long, (Double, Int))] = linesDS.map(line => {
      // Parse the JSON record with fastjson
      val carJson: JSONObject = JSON.parseObject(line)
      // Extract the checkpoint ID and the vehicle speed
      val card: Long = carJson.getLong("card")
      val speed: Double = carJson.getDouble("speed")
      (card, (speed, 1))
    })

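    /**
     * 2. Compute, in real time, the average speed and the number of vehicles for each checkpoint
     * The window covers the last 15 seconds and is recomputed every 5 seconds
     *
     */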

    val sumSpeedAndNUmDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS
      .reduceByKeyAndWindow((kv1: (Double, Int), kv2: (Double, Int)) => {
        // Total speed
        val sumSpeed: Double = kv1._1 + kv2._1
        // Number of vehicles
        val num: Int = kv1._2 + kv2._2
        (sumSpeed, num)
      }, Durations.seconds(15), Durations.seconds(5))

    /**
     * 3. Compute the average speed
     *
     */
    val avgSpeedAndNumDs: DStream[(Long, Int, Double)] = sumSpeedAndNUmDS.map {
      case (card: Long, (sumSpeed: Double, num: Int)) =>
        val avgSpeed: Double = sumSpeed / num
        (card, num, avgSpeed)
    }

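    /**
     * 4. Save the results to MySQL
     *
     * Problems when saving data to a database:
     * 1. Using foreach directly creates a connection for every record, which is inefficient and overloads the database
     * 2. Creating the connection outside the foreach operator (on the driver) fails, because the connection cannot be serialized and sent to the executors
     *
     * Correct approach:
     * use foreachPartition, which creates only one database connection per partition
     *
     * RDD foreach vs. foreachPartition:
     * foreach processes one record at a time
     * foreachPartition processes one partition's data at a time
     *
     */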

    avgSpeedAndNumDs.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        // Timestamp of this computation
        val date = new Date()
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val comDate: String = format.format(date)

        // 1. Load the driver (MySQL Connector/J 8.x names it com.mysql.cj.jdbc.Driver)
        Class.forName("com.mysql.jdbc.Driver")
        // 2. Open one connection for this partition
        val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
        try {
          // 3. Prepare the INSERT statement
          val stat: PreparedStatement = con.prepareStatement("insert into card_avg_speed_and_num(card,com_date,num,avg_speed) values(?,?,?,?)")
          try {
            // This foreach is an ordinary Iterator method, not a Spark operator
            iter.foreach {
              case (card: Long, num: Int, avgSpeed: Double) =>
                // Bind the parameters
                stat.setLong(1, card)
                stat.setString(2, comDate)
                stat.setInt(3, num)
                stat.setDouble(4, avgSpeed)
                // Insert the row
                stat.execute()
            }
          } finally {
            stat.close()
          }
        } finally {
          // Close the connection even if an insert fails
          con.close()
        }
      })

    })

    avgSpeedAndNumDs.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
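Steps 2 and 3 can also be checked without a cluster. The sketch below replays micro-batches in plain Scala: with a 5-second batch interval, a 15-second window covers the last 3 batches and a 5-second slide emits one result per batch (Spark requires both durations to be multiples of the batch interval). Early windows simply contain fewer batches. The names `slidingWindows`, `windowBatches` and `slideBatches` are hypothetical, and the speeds in `main` are made-up values shaped like the sample data; this models the semantics of `reduceByKeyAndWindow`, not its implementation.

```scala
object WindowSketch {
  type Stat = (Double, Int) // (sum of speeds, number of vehicles)

  // Same reduce function as the job: add speeds and add counts.
  // Both parts are associative and commutative, which a windowed reduce relies on.
  def combine(a: Stat, b: Stat): Stat = (a._1 + b._1, a._2 + b._2)

  /** One result per slide: for each card, (vehicle count, average speed) over the window.
    * windowBatches = window length / batch interval (15 s / 5 s = 3)
    * slideBatches  = slide interval / batch interval (5 s / 5 s = 1) */
  def slidingWindows(
      batches: Seq[Seq[(Long, Stat)]],
      windowBatches: Int,
      slideBatches: Int): Seq[Map[Long, (Int, Double)]] =
    (slideBatches to batches.length by slideBatches).map { end =>
      // Batches that fall inside the window ending at this slide
      val window = batches.slice(math.max(0, end - windowBatches), end).flatten
      window
        .groupBy(_._1)                                    // group by card
        .map { case (card, kvs) =>
          val (sum, num) = kvs.map(_._2).reduce(combine)  // reduce per key
          card -> (num, sum / num)                        // (count, average speed)
        }
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical batches for a single checkpoint
    val card = 117988031603010L
    val batches = Seq(
      Seq(card -> (36.38, 1)),
      Seq(card -> (35.38, 1), card -> (45.38, 1)),
      Seq(card -> (45.29, 1)),
      Seq(card -> (75.29, 1))
    )
    slidingWindows(batches, windowBatches = 3, slideBatches = 1)
      .zipWithIndex
      .foreach { case (m, i) => println(s"window ending at batch ${i + 1}: $m") }
  }
}
```

Carrying `(sum, count)` through the reduce and dividing only at the end is the key design choice: sums and counts can be merged in any order across batches, whereas an average of partial averages is generally wrong.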

Source: https://www.cnblogs.com/atao-BigData/p/16496896.html
