ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark集群的三种模式

2022-02-08 20:29:59  阅读:190  来源: 互联网

标签:-- sh 三种 examples org 集群 Spark spark


文章目录

1、Spark的由来

  • 定义:Hadoop主要解决,海量数据的存储和海量数据的分析计算。Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

1.1 Hadoop的发展

  • Hadoop1.x存在的问题:
    • NameNode不能高可用
    • MR框架中资源调度和任务调度耦合
    • MR基于磁盘计算,效率低
  • Hadoop2.x对应的解决了以上几个问题
    • NameNode高可用
    • 将资源调度和任务调度解耦
    • 计算框架可插拔

Spark框架诞生早于Yarn,所以Spark自己设计了一套资源调度框架。

1.2 MapReduce与Spark对比

MR不适合迭代计算

在这里插入图片描述

Spark支持迭代计算和图形计算:因为Spark中间结果不落盘。但是Shuffle也得落盘。

在这里插入图片描述

2、Spark内置模块

在这里插入图片描述

Spark Core:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。

Spark SQL:是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的HQL来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等。

Spark Streaming:是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应。

Spark MLlib:提供常见的机器学习功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。

Spark GraphX:主要用于图形并行计算和图挖掘系统的组件。

集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器

3、Spark运行模式

  • Local模式:本地调试

  • Standalone模式:Spark自带的任务调度模式

  • Yarn模式:使用Yarn进行资源调度和任务调度

3.1 Standalone模式部署

  • 集群规划
hadoop102hadoop103hadoop104
SparkMaster、WorkerWorkerWorker

具体步骤:

  1. 解压安装包至指定目录

    tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
    mv spark-3.0.0-bin-hadoop3.2/ spark-standalone
    
  2. 修改配置文件

    slaves

    hadoop102
    hadoop103
    hadoop104
    

    spark-env.sh

    SPARK_MASTER_HOST=hadoop102
    SPARK_MASTER_PORT=7077
    
  3. 分发spark-standalone

    xsync spark-standalone/
    
  4. 在hadoop102上启动集群

    sbin/start-all.sh
    
  5. jps查看启动情况

  6. 测试

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077 \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

    参数含义

    参数解释可选值举例
    –classSpark程序中包含主函数的类
    –masterSpark程序运行的模式本地模式:local[*]、spark://hadoop102:7077、Yarn
    –executor-memory 1G指定每个executor可用内存为1G符合集群内存配置即可,具体情况具体分析。
    –total-executor-cores 2指定所有executor使用的cpu核数为2个
    application-jar打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar
    application-arguments传给main()方法的参数

配置历史服务器

  1. 修改配置文件

    spark-defaults.conf

    spark.eventLog.enabled          true
    spark.eventLog.dir              hdfs://hadoop102:8020/directory
    

    spark-env.sh

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080 
    -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory 
    -Dspark.history.retainedApplications=30"
    

    # 参数1含义:WEBUI访问的端口号为18080

    # 参数2含义:指定历史服务器日志存储路径(读)

    # 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  2. 重新分发修改的配置文件

    xsync spark-defaults.conf spark-env.sh
    
  3. 重启spark及其历史服务器

    sbin/stop-history-server.sh
    sbin/stop-all.sh
    
    sbin/start-all.sh
    sbin/start-history-server.sh
    
  4. 查看Spark历史服务器hadoop102:18080

配置高可用

  1. 停止集群

  2. 启动ZooKeeper

  3. 修改配置文件

    spark-env.sh

    #注释掉如下内容:
    #SPARK_MASTER_HOST=hadoop102
    #SPARK_MASTER_PORT=7077
    
    #添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:
    export SPARK_DAEMON_JAVA_OPTS="
    -Dspark.deploy.recoveryMode=ZOOKEEPER 
    -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104 
    -Dspark.deploy.zookeeper.dir=/spark"
    
    #添加如下代码
    #Zookeeper3.5的AdminServer默认端口是8080,和Spark的WebUI冲突
    export SPARK_MASTER_WEBUI_PORT=8989
    
  4. 重新分发修改后的文件

    xsync spark-env.sh
    
  5. 重启集群

    sbin/start-all.sh
    sbin/start-history-server.sh
    
  6. 在hadoop103上启动master

    sbin/start-master.sh
    
  7. 通过hadoop103:8989访问测试

  8. 通过jps查看进程状态

  9. kill掉hadoop102的master进程测试

Spark HA集群访问

bin/spark-shell \
--master spark://hadoop102:7077,hadoop103:7077 \
--executor-memory 2g \
--total-executor-cores 2

参数:–master spark://hadoop102:7077指定要连接的集群的master

注:一旦配置了高可用以后,master后面要连接多个master

运行模式

