ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark基础

2021-11-19 11:01:18  阅读:91  来源: 互联网

标签:task rdd 分区 基础 RDD sc Spark spark


Spark基础

一.spark介绍

1.1 spark特点

  • 1.快速高效 :spark可以将中间结果cache到内存中,节省了网络IO和磁盘IO。同时spark使用了dag任务调度思想,将计算逻辑构成了一个有向无环图,同时也会将dag优化后再生成物理计划,所以性能比mapreduce好很多。
  • 2.简洁易用,spark支持scala,java,python,r等语言操作。
  • 3.提供了统一的大数据解决方案。Spark还支持SQL,大大降低了大数据开发者的使用门槛,同时提供了SparkStream和Structed Streaming可以处理实时流数据;MLlib机器学习库,提供机器学习相关的统计、分类、回归等领域的多种算法实现。其高度封装的API
    接口大大降低了用户的学习成本;Spark
    GraghX提供分布式图计算处理能力;PySpark支持Python编写Spark程序;SparkR支持R语言编写Spark程序。
  • 4.支持多种部署方案,1.standalone,spark on yarn, spark on mesos
  • 5.支持多种数据源,hdfs,hbase,hive,alluxio以及任何和hadoop相兼容的。

1.2 spark跟mapreduce优缺点对比

  • 1.mr:mr只能做离线计算,如果实现复杂逻辑,一个mr解决不了,需要将多个mr按照顺序串联计算,然后每一个mr的结果存储在hdfs中,写一个mr将上一个mr的输出结果作为输入,这样就要频繁读写hdfs,网络io和磁盘io是性能瓶颈,从而效能低下。
  • 2.spark:既可以做离线计算,也能做实时计算。提供了抽象的数据集(RDD,Dataset,dataframe,DStream),有高度封装的API,算子丰富,同时使用了更先进的DAG有向无环图,可以优化后再执行执行计划,并且数据可以cache再内粗中,
  • 3.mr的task是以map task, reduce这样的进程级别实例组成,spark的worker,executor也是进程级别实例,但是分配到具体任务的时候,mr还是进程实例,但是spark处理任务的单位task是运行在executor的线程,是多线程级别。

1.3 3.0版本新特性

1.改进spark sql,在运行时对查询计划优化,允许spark planner在允许时执行这些计划,这些计划将在运行时执行可选的计划。计划将基于运行时统计数据优化,提高性能
● 动态合并shuffle partitions
● 动态调整join策略
● 动态优化倾斜的join
● 动态分区裁剪
● ANSI SQL兼容性
● Join hints

1.4

基本概念:

  • 1.application,表示你的应用程序
  • 2.driver,表示main()函数,创建spark context。由spark context 负责和cluster manager通信,进行资源的调度。程序执行完毕关闭spark context.
  • 3.executor,某个application 运行在worker节点的一个进程,该进程负责运行某些task,负责将数据存在磁盘或者内存中。负责将task包装成task
    runner,并从线程池抽一个空闲线程运行task,每一个executor最多能运行的task数量取决于CPU的核数。
  • 4.work, 集群中运行application的节点。
  • 5.task,每个executor进程中执行任务的工作单元,多个task组成stage。
  • 6.stage,又叫task set,由多个task组成
  • 7.job,包括多个task set组成的并行计算,是又action行为触发,
  • 8.dag scheduler。基于job构建基于stage的dag,并提交stage给task scheduler。靠rdd之间的依赖关系划分stage.
  • 9.task scheduler。将stage提交给worker节点运行,每个executor执行什么task就是在这里来。

1.5 spark流程

在这里插入图片描述

  • 1.构建spark application环境,启动spark context。 spark context 向资源管理器(standalone或者yarn)注册于申请executor资源。
  • 2.资源管理器分配executor资源,同时启动standaloneexecutorbackend。executor会将运行状态汇报给资源管理器。
  • 3.executor香spark context申请task。
  • 4.spark context将程序代码构建dag有向无环图。然后dag scheduler将dag分解为stage,分配stage给task scheduler。
  • 5.executor向task scheduler申请 task。
  • 6.task scheduler分配task给executor,同时sparkcontext将应用程序代码发给executor。
  • 7.Task在Executor上运行,运行完毕释放所有资源。

