Spark review examples: traffic-violation analysis with Spark SQL, JDBC output to MySQL, and a stateful Kafka word count

2020-05-13 12:02:13



package com.bawei.foryk

import com.bawei.util.DateTools


object TrafficUtil {

  // Compute the distance between the photographed latitude/longitude and
  // Tiananmen (taken here as lat 39, long 116) and map it to a ring road number.
  def circle(x: Int, y: Int): Int = {
    val distance: Long = Math.round(Math.sqrt(Math.pow(x - 39, 2) + Math.pow(y - 116, 2)))
    if (distance <= 15) 2 // distance is never negative, so everything up to 15 is Ring 2
    else if (distance > 15 && distance <= 30) 3
    else if (distance > 30 && distance <= 40) 4
    else if (distance > 40 && distance <= 60) 5
    else 6
  }

  // Given the vehicle type and the distance-to-Tiananmen formula above,
  // classify the violation type according to the provided pseudocode.
  def isRule(actiontime: String, carno: String, cartype: String, x: Int, y: Int): String = {
    if (cartype == "A" && circle(x, y) == 2) "Motorcycle A entered Ring 2"
    else if (cartype == "B" && circle(x, y) == 4) "Motorcycle B entered Ring 4"
    else if (cartype == "C" && circle(x, y) == 6 && !carno.startsWith("京")) "Out-of-town plate may not enter Ring 5"
    else if (cartype == "C" && carno.startsWith("京")) {
      val weishu: String = carno.substring(carno.length - 1, carno.length) // last digit of the plate
      val week: String = DateTools.dateToWeek(actiontime) // day of week as a numeric string
      if ((weishu.toInt + week.toInt) % 2 == 0) "Complies with the odd-even restriction"
      else "Local plate violates the odd-even restriction"
    } else {
      "No violation"
    }
  }

  // Scala method: given a plate number and a date, decide whether the car
  // complies with the odd-even license-plate restriction.
  /*def isAllow(carno: String, actiontime: String): String = {
    if (carno.startsWith("京")) {
      val weishu: String = carno.substring(carno.length - 1, carno.length)
      println(s"Last digit: ${weishu}")
      val week: String = DateTools.dateToWeek(actiontime)
      if ((weishu.toInt + week.toInt) % 2 == 0) "Complies with the odd-even restriction"
      else "Violates the odd-even restriction"
    } else {
      "Not a local plate"
    }
  }*/

  def main(args: Array[String]): Unit = {
    //println(circle(66, 142))
    //println(isAllow("京H17453", "2020-05-13 15:22:06"))
  }
}
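
DateTools is imported from com.bawei.util but its source is not included in the post. Below is a minimal sketch of what dateToWeek might look like, assuming it parses a "yyyy-MM-dd HH:mm:ss" timestamp and returns the day of the week as a numeric string with Monday = "1"; the signature and return convention are inferred from the call sites above, not taken from the original:

package com.bawei.util

import java.text.SimpleDateFormat
import java.util.Calendar

object DateTools {
  // Parse a "yyyy-MM-dd HH:mm:ss" timestamp and return the day of the week
  // as a string ("1" = Monday ... "7" = Sunday), so callers can do week.toInt.
  def dateToWeek(actiontime: String): String = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val cal = Calendar.getInstance()
    cal.setTime(sdf.parse(actiontime))
    // Calendar.DAY_OF_WEEK runs 1 (Sunday) .. 7 (Saturday); shift so Monday = 1
    val dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
    val mondayBased = if (dayOfWeek == Calendar.SUNDAY) 7 else dayOfWeek - 1
    mondayBased.toString
  }
}

Under that convention, dateToWeek("2020-05-13 15:22:06") returns "3", since 2020-05-13 was a Wednesday.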
package com.bawei.foryk

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}


//case class Car(actiontime: String, carno: String, cartype: String, x: Int, y: Int)
object SparkSqlTraffic02 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlTraffic02")
      .master("local")
      .getOrCreate()

    // Read the input file into an RDD
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/traffic.txt")

    // Parse each line and append the computed ring road and violation type
    val tuple7RDD: RDD[(String, String, String, String, String, Int, String)] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      (strings(0), strings(1), strings(2), strings(3), strings(4),
        TrafficUtil.circle(strings(3).toInt, strings(4).toInt),
        TrafficUtil.isRule(strings(0), strings(1), strings(2), strings(3).toInt, strings(4).toInt)
      )
    })

    /*var carSchema = StructType(
      List(DataTypes.createStructField("actiontime", DataTypes.StringType, true),
        DataTypes.createStructField("carno", DataTypes.StringType, true),
        DataTypes.createStructField("cartype", DataTypes.StringType, true),
        DataTypes.createStructField("x", DataTypes.IntegerType, true),
        DataTypes.createStructField("y", DataTypes.IntegerType, true),
        DataTypes.createStructField("circle", DataTypes.IntegerType, true),
        DataTypes.createStructField("info", DataTypes.StringType, true))
    )*/
    import spark.implicits._
    val tupleDF: DataFrame = tuple7RDD.toDF() // columns default to _1 .. _7
    tupleDF.createOrReplaceTempView("car")

    //spark.sql("select * from car").show()
    // Spark SQL: count vehicles per ring road and vehicle type
    //spark.sql("select _6, _3, count(*) from car group by _6, _3 order by _6, _3").show()
    // Spark SQL: count vehicles per vehicle type and violation type
    //spark.sql("select _3, _7, count(*) from car where _7 != 'No violation' group by _3, _7 order by _3, _7").show()
    // Count vehicles per day, per ring road and per vehicle type.
    // substring in Spark SQL is 1-based (a start of 0 behaves like 1), so
    // substring(_1, 0, 10) extracts the yyyy-MM-dd date prefix.
    val resultDF: DataFrame = spark.sql("select substring(_1,0,10), _6, _3, count(*) from car group by substring(_1,0,10), _6, _3 order by substring(_1,0,10), _6, _3")
    resultDF.coalesce(1).rdd.saveAsTextFile("./carresult")

    spark.stop()
  }
}
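
The post does not include traffic.txt. Judging from the split(",") and how the columns are used (timestamp, plate number, vehicle type, latitude, longitude), an input line presumably looks like these hypothetical samples:

2020-05-13 15:22:06,京H17453,C,45,120
2020-05-13 16:04:31,冀A12345,C,52,130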
package com.bawei.foryk

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Read student records from a text file, filter them with Spark SQL,
  * and write the result to MySQL over JDBC.
  */
case class Student(name: String, sex: String, age: Int)
object SparkSqlReview01 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlReview01")
      .master("local")
      .getOrCreate()

    // Read the input file into an RDD
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/data.txt")

    // Parse each comma-separated line into a Student
    val studentRDD: RDD[Student] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      Student(strings(0), strings(1), strings(2).toInt)
    })

    import spark.implicits._
    val studentDF: DataFrame = studentRDD.toDF()

    studentDF.createOrReplaceTempView("student")

    val resultDF: DataFrame = spark.sql("select * from student where age < 20")

    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "")

    // Requires the MySQL Connector/J driver on the classpath
    resultDF.write.jdbc("jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8", "student", prop)

    spark.stop()
  }
}
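
One caveat about the JDBC write above: DataFrameWriter.jdbc defaults to SaveMode.ErrorIfExists, so the job fails on a second run once the student table exists. If re-runs should keep appending rows, a variant along these lines would do it (appending is an assumption here, not something the original states):

import org.apache.spark.sql.SaveMode

resultDF.write
  .mode(SaveMode.Append) // or SaveMode.Overwrite to drop and recreate the table
  .jdbc("jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8", "student", prop)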
package com.bawei.foryk

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Stateful word count over a Kafka topic, recoverable from a checkpoint.
  */
object SparkStreamReview01 {

  def main(args: Array[String]): Unit = {
    val checkpointdir = "./checkdir2"
    // Recover the context from the checkpoint if one exists, otherwise create it
    val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointdir, () => {
      createFunc(checkpointdir)
    })
    // Start the context here: the factory passed to getOrCreate must return
    // an unstarted context, so start/awaitTermination belong in main
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointdir: String): StreamingContext = {

    val conf: SparkConf = new SparkConf().setAppName("SparkStreamReview01").setMaster("local[2]")
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointdir)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )
    // The topic set may hold more than one topic
    val topics = Set("test")
    // Build the DStream with KafkaUtils.createDirectStream
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    // Extract the message values from the Kafka records
    val socketline: DStream[String] = kafkaTopicDS.map(x => x.value())

    val mapRDD: DStream[(String, Int)] = socketline.flatMap(_.split(" ")).map((_, 1))

    //mapRDD.reduceByKey(_ + _).print()
    //mapRDD.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(10), Seconds(5)).print()
    //mapRDD.countByValueAndWindow(Seconds(10), Seconds(5)).print()

    val result: DStream[(String, Int)] = mapRDD.updateStateByKey((list: Seq[Int], option: Option[Int]) => {
      var before = option.getOrElse(0) // running total carried over from previous batches
      for (value <- list) {            // add this batch's counts
        before += value
      }
      Option(before)
    })
    result.print()

    ssc
  }
}
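
Since the loop in the update function only sums the current batch, the same state update can be expressed more compactly; an equivalent one-liner:

val result: DStream[(String, Int)] = mapRDD.updateStateByKey(
  (batch: Seq[Int], state: Option[Int]) => Some(state.getOrElse(0) + batch.sum))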
Source: https://www.cnblogs.com/xjqi/p/12881553.html
