ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl

2022-08-08 01:01:59  阅读:251  来源: 互联网

标签:贴源层 -- ods hive dsl dwd import spark ds


6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)

6.1 使用spark dsl 方式处理

6.1.1 注意事项

# 开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了
.enableHiveSupport()

# 这下脚本都是作用在dwd层,所以必须在dwd的用户下执行,可能会报权限不够,需要我们申请权限

6.1.2 项目结构如下:

1.是脚本,内容如下:


# 分区
ds=$1

# 执行任务
spark-submit \
--master yarn-client \
--class com.wt.dwd.DwdFcjNwrsSellbargainMskDay \
../target/dwd-1.0-SNAPSHOT.jar \
$ds
# 增加分区
hive -e "alter table dwd.dwd_gsj_reg_investor_msk_d add IF NOT EXISTS  partition (ds='$ds')"

# 注意:
1.如果换行的话,后面必须加上 \ 
2.因为jar包如果不指定路径的话会找不到
3.可以在最后面动态的增加分区,然后再动态的传入变量

2.是在dwd中运行的hive建表语句

-- hive建表语句
-- hive建表语句
CREATE external TABLE IF NOT EXISTS  dwd.dwd_fcj_nwrs_sellbargain_msl_d(
    id STRING comment '身份证号码',
    r_fwzl STRING comment '房产地址',
    htydjzmj STRING comment '合同中约定房子面积',
    tntjzmj STRING comment '房子内建筑面积',
    ftmj STRING comment '房子分摊建筑面积',
    time_tjba STRING comment '商品房备案时间',
    htzj STRING comment '合同总价'
)PARTITIONED BY
(
    ds   STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS textfile
location '/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/';

3,是代码运行的逻辑,主要是处理数据

package com.wt.dwd
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object DwdFcjNwrsSellbargainMskDay {
  def main(args: Array[String]): Unit = {

    /**
     * 1. 创建spark环境
     *
     */
    val spark: SparkSession = SparkSession
      .builder
      //.master("local")
      .enableHiveSupport() //开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    /**
     * 获取时间分区的字段
     *
     */
    val ds: String = args.head
    /**
     * 2. 读取获取购房合同中的表
     * 必须带上库名,否则读不到
     *
     * 不可能读取所有的数据,我们只需要读取每一天的数据
     *
     */
    val sellbargain: DataFrame = spark
      .table("ods.ods_t_fcj_nwrs_sellbargain")
      .where($"ds" === ds)

    //对原始的数据进行托名 对id进行脱敏,然后将r_fwzl中的数字变成 * 号(通过正则表达式替换)
    val resultDF: DataFrame = sellbargain.select(
      upper(md5($"id")) as "id",
      regexp_replace($"r_fwzl", "\\d", "*") as "r_fwzl",
      $"htydjzmj",
      $"tntjzmj",
      $"ftmj",
      $"time_tjba",
      $"htzj"
    )

    resultDF
      .write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep","\t")
      .save(s"/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/ds=$ds")

    //提交到集群运行 spark-submit --master yarn-client --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay dwd-1.0-SNAPSHOT.jar
  }
}

6.1.3 将通用的东西封装(重要-可以极快的提高效率):

代码逻辑如下:

package com.wt.common
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
abstract class SparkTool extends Logging{
  def main(args: Array[String]): Unit = {

    /**
     * 获取时间分区
     *
     */
    if(args.length == 0){
      logError("请指定分区!!!")
      return
    }
    val ds: String = args.head

    //创建spark环境
    val spark: SparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()

    //调用子类实现的抽象方法
    this.run(spark,ds)
  }

  /**
   * 抽象方法: 在子类中实现这个方法
   * import spark.implicits._
   * import org.apache.spark.sql.functions._
   *
   * @param spark: spark的环境
   * @param ds:分区
   */
  def run(spark: SparkSession,ds: String): Unit

  /**
   * 传入DataFrame 和 path 就可以保存数据
   *
   * 其中的format默认值是 csv格式的。
   *
   */

  def save(dataframe:DataFrame,path:String,format:String = "csv"): Unit={
    dataframe
      .write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep","\t")
      .save(path)
  }
}

理解:子类继承父类,在父类中已经封装好了spark,DataFormat的save 两个环境,需要子类继承SparkTool ,就可以拿到父类已经创建好的环境,减少代码量,提高效率

在save 中可以设置默认值,如果不传入的话,就使用默认值

而且该工具还吧 ds 给封装好了,我们在使用的时候可以直接用变量传入即可

6.1.4 调用方法如下:

package com.wt.dwd
import com.wt.common.SparkTool
import org.apache.spark.sql.{DataFrame, SparkSession}

object DwdGsjRegLegrepreMskDay extends SparkTool{
  /**
   * 抽象方法: 在子类中实现这个方法
   * import spark.implicits._
   * import org.apache.spark.sql.functions._
   *
   * @param spark  : spark的环境
   * @param ds        :分区
   */
  override def run(spark: SparkSession, ds: String): Unit = {
    import spark.implicits._
    import org.apache.spark.sql.functions._

    /**
     * 读取hive中的表
     *
     */
    val legrepre: DataFrame = spark
      .table("ods.ods_t_gsj_reg_legrepre")
      .where($"ds" === ds)

    val resultDF: DataFrame = legrepre
      .select(
        upper(md5($"id")) as "id",
        $"position",
        upper(md5($"tel")) as "tel",
        $"appounit",
        $"accdside",
        $"posbrmode",
        $"offhfrom",
        $"offhto"
      )

      save(resultDF,s"/daas/motl/dwd/dwd_gsj_reg_legrepre_msk_d/ds=$ds")

  }
}

6.1.5 脚本如下:

因为在common工具和dwd不在同一个模块中,需要在dwd模块中导入common的jar包

需要导入依赖,如下:

脚本如下;

# 分区
ds=$1

# --jars :指定代码需要的其他的包

# 执行任务
spark-submit \
--master yarn-client \
--class com.wt.dwd.DwdGsjRegLegrepreMskDay \
--jars ../lib/common-1.0-SNAPSHOT.jar \
../target/dwd-1.0-SNAPSHOT.jar \
$ds
# 增加分区
hive -e "alter table dwd.dwd_gsj_reg_legrepre_msk_d  add IF NOT EXISTS  partition (ds='$ds')"

将dwd包拖到dwd用户下,执行脚本,最终结果如下

标签:贴源层,--,ods,hive,dsl,dwd,import,spark,ds
来源: https://www.cnblogs.com/atao-BigData/p/16560361.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有