A Look at Meituan's Interactive Systems (3): Solving Data Skew in Spark SQL

2021-06-16 13:03:03  Views: 185  Source: the Internet


For the data skew that can show up during development, one practical fix is a double group by (two-stage aggregation).

Analysis:

 The same two-stage aggregation (local + global) used against data skew in SparkCore applies here:
 Local: scatter the keys with a random prefix, then a first groupBy produces partial counts
 Global: strip the prefix, then a second groupBy combines them into the final counts
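Before the full Spark version, the idea can be sketched on a plain Scala collection; the toy corpus and the prefix bound 2 below are made up for illustration:

```scala
import scala.util.Random

// toy corpus; "a" stands in for the skewed key
val words = Seq("a", "a", "a", "a", "b")
val random = new Random()

// stage 1 (local): a random prefix splits the hot key across several
// groups, then count per prefixed key
val localCounts = words
  .map(w => random.nextInt(2) + "_" + w)
  .groupBy(identity)
  .map { case (k, vs) => (k, vs.size) }

// stage 2 (global): strip the prefix and sum the partial counts;
// toSeq first, so partial counts for the same word are not lost
// by Map key collisions
val globalCounts = localCounts.toSeq
  .map { case (k, c) => (k.substring(k.indexOf("_") + 1), c) }
  .groupBy { case (w, _) => w }
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

println(globalCounts.toList.sorted)  // List((a,4), (b,1))
```

Whatever random prefixes are drawn, the two stages always recombine to the true counts, which is why the trick is safe for associative aggregations like count and sum.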
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import scala.util.Random

object _05SparkSQLOptimizationOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val conf = new SparkConf().setMaster("local[2]").setAppName(s"${_05SparkSQLOptimizationOps.getClass.getSimpleName}")

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        // register the UDFs used in the SQL below
        sqlContext.udf.register[String, String, Int]("addRandomPrefix", (field, num) => addRandomPrefix(field, num))
        sqlContext.udf.register[String, String]("removePrefix", field => removePrefix(field))

        val df = sqlContext.read.text("E:/data/hello.log").toDF("line")
//        df.show()
        // SQL in single statements; groupByOps1 below does the same step by step with temp tables
        df.registerTempTable("test")
//        groupByOps1(sqlContext)
        // 1. attach a random prefix to every word
        sqlContext.sql("select " +
                            "addRandomPrefix(w.word, 2) as p_word " +
                       "from (" +
                            "select " +
                                "explode(split(line, ' ')) as word " +
                            "from test" +
                       ") w").show()
        // 2. local (partial) count per prefixed word
        sqlContext.sql("select " +
                            "p.p_word," +
                            "count(p.p_word) as p_count " +
                       "from (" +
                            "select " +
                                "addRandomPrefix(w.word, 2) as p_word " +
                            "from (" +
                                "select " +
                                    "explode(split(line, ' ')) as word " +
                                "from test" +
                            ") w" +
                       ") p " +
                       "group by p.p_word").show()
        // 3. strip the prefix from the locally counted words
        sqlContext.sql("select " +
                            "removePrefix(p.p_word) as r_word," +
                            "count(p.p_word) as r_count " +
                       "from (" +
                            "select " +
                                "addRandomPrefix(w.word, 2) as p_word " +
                            "from (" +
                                "select " +
                                    "explode(split(line, ' ')) as word " +
                                "from test" +
                            ") w" +
                       ") p " +
                       "group by p.p_word").show()
        // 4. global count: sum the partial counts per original word
        sqlContext.sql("select " +
                            "r.r_word as field, " +
                            "sum(r.r_count) as sum " +
                       "from (" +
                            "select " +
                              "removePrefix(p.p_word) as r_word," +
                              "count(p.p_word) as r_count " +
                            "from (" +
                                "select " +
                                    "addRandomPrefix(w.word, 2) as p_word " +
                                "from (" +
                                    "select " +
                                        "explode(split(line, ' ')) as word " +
                                    "from test" +
                                ") w" +
                            ") p " +
                           "group by p.p_word" +
                       ") r " +
                       "group by r.r_word").show()
        sc.stop()
    }

    private def groupByOps1(sqlContext: SQLContext) = {
        // split lines into words
        sqlContext.sql("select explode(split(line, ' ')) as word from test")
            .registerTempTable("word_tmp")
        // attach the random prefix
        sqlContext.sql("select addRandomPrefix(word, 2) as p_word from word_tmp")
            .registerTempTable("prefix_word_tmp")
        // local aggregation
        sqlContext.sql("select p_word, count(p_word) as p_count from prefix_word_tmp group by p_word")
            .registerTempTable("prefix_count_word_tmp")
        // strip the prefix
        sqlContext.sql("select removePrefix(p_word) as r_word, p_count as r_count from prefix_count_word_tmp")
            .registerTempTable("r_prefix_count_word_tmp")
        // global aggregation
        sqlContext.sql("select r_word, sum(r_count) r_sum from r_prefix_count_word_tmp group by r_word").show()
    }

    /**
      * Prepend a random prefix to a field.
      *
      * @param field the original value
      * @param num   prefixes are drawn from [0, num)
      * @return the prefixed value, "num_field"
      */
    def addRandomPrefix(field: String, num: Int): String = {
        val random = new Random()
        val prefix = random.nextInt(num)
        prefix + "_" + field
    }

    /**
      * Strip the random prefix added by addRandomPrefix.
      *
      * @param field the prefixed value, e.g. "1_hello"
      * @return the original value
      */
    // substring past the first "_" only; split("_")(1) would truncate
    // words that themselves contain an underscore
    def removePrefix(field: String): String = field.substring(field.indexOf("_") + 1)
}
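The two prefix helpers can also be checked in isolation, outside Spark. The hypothetical token "spark_sql" below illustrates why the prefix must be stripped at the first underscore only: a word that itself contains "_" has to survive the round trip intact.

```scala
import scala.util.Random

def addRandomPrefix(field: String, num: Int): String =
  new Random().nextInt(num) + "_" + field

// strip everything up to and including the first "_" only
def removePrefix(field: String): String =
  field.substring(field.indexOf("_") + 1)

val prefixed = addRandomPrefix("spark_sql", 2)  // e.g. "1_spark_sql"
println(removePrefix(prefixed))                 // spark_sql
println(removePrefix("0_hello"))                // hello
```

With `split("_")(1)` instead, the first call would return just "spark", silently merging counts for different words.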

Source: https://blog.csdn.net/KujyouRuri/article/details/117954533