根据Driver程序的运行位置分为如下两种模式

  • standalone-client(默认模式)

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077,hadoop103:7077 \
    --executor-memory 2G \
    --total-executor-cores 2 \
    --deploy-mode client \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    
  • standalone-cluster

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077,hadoop103:7077 \
    --executor-memory 2G \
    --total-executor-cores 2 \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

客户端模式的计算结果将打印在本地,集群模式只能在web页面中找到

在这里插入图片描述

3.2 Yarn模式安装部署

  1. 解压安装包到指定位置

    tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
    mv spark-3.0.0-bin-hadoop3.2/ spark-yarn
    
  2. 修改配置文件

    修改hadoop中的yarn-site.xml,分发并重启hadoop集群

    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
    <property>
         <name>yarn.nodemanager.pmem-check-enabled</name>
         <value>false</value>
    </property>
    
    <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
    <property>
         <name>yarn.nodemanager.vmem-check-enabled</name>
         <value>false</value>
    </property>
    

    spark-env.sh

    # 修改/opt/module/spark-yarn/conf/spark-env.sh,添加YARN_CONF_DIR配置,保证后续运行任务的路径都变成集群路径
    YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
    
  3. 启动spark集群并测试

    sbin/start-history-server.sh
    sbin/start-all.sh
    
    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

配置历史服务器

  1. 修改配置文件

    spark-defaults.conf

    #写
    spark.eventLog.enabled          true
    spark.eventLog.dir              hdfs://hadoop102:8020/directory
    
    #读
    spark.yarn.historyServer.address=hadoop102:18080
    spark.history.ui.port=18080
    

    spark-env.sh

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080 
    -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory 
    -Dspark.history.retainedApplications=30"
    

    # 参数1含义:WEBUI访问的端口号为18080

    # 参数2含义:指定历史服务器日志存储路径(读)

    # 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  2. 重新分发修改的配置文件

    xsync spark-defaults.conf spark-env.sh
    
  3. 重启spark及其历史服务器

    sbin/stop-history-server.sh
    sbin/stop-all.sh
    
    sbin/start-all.sh
    sbin/start-history-server.sh
    
  4. 提交任务到Yarn

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    
  5. Web页面查看日志:http://hadoop103:8088/cluster,点击history进入hadoop102:18080

运行模式

Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster,适用于生产环境。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

如果在yarn日志端无法查看到具体的日志,则在yarn-site.xml中添加如下配置并启动Yarn历史服务器

4、WordCount案例

  1. Maven依赖和scala打包插件

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
    
    <build>
    	<finalName>WordCount</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                       <goals>
                          <goal>compile</goal>
                          <goal>testCompile</goal>
                       </goals>
                    </execution>
                 </executions>
            </plugin>
            
            
            <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
        </plugins>
    </build>
    
  2. 代码

    package com.atguigu.spark
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
        def main(args: Array[String]): Unit = {
    
            //1.创建SparkConf并设置App名称
            val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
    
            //2.创建SparkContext,该对象是提交Spark App的入口
            val sc = new SparkContext(conf)
    
            //3.读取指定位置文件:hello atguigu atguigu
            val lineRdd: RDD[String] = sc.textFile("input")
    
            //4.读取的一行一行的数据分解成一个一个的单词(扁平化)(hello)(atguigu)(atguigu)
            val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
    
            //5. 将数据转换结构:(hello,1)(atguigu,1)(atguigu,1)
            val wordToOneRdd: RDD[(String, Int)] = wordRdd.map(word => (word, 1))
    
            //6.将转换结构后的数据进行聚合处理 atguigu:1、1 =》1+1  (atguigu,2)
            val wordToSumRdd: RDD[(String, Int)] = wordToOneRdd.reduceByKey((v1, v2) => v1 + v2)
    
            //7.将统计结果采集到控制台打印
            val wordToCountArray: Array[(String, Int)] = wordToSumRdd.collect()
            wordToCountArray.foreach(println)
    
            //一行搞定
            //sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(args(1))
    
            //8.关闭连接
            sc.stop()
        }
    }
    
  3. log4j.properties

    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
    # Set the default spark-shell log level to ERROR. When running the spark-shell, the
    # log level for this class is used to overwrite the root logger's log level, so that
    # the user can have different defaults for the shell and regular Spark apps.
    log4j.logger.org.apache.spark.repl.Main=ERROR
    
    # Settings to quiet third party logs that are too verbose
    log4j.logger.org.spark_project.jetty=ERROR
    log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
    log4j.logger.org.apache.parquet=ERROR
    log4j.logger.parquet=ERROR
    
    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
    

标签:--,sh,三种,examples,org,集群,Spark,spark
来源: https://blog.csdn.net/qq_36593748/article/details/122830663

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有