A Brief Look at a Meituan-Style Interactive Analysis System (2): Product — Top Hot Products per Area

2021-06-16



Groundwork at the DAO layer:

ProductAreaTop3DaoImpl writes the per-area top-3 product rows, either one at a time or in batches:

package com.aura.bigdata.analysis.dao.impl.product;

import com.aura.bigdata.analysis.dao.product.IProductAreaTop3Dao;
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3;
import com.aura.bigdata.analysis.util.DBCPUtils;
import org.apache.commons.dbutils.QueryRunner;

import java.sql.SQLException;
import java.util.List;

public class ProductAreaTop3DaoImpl implements IProductAreaTop3Dao {
    private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());

    String sql = "INSERT INTO product_area_top3 VALUES(?, ?, ?, ?, ?, ?, ?)";

    @Override
    public void insert(ProductAreaTop3 entity) {

        try {
            qr.update(sql, entity.getTaskId(), entity.getArea(),
                           entity.getAreaLevel(), entity.getProductId(),
                           entity.getProductName(), entity.getClickCount(),
                           entity.getProductStatus());
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void insertBatch(List<ProductAreaTop3> list) {
        Object[][] params = new Object[list.size()][];
        for (int i = 0; i < list.size(); i++) {
            ProductAreaTop3 entity = list.get(i);
            Object[] obj = {
                    entity.getTaskId(), entity.getArea(),
                    entity.getAreaLevel(), entity.getProductId(),
                    entity.getProductName(), entity.getClickCount(),
                    entity.getProductStatus()
            };
            params[i] = obj;
        }
        try {
            qr.batch(sql, params);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

And the matching interface it implements:


package com.aura.bigdata.analysis.dao.product;

import com.aura.bigdata.analysis.dao.IBaseDao;
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3;

public interface IProductAreaTop3Dao extends IBaseDao<ProductAreaTop3> {
}
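The post never shows IBaseDao itself, but judging from the @Override methods in the implementation above it declares at least insert and insertBatch. A minimal sketch of the assumed shape (the original is presumably a Java interface; written here as a Scala trait):

// Assumed shape of IBaseDao, inferred from ProductAreaTop3DaoImpl
trait IBaseDao[T] {
    def insert(entity: T): Unit
    def insertBatch(list: java.util.List[T]): Unit
}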



Results are stored in the following table:

CREATE TABLE IF NOT EXISTS `product_area_top3` (
  task_id int,
  area varchar(20),
  area_level varchar(20),
  product_id bigint,
  product_name varchar(50),
  click_count bigint,
  product_status varchar(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
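Both ProductAreaTop3DaoImpl above and loadAreaInfo in the Spark job below depend on a DBCPUtils helper that the post never shows. A minimal sketch of what it must provide, assuming commons-dbcp; the connection details are placeholders, and in the original project this is presumably a Java class, sketched here in Scala:

import javax.sql.DataSource
import org.apache.commons.dbcp.BasicDataSource

object DBCPUtils {
    // placeholder connection details; the real project presumably reads them from a config file
    val url = "jdbc:mysql://localhost:3306/analysis?useUnicode=true&characterEncoding=utf8"
    val username = "root"
    val password = "root"

    private val ds = new BasicDataSource()
    ds.setDriverClassName("com.mysql.jdbc.Driver")
    ds.setUrl(url)
    ds.setUsername(username)
    ds.setPassword(password)

    // handed to the commons-dbutils QueryRunner in the DAO layer
    def getDataSource: DataSource = ds
}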

Now we can write the Spark job itself.

First the area information has to be loaded; the full job is listed below:
package com.aura.bigdata.analysis.jobs.product

import java.util
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import com.aura.bigdata.analysis.conf.ConfigurationManager
import com.aura.bigdata.analysis.constants.Constants
import com.aura.bigdata.analysis.dao.impl.TaskDaoImpl
import com.aura.bigdata.analysis.dao.impl.product.ProductAreaTop3DaoImpl
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3
import com.aura.bigdata.analysis.mock.MockData
import com.aura.bigdata.analysis.udf.UDFUtils
import com.aura.bigdata.analysis.util._
import com.aura.bigdata.analysis.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Requirement: for a user-specified date range, compute the top-3 most-clicked products in each area.
  * The Spark job receives a taskId, looks up the corresponding task in MySQL to obtain the user's
  * filter parameters, computes the top-3 hot products per area within the date range, and finally
  * writes the result back to a MySQL table.
  */
object ProductAreaTopJob {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)

        // 1. Spark setup
        val conf = new SparkConf()
            .setAppName(s"${ProductAreaTopJob.getClass.getSimpleName}")
        // hard-codes a local master only in DEV mode; on a cluster the master comes from spark-submit
        SparkUtils.setMaster(conf, ConfigurationManager.dMode)

        // additional settings (e.g. Kryo serialization) could be configured here
        val sc = new SparkContext(conf)
        val sqlContext = SparkUtils.getSQLContext(sc, ConfigurationManager.dMode, isHive = true)
        // register the UDF used later in joinProductInfo
        sqlContext.udf.register[String, String]("getJsonInfo", str => UDFUtils.getJsonInfo(str))


        // 2. Read the task for this taskId
        val taskId = ConfigurationManager.getIntProperty(Constants.SPARK_JOB_SESSION_TASK_ID)
        val taskDao = new TaskDaoImpl
        val task = taskDao.getTaskById(taskId)
        if(task == null) {
            System.err.println(s"taskId: ${taskId} not found, please check the parameter")
            System.exit(-1)
        }
        val params = JSON.parseObject(task.getTask_param)
        //        println("params: " + params)
        // 3. Load the (mock) data
        MockData.mock(sc, sqlContext)
        // 4. Filter the click records by the task parameters
        getRangeSession(params, sqlContext)
        /**
          * 5. Load area information from the cities table in MySQL
          */
        loadAreaInfo(sqlContext)

        /**
          * 6. Join the related tables:
          *    city_info, product_click_city, product_info
          */
        joinProductInfo(sqlContext)

        /**
          * 7. Aggregate: count product clicks within each area
          */
        calcAreaProduct(sqlContext)

        /**
          * 8. Take the top 3 within each group
          */
        calcAreaProductTop3(sqlContext, taskId)

        /**
          * 9. Write the results to MySQL
          */
        export2MySQL(taskId, sqlContext)
    }

    def export2MySQL(taskId:Int, sqlContext:SQLContext): Unit = {
        val sql = "SELECT " +
          "taskId,"+
          "area, " +
          "area_level, " +
          "product_id, " +
          "product_name, " +
          "product_status, " +
          "count, " +
          "ROW_NUMBER() OVER(PARTITION BY area ORDER BY count DESC) as rank " +
          "FROM area_product_click_count " +
          "HAVING rank < 4"
        val df = sqlContext.sql(sql)




        df.rdd.foreachPartition(partition => {
            if(!partition.isEmpty) {
                val patDao = new ProductAreaTop3DaoImpl
                val list = new util.ArrayList[ProductAreaTop3]()
                for (row <- partition) {
                    val pat = new ProductAreaTop3()
                    pat.setTaskId(taskId)
                    pat.setArea(row.getAs[String]("area"))
                    pat.setAreaLevel(row.getAs[String]("area_level"))
                    pat.setClickCount(row.getAs[Long]("count"))
                    pat.setProductId(row.getAs[Long]("product_id"))
                    pat.setProductName(row.getAs[String]("product_name"))
                    pat.setProductStatus(row.getAs[String]("product_status"))
                    list.add(pat)
                }
                patDao.insertBatch(list)
            }
        })
    }

    /**
      * 8. Take the top 3 within each group,
      *    using the ROW_NUMBER() window function
      */
    def calcAreaProductTop3(sqlContext:SQLContext, taskId:Int){
        // a window function cannot be filtered at the same query level (HAVING is
        // evaluated before ROW_NUMBER()), so rank the rows in a subquery and
        // filter in the outer query
        val sql = "SELECT " +
                     "area, " +
                     "area_level, " +
                     "product_id, " +
                     "product_name, " +
                     "product_status, " +
                     "count, " +
                     "rank " +
                  "FROM (" +
                     "SELECT " +
                        "area, " +
                        "area_level, " +
                        "product_id, " +
                        "product_name, " +
                        "product_status, " +
                        "count, " +
                        "ROW_NUMBER() OVER(PARTITION BY area ORDER BY count DESC) as rank " +
                     "FROM area_product_click_count" +
                  ") ranked " +
                  "WHERE rank < 4"
        val df = sqlContext.sql(sql)
        df.registerTempTable("area_product_top3_temp")
        df.show()
    }
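
    /*
     * Illustration (invented values): given these rows in area_product_click_count
     *
     *   area   product_id   count        ROW_NUMBER() per area, count DESC
     *   华东    101           30    ->    rank 1  kept
     *   华东    102           22    ->    rank 2  kept
     *   华东    103           15    ->    rank 3  kept
     *   华东    104            9    ->    rank 4  dropped by WHERE rank < 4
     *   华中    205           40    ->    rank 1  kept
     *
     * ROW_NUMBER() restarts at 1 in every PARTITION BY area group, so the outer
     * filter keeps exactly the three most-clicked products per area.
     */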

    def calcAreaProduct(sqlContext:SQLContext): Unit = {
        val sql = "SELECT " +
                    "area, " +
                    "area_level, " +
                    "product_id, " +
                    "product_name, " +
                    "product_status, " +
                    "count(product_id) as count " +
                  "FROM area_product_click_tmp " +
                  "GROUP BY area, product_id, area_level, product_name, product_status"
        val df = sqlContext.sql(sql)
//        df.show()
        df.registerTempTable("area_product_click_count")
    }
    /**
      * Join the three tables together:
      *     city_info, product_click_city, product_info
      * Result columns:
      *  task_id, area, area_level, product_id, city_names, click_count, product_name, product_status
      * Areas are graded into the following levels:
      *  AAAAA  ---> 华东 | 华北 | 华南
      *  AAAA   ---> 华中
      *  AAA    ---> 西南
      *  AA     ---> 东北
      *  A      ---> 西北
      * @param sqlContext
      */
    def joinProductInfo(sqlContext:SQLContext): Unit = {
        val sql = "SELECT " +
                     "ci.area, " +
                     "CASE " +
                        "WHEN ci.area = '华东' OR ci.area = '华北' OR ci.area = '华南' THEN 'AAAAA' " +
                        "WHEN ci.area = '华中' THEN 'AAAA' " +
                        "WHEN ci.area = '西南' THEN 'AAA' " +
                        "WHEN ci.area = '东北' THEN 'AA' " +
                        "ELSE 'A' " +
                     "END area_level, " +
                     "pi.product_id, " +
                     "pi.product_name, " +
                     "if(getJsonInfo(pi.extend_info) = '0', '自营', '第三方') product_status " +
                  "FROM product_click_city pcc " +
                  "LEFT JOIN city_info ci on pcc.city_id = ci.city_id " +
                  "LEFT JOIN product_info pi on pcc.click_product_id = pi.product_id "
        val df = sqlContext.sql(sql)
        df.registerTempTable("area_product_click_tmp")
        df.show()
    }

    def loadAreaInfo(sqlContext:SQLContext): Unit = {
        val url = DBCPUtils.url
        val username = DBCPUtils.username
        val password = DBCPUtils.password
        val properties = new Properties()
        properties.put("user", username)
        properties.put("password", password)

        val df = sqlContext.read.jdbc(url, "cities", properties)
        df.show()
        df.registerTempTable("city_info")    // expose the MySQL cities table to Spark SQL as city_info


    /**
      * Filter the click records using the parameters submitted by operations / product staff
      * @param params e.g. {"startAge":[20], "endAge":[50], "startDate":["2018-08-13"], "endDate":["2018-08-13"]}
      * @param sqlContext
      */
    def getRangeSession(params:JSONObject, sqlContext: SQLContext) = {
        val startDate = ParamUtils.getParam(params, Constants.PARAM_START_DATE)
        val endDate = ParamUtils.getParam(params, Constants.PARAM_END_DATE)
        val sql = "SELECT " +
            "click_product_id, " +
            "city_id " +
            "FROM user_visit_action " +
            "WHERE `date` >= '" + startDate + "' " +
            "AND `date` <= '" + endDate + "' " +
            "AND click_product_id is not null";
        val df = sqlContext.sql(sql)
        df.registerTempTable("product_click_city")
     df.show()
    }
}

In this Spark job, the plain SQLContext cannot run the ROW_NUMBER() window function (in Spark 1.x, window functions are only supported through HiveContext), so the job falls back to HiveQL via HiveContext. The switch is handled by the following SparkUtils class:

package com.aura.bigdata.analysis.utils

import com.aura.bigdata.analysis.constants.DeployMode
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {


    def setMaster(conf:SparkConf, dMode:DeployMode) {
        if(dMode.equals(DeployMode.DEV)) {
            conf.setMaster("local[4]")
        }
    }
    def getSQLContext(sc: SparkContext, dMode: DeployMode, isHive:Boolean):SQLContext = {
        if(dMode.equals(DeployMode.DEV)) {
            if(isHive) {
                new HiveContext(sc)
            } else {
                new SQLContext(sc)
            }
        } else {
            new HiveContext(sc)
        }
    }
}
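
The DeployMode and ConfigurationManager types referenced above are never shown in the post. A minimal sketch of the shape the code assumes; the resource name conf.properties and the deploy.mode key are hypothetical, and the originals are presumably Java classes:

import java.util.Properties

// hypothetical sketch of the deploy-mode flag assumed by SparkUtils
object DeployMode extends Enumeration {
    // SparkUtils only distinguishes DEV from everything else
    val DEV, PRODUCTION = Value
}

// hypothetical sketch of the config reader behind ConfigurationManager.dMode
object ConfigurationManager {
    private val props = new Properties()
    // "conf.properties" is an assumed resource name
    props.load(getClass.getClassLoader.getResourceAsStream("conf.properties"))

    // e.g. deploy.mode=dev  ->  DeployMode.DEV
    val dMode: DeployMode.Value =
        if ("dev".equalsIgnoreCase(props.getProperty("deploy.mode"))) DeployMode.DEV
        else DeployMode.PRODUCTION

    // used by the job to read the configured taskId
    def getIntProperty(key: String): Int = props.getProperty(key).trim.toInt
}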

Source: https://blog.csdn.net/KujyouRuri/article/details/117953828