ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark-万物之源WordCount(四)

2021-11-22 23:00:08  阅读:187  来源: 互联网

标签:SparkContext String val WordCount 之源 sc new spark data


Spark实现WordCount的N种方法


  大家好啊,这里就不自我介绍了,我们说一下WordCount,也就是词频。大家可能在各种渠道学习数据处理都会是WordCount首当其冲,为什么呢?因为WordCount简单。但是可以很好的形容数据处理和数据统计。今天我们也跟风的讲一讲WordCount,但是呢?我们不是泛泛的讲讲,我们是抱着系统学习的态度开始的。因为实现WordCount的方法有很多,每一种方法都是不同的算子,都会让你有不同的收获。那就开始了哈。

一、数据源。

[root@host juana]# touch data.txt
[root@host juana]# vim data.txt
liubei,sunshangxiang,zhaoyun
minyue,guanyu,juyoujin,nakelulu
liubei,libai
libai,guanyu,bailishouyue

二、具体实现。

1、方法1

  这是最原始的方法。

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    // 扁平化处理
      .flatMap(line=>{line.split(",")})
     // 逐个击破
      .map(x=>(x,1)).reduceByKey(_+_)
     // 逐个输出
      .foreach(println)
     // 关闭环境
    sc.stop()
  }
}

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

2、方法2

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => {line.split(" ")})
    .map(data => (data, 1))
    .groupBy(_._1)
    .map(data=>(data._1, data._2.size))
    .foreach(println)
    sc.stop()

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

3、方法3

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => {line.split(" ")})
	.map(data => (data, 1))
	.groupByKey()
	.map(data=>(data._1, data._2.size))
	.foreach(println)
    sc.stop()

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

4、方法4

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => line.split(" "))
	.map(data => (data, 1))
	.aggregateByKey(0)(_ + _, _ + _)
    .foreach(println)

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

5、方法5

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => line.split(" "))
	.map(data => (data, 1))
	.foldByKey(0)( _ + _)
	.foreach(println)

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

6、方法6

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => line.split(" "))
	.map(data => (data, 1))
	.combineByKey(v=>v,(x:Int,y)=>(x+y),(x:Int,y)=>(x+y))
    .foreach(println)

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

7、方法7

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
   val stringToLong: collection.Map[String, Long] = rdd.map(data => (data, 1)).countByKey()
    println(stringToLong)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

8、方法8

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
   val stringToLong: collection.Map[String, Long] = rdd.countByValue()
    println(stringToLong)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

9、方法9

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
    val RDD1: RDD[mutable.Map[String, Long]] = rdd.map(word => mutable.Map[String, Long]((word, 1L)))
    val stringToInt: mutable.Map[String, Long] = RDD1.reduce((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Long = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
      }
      map1
    }
    )
    println(stringToInt)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

10、方法10

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
    val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))

    val stringToInt: mutable.Map[String, Int] = RDD1.aggregate(mutable.Map[String, Int]())((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    }, (map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    })
    println(stringToInt)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

11、方法11

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings})
    val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))
    val stringToInt: mutable.Map[String, Int] = RDD1.fold(mutable.Map[String, Int]())((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    })
    println(stringToInt)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

好了,上面就是我们的11种实现方法,那现在我们来做一个总结吧。
大家可能注意到了,方法6及以前都是单个输出,但是方法7以及以后j结果都是Map?是巧合吗?还是道德的缺失?

好了不扯了,主要是因为实现WordCount的主要算子不一样。前面都是Transformation (转换)算子,后面是Action (行动)算子。
来,瞧瞧吧。

Transformation 是惰性算子,待需要的时候执行,Action 是活动算子,直接生成任务执行。一个Action 对应着一个任务。

算子类型实现主要算子算子简介
TransformationgroupByvalue数据类型分组算子,使用需要指定分组值,返回值是二元组(k,迭代器)
TransformationgroupByKeyKey-Value数据类型分组算子,不需要指定分组数据,直接按照k分组。返回值是二元组(k,迭代器)
TransformationreduceByKeyKey-Value数据类型分组聚合算子,不需要指定分组数据,直接按照k分组,需要指定聚合函数(分区间函数和分区类函数一样)
TransformationaggregateByKeyKey-Value数据类型分组聚合算子,不需要指定分组数据,直接按照k分组,需要指定聚合函数(分区间和分区类函数不一样)和聚合初始值,柯里化
TransformationfoldByKey分组聚合类似于aggregateByKey 只是foldByKey可以表示分区间和分区类的计算逻辑是一样的,柯里化
TransformationcombineByKeyKey-Value数据类型分组聚合算子,有三个参数第一个参数:对第一个数据做修饰,第二个参数:分区内聚合函数, 第三个参数:分区间聚合函数,中间变量的类型有可能编译没办法识别,需要标明泛型
ActioncountByKey按照key值进行分组聚合,底层调用的是reduceByKey(+)
ActioncountByValue直接聚合,底层调用countByKey ,map(value => (value, null)).countByKey()
Actionreduce聚合算子,底层需要自己去实现,存在的价值是自定义底层。见上面的用法即明白
Actionfold比reduce算子,多一个参数,可以设置聚合时中间临时变量的初始值]
Actionaggregate可以执行分区间聚合和分区类聚合,比如fold多一个参数,分别设置RDD数据集合时局部聚合函数和全局聚合函数


以上只是算子的简单介绍,后面我们会对其原理以及源码进行说明。这些简介配合用法大家先看着理解哈,各位看客,怠慢了。

标签:SparkContext,String,val,WordCount,之源,sc,new,spark,data
来源: https://blog.csdn.net/weixin_45025143/article/details/121479412

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有