二、RDD的使用

1.1 什么是RDD

rdd是一个弹性的分布式数据集,是spark里面最基础的抽象。RDD是一个不可变的、有多分区的、可以并行计算的集合。rdd不装真正要计算的数据,只存放着数据的描述信息。例如从哪里读数据,调用了什么方法,传了什么函数,以及依赖关系等等。

1.2 rdd的特点

  • 1.rdd之间存在依赖关系。RDD是不可改变的,RDD只能通过transformation生成一个新的rdd,子rdd会记录父RDD的一些依赖关系,包括宽依赖和窄依赖
  • 2.rdd存在分区:分区标号一般从0开始,分区的数量决定了对应阶段的task并行度。
  • 3.函数作用在每个分区上,每个分区都生成一个task,对该分区进行计算,整个函数就是具体的计算逻辑。
  • 4.kvrdd在shuffle的时候会有分区器,默认使用hashpartitioner。
  • 5.如果是从hdfs取数据,rdd会从namenode获取数据地址,移动计算,并不是移动数据,可以提高计算效率。

宽依赖和窄依赖
父RDD中的partition和子RDD的partition。如果关系是一对多(父rdd的一个partition数据对应子rdd的多个partition数据),就是宽依赖;父RDD中的partition和子RDD的partition如果是一对一或多对一,就是窄依赖

简单来说,就是,父RDD一个ID分区有1和2两个ID,子RDD只获取了父RDD的1或者2,这就是宽依赖,如果子RDD将1和2都获取了,就是窄依赖

1.3 rdd分类

  • 1.transformation 转换算子,调用转换算子生成新的rdd。transformation 是lazy惰性计算,不触发job执行
  • 2.action 行动算子,调用行动算子会触发job执行,本质上是调用了sc.runjob()方法。该方法会从最后一个rdd,根据依赖关系,从后往前,划分stage,生成taskset。

1.4 创建rdd方式

  • 1.利用parallelize转换集合为rdd val rdd1 = sc.parallelize(Array(1,2,3,4))
  • 2.从指定目录读取文件 val line = sc.textFile("hdfs://192.x.x.x:9000/log")

1.5 常用transformation算子

  • 1.map 单元素映射 val rdd1 = sc.parallelize(Array(1,2,3,4)).map(_*2)
  • 2.flatmap() 先map映射,再压平 val rdd_fm = sc.parallelize(Array(“a b c”,“d e f”)).flatmap(_.split(’ ')).collect();
  • 3.filter 过滤 val rdd_f = sc.parallelize(Array(1,2,3,4)).filter(_%2==0)
  • 4.mapPartitions() 效果跟map一样,但是map是针对单个元素,mapPartitions是针对整个分区的。例如一个分区要处理1w条数据,那么map的fun()就要计算1w次,但是mapPartitions是把1w条数据接收了,再计算1次就行。
   val rdd_mp = sc.parallelize(Array(1,2,3,4)).mapPartitions(it =>
   it.map(x => x*10))
  • 5.mapPartitionsWithIndex,类似于mapPartitions, 不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。●
    mapPartitionsWithIndex,类似于mapPartitions,
    不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。
  • 6.sortBy算子,排序
    sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x,true)
    sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x+"",true)
    sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x.toString,true)
    7.sortByKey算子,排序
    val rdd1 = sc.parallelize(List((“hello”, 9), (“tom”, 8), (“kitty”, 7), (“tom”, 2)))val rdd2 = rdd1.sortByKey()
  • 8.groupBy算子,按照key分组
    val rdd1 = sc.parallelize(List((“hello”, 9), (“tom”, 8), (“kitty”, 7), (“tom”, 2)))val rdd2 = rdd1.groupBy(_._1)
  • 9.groupByKey 按照key进行 val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd2: RDD[(String,
    Iterable[Int])] = rdd1.groupByKey()
  • 10.reduceByKey val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd3 = rdd1.reduceByKey(+)
  • 11.distinct算子,去重 sc.parallelize(List(5,5,6,6,7,8,8,8)).distinct.collect
  • 12.union算子,并集,两个RDD类型要一样 val rdd6 = sc.parallelize(List(5,6,4,7), 2)val rdd7 = sc.parallelize(List(1,2,3,4), 3)val rdd8 =
    rdd6.union(rdd7)

