spark_subject collection



spark subject

subject_1: use cases for the lag function (flexible)

Company code, year, and the revenue for each month from January through December:
burk,year,tsl01,tsl02,tsl03,tsl04,tsl05,tsl06,tsl07,tsl08,tsl09,tsl10,tsl11,tsl12
853101,2010,100200,25002,19440,20550,14990,17227,40990,28778,19088,29889,10990,20990
853101,2011,19446,20556,14996,17233,40996,28784,19094,28779,19089,29890,10991,20991
853101,2012,19447,20557,14997,17234,20560,15000,17237,28780,19090,29891,10992,20992
853101,2013,20560,15000,17237,41000,17234,20560,15000,17237,41000,29892,10993,20993
853101,2014,19449,20559,14999,17236,41000,28788,28786,19096,29897,41000,28788,20994
853101,2015,100205,25007,19445,20555,17236,40999,28787,19097,29898,29894,10995,20995
853101,2016,100206,25008,19446,20556,17237,41000,28788,19098,29899,29895,10996,20996
853101,2017,100207,25009,17234,20560,15000,17237,41000,15000,17237,41000,28788,20997
853101,2018,100208,25010,41000,28788,28786,19096,29897,28786,19096,29897,10998,20998
853101,2019,100209,25011,17236,40999,28787,19097,29898,28787,19097,29898,10999,20999
846271,2010,100210,25012,17237,41000,28788,19098,29899,28788,19098,29899,11000,21000
846271,2011,100211,25013,19451,20561,15001,17238,41001,28789,19099,29900,11001,21001
846271,2012,100212,100213,20190,6484,46495,86506,126518,166529,206540,246551,286562,326573
846271,2013,100213,100214,21297,5008,44466,83924,123382,162839,202297,241755,281213,320671
846271,2014,100214,100215,22405,3531,42436,81341,120245,159150,198055,236959,275864,314769
846271,2015,100215,100216,23512,2055,19096,29897,28786,19096,29897,41000,29892,308866
846271,2016,100216,100217,24620,579,38377,76175,28788,28786,19096,29897,41000,302964
846271,2017,100217,100218,25727,898,36347,73592,40999,28787,19097,29898,29894,297062
846271,2018,100218,100219,26835,2374,34318,71009,41000,28788,19098,29899,29895,291159
846271,2019,100219,100220,27942,3850,32288,68427,17237,41000,15000,17237,41000,285257


1. For each company, compute the cumulative monthly revenue within each year. Unpivot the monthly columns to rows --> sum window function

Expected output:
company code, year, month, revenue for the month, cumulative revenue
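
For example, for company 853101 in 2010 the output begins as follows (cumulative figures worked by hand from the first data row):

853101,2010,1,100200,100200
853101,2010,2,25002,125202
853101,2010,3,19440,144642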


2. For each company, compute each month's growth rate versus the same month of the previous year. Unpivot --> lag window function
company code, year, month, growth rate (current month revenue / same month last year's revenue - 1)
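
For instance, company 853101 earned 100200 in January 2010 and 19446 in January 2011, so the January 2011 growth rate is 19446 / 100200 - 1 ≈ -0.80593.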

answer

package com.sql

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Demo8Burks {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("burk")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val burkDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("burk STRING,year STRING,tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE,tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE,tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE")
      .load("data/burks.txt")

    burkDF.show()

    /**
     * 1. For each company, compute the cumulative monthly revenue within each year.
     *    Unpivot the monthly columns to rows --> sum window function.
     * Expected output: company code, year, month, revenue for the month, cumulative revenue
     */

    /**
     * SQL version first
     */

    // register the DataFrame as a temp view so we can query it with SQL
    burkDF.createOrReplaceTempView("burks")

    spark.sql(
      """
        |select burk, year, month, plc,
        |       sum(plc) over (partition by burk, year order by month) as leiji
        |from (
        |  select burk, year, month, plc
        |  from burks
        |  lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month, plc
        |) t1
        |""".stripMargin).show()



    val m: Column = map(
      expr("1"), $"tsl01",
      expr("2"), $"tsl02",
      expr("3"), $"tsl03",
      expr("4"), $"tsl04",
      expr("5"), $"tsl05",
      expr("6"), $"tsl06",
      expr("7"), $"tsl07",
      expr("8"), $"tsl08",
      expr("9"), $"tsl09",
      expr("10"), $"tsl10",
      expr("11"), $"tsl11",
      expr("12"), $"tsl12"
    )

    /**
     * The same logic with the DataFrame API
     */
    burkDF
      // unpivot the monthly columns into rows first
      .select($"burk",$"year",explode(m) as Array("month","plc"))
      // with orderBy, the sum becomes a running total within each partition
      .withColumn("leiJi",sum($"plc") over Window.partitionBy($"burk",$"year").orderBy($"month"))
      .show()
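
    // For reference: with orderBy, the window defaults to a frame of
    // rangeBetween(unboundedPreceding, currentRow), which is what makes the sum
    // cumulative. The same running total with the frame spelled out explicitly:
    val w = Window
      .partitionBy($"burk", $"year")
      .orderBy($"month")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    burkDF
      .select($"burk", $"year", explode(m) as Array("month", "plc"))
      .withColumn("leiJi", sum($"plc") over w)
      .show()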

    /**
     * 2. For each company, compute each month's growth rate versus the same month
     *    of the previous year. Unpivot --> lag window function.
     * Output: company code, year, month, growth rate (current month revenue / same month last year's revenue - 1)
     *
     * coalesce: returns the first non-null column (when/otherwise serves the same purpose below)
     */
    burkDF
      // unpivot the monthly columns into rows first
      .select($"burk",$"year",explode(m) as Array("month","plc"))
      // lag fetches the same month's revenue from the previous year
      .withColumn("shangTong",lag($"plc",1) over Window.partitionBy($"burk",$"month").orderBy($"year"))
      // compute the growth rate: current month / same month last year - 1
      .withColumn("p", round($"plc" / $"shangTong" - 1, 5))
      // rows with no prior-year figure (lag returned null) default to 1.0
      .withColumn("p", when($"shangTong".isNotNull, $"p").otherwise(1.0))
      .select($"burk",$"year",$"month",$"p")
      .show()
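
    // For completeness, task 2 in SQL with the lag window function, against the
    // same burks view. Note: lag() yields null for the first year, so the rate
    // there comes out null rather than the 1.0 default used above.
    spark.sql(
      """
        |select burk, year, month,
        |       round(plc / lag(plc, 1) over (partition by burk, month order by year) - 1, 5) as p
        |from (
        |  select burk, year, month, plc
        |  from burks
        |  lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month, plc
        |) t1
        |""".stripMargin).show()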
  }
}

subject_2: row/column conversion

1. Row/column conversion

Table 1
name,item,score
张三,数学,33
张三,英语,77
李四,数学,66
李四,英语,78


Table 2
name,math,english
张三,33,77
李四,66,78

    1. Convert table 1 into table 2 (pivot)
    2. Convert table 2 into table 1 (unpivot)

answer

package com.sql
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Demo11Student {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("stu_sco")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val stuDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("name STRING,subject STRING, score DOUBLE")
      .load("data/stu_sco.txt")

    /**
     * Pivot: turn the subject rows into math/english columns (table 1 -> table 2)
     */

    val hang_lieDF: DataFrame = stuDF
      .groupBy($"name")
      .agg(
        sum(when($"subject" === "数学", $"score").otherwise(0)) as "math",
        sum(when($"subject" === "英语", $"score").otherwise(0)) as "english"
      )
    hang_lieDF.show()

    /**
     * Unpivot: turn the math/english columns back into rows (table 2 -> table 1)
     */
    val m: Column = map(
      expr("'数学'"), $"math",
      expr("'英语'"), $"english"
    )

    hang_lieDF
      .select($"name",explode(m) as Array("subject","score"))
      .show()
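
    // Note: the DataFrame API also ships a built-in pivot. A minimal sketch of
    // table 1 -> table 2 with groupBy(...).pivot(...); listing the subject
    // values up front lets Spark skip the extra pass that infers them:
    stuDF
      .groupBy($"name")
      .pivot("subject", Seq("数学", "英语"))
      .agg(first($"score"))
      .withColumnRenamed("数学", "math")
      .withColumnRenamed("英语", "english")
      .show()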
  }
}

subject_3: aggregation along the timeline

Given monthly social-insurance payment records (id, company name, payment month), find, for each id, every continuous period of employment at a company, with its start and end month. Sample data:

91330000733796106P,杭州海康威视数字技术股份有限公司,2020-02-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-03-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-04-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-05-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-06-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-07-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-08-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-09-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-10-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-11-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-12-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-01-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-02-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-03-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-02-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-03-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-04-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-05-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-06-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-07-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-08-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-09-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-10-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-11-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-12-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-01-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-02-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-03-01 00:00:00

answer

package com.sql

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.language.postfixOps

object Demo12SheBao {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("sheBao")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val sheBaoDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING, burk STRING, sdate STRING")
      .load("data/sheBao.txt")

    sheBaoDF.show()

    /**
     * Grouping continuous runs along the timeline (a gaps-and-islands problem)
     */
    sheBaoDF
      // within each id, order by date and take the previous record's company
      .withColumn("last_name",lag($"burk",1) over Window.partitionBy($"id").orderBy($"sdate"))
      // flag: 0 if the company matches the previous record, 1 otherwise (a 1 marks a new employment)
      .withColumn("flag",when($"burk" === $"last_name",0).otherwise(1))
      // a running sum of the flag along the timeline assigns a group id to each continuous run
      .withColumn("fenZu",sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))
      // within each (id, company, group), min and max of the date give the hire and leave dates
      .groupBy($"id", $"burk", $"fenZu")
      .agg(min($"sdate") as "start_date", max($"sdate") as "end_date")
      .show(1000)
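
    // The same gaps-and-islands pattern in SQL: flag the change points, then
    // running-sum the flags into group ids. A sketch, registering the view first:
    sheBaoDF.createOrReplaceTempView("sheBao")
    spark.sql(
      """
        |select id, burk, min(sdate) as start_date, max(sdate) as end_date
        |from (
        |  select id, burk, sdate,
        |         sum(flag) over (partition by id order by sdate) as fenZu
        |  from (
        |    select id, burk, sdate,
        |           case when burk = lag(burk, 1) over (partition by id order by sdate)
        |                then 0 else 1 end as flag
        |    from sheBao
        |  ) t1
        |) t2
        |group by id, burk, fenZu
        |""".stripMargin).show(1000)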
  }
}

Source: https://www.cnblogs.com/atao-BigData/p/16488811.html