1.6 常见action 算子

  • 1.collect,将数据以数组形式收集回Driver端,数据按照分区编号有序返回。同时会从远程集群是拉取数据到driver端。 val rdd1 = sc.parallelize(List(1,2,3,4,5), 2) rdd1.collect
  • 2.reduce,将数据以输入的函数进行聚合返回一个值

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
val r = rdd1.reduce(+)

  • 3.count,返回rdd元素的数量

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
var c = rdd1.count

  • 4.top将RDD中数据按照降序或者指定的排序规则,返回前n个元素

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
var c: Array[Int] = rdd1.top(2)

  • 5.take,返回一个由数据集的前n个元素组成的数组

val rdd1 = sc.parallelize(List(3,2,4,1,5), 2)
var c: Array[Int] = rdd1.take(2)

  • 6.saveAsTextFile以文本的形式保存到文件系统中

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1.saveAsTextFile(“hdfs://node-1.51doit.cn:9000/out2”)

1.7 缓存算子 cache、persis

cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,可以将数据缓存到内存或磁盘【executor所在的磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,可以指定更加丰富的存储基本,支持多种StageLevel,可以将数据序列化,默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,也可以使用其他的序列化方式
cache和persist方法,严格来说,不是Transformation,应为没有生成新的RDD,只是标记当前rdd要cache或persist。

1.8 checkpoint算子 下载中间结果到hdfs

checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,触发多次Action,checkpoint才有意义,多用于迭代计算

1.9 rdd分析

sc.textFile(args(0))
.flatMap(.split(" "))
.map((
, 1))
.reduceByKey(+)
.saveAsTextFile(args(1))

  • 1.假如读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区都是RDD,所以有两个rdd
  • 2.在调用reduceByKey方法时,有shuffle产生,要划分Stage,所有有两个Stage
  • 3.第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task

1.10 累加器(只写不读)、广播变量(只读不写)

  • 广播变量:Spark的另一种共享变量是广播变量。通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。
    这样理解,
    一个worker中的executor,有5个task运行,假如5个task都需要这从份共享数据,就需要向5个task都传递这一份数据,那就十分浪费网络资源和内存资源了。使用了广播变量后,只需要向该worker传递一次就可以了。
  • 累加器:一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值
    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。非常类似于在MR中的一个Counter计数器,主要用于统计各个程序片段被调用的次数,和整体进行比较,来对数据进行一个评估。需要注意的是,累加器的执行必须需要Action触发。

1.11 coalesce 和repartition 重新分区

  • 1.coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 如果不写true/false 默认是false
  • 2.repartition实际上是调用的coalesce,一定會进行shuffle的。repartition 底层也是调用coalesce的

假设RDD有N个分区,需要重新划分成M个分区:

  • N < M:
    一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。因为重分区前后相当于宽依赖,会发生shuffle过程,此时可以使用coalesce(shuffle=true),或者直接使用repartition()。
  • 如果N > M并且N和M相差不多(假如N是1000,M是100):
    那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这是前后是窄依赖关系,可以使用coalesce(shuffle=false)。
  • 如果 N> M并且两者相差悬殊:
    这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。

标签:task,rdd,分区,基础,RDD,sc,Spark,spark
来源: https://blog.csdn.net/weixin_43859562/article/details/121416657

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有