
Getting Spark SQL results into MySQL quickly (Scala code, Airflow scheduling)


A recurring need: starting from existing warehouse tables, write some SQL, produce a Hive table, and sync it to MySQL.

After doing this enough times, I wanted to write a tool that does the whole job.

Part 1: Background, features, workflow

1. Background:
    1. The warehouse stores data in Hive, DataX moves the data, and Airflow handles scheduling.
    2. I don't know how to make Hive itself parse a SQL statement and hand back its schema, but Spark can:
       spark.sql(sql).schema.toList, which is why the tool is written in Scala (see the sketch below).
2. Features
    Driven by a single config file, the tool creates the Hive and MySQL tables and generates the Airflow scheduling task.
3. Workflow
    1. Configure the MySQL connection.
    2. From the input Spark SQL, derive the matching Hive and MySQL table structures and create both tables.
    3. Generate the Airflow task (insert the Hive data, then call DataX to sync it to MySQL).
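
Here is the schema trick in isolation, as a minimal sketch (dw.orders is a made-up table): spark.sql parses and analyzes the query eagerly, so the result schema comes back without running a job.

      import org.apache.spark.sql.SparkSession

      object SchemaPeek {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder.appName("SchemaPeek").enableHiveSupport().getOrCreate()
          //analysis only: no rows are read to produce the schema
          val fields = spark.sql("SELECT order_id, price FROM dw.orders").schema.toList
          fields.foreach(f => println(f.name + "|" + f.dataType.typeName))
          spark.stop()
        }
      }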

Part 2: Code

1. The configuration file:

HiveToMysql.properties

        # MySQL alias; must match the alias of one of the jdbcN blocks below
        jdbcalias=ptx_read
        # table to sync; the code expects db.table form
        table=be_product
        # owner of the Airflow task
        owner=owner
        # Hive table lifecycle in days; the data platform deletes expired data
        lifecycle=180

        # directory where the Airflow task files are generated
        airflowpath=/airflow/dags/ods/

        # several connections can be listed so nothing has to be edited back and forth;
        # jdbcalias above picks the one that is used
        jdbc1alias=hive
        jdbc1host=127.0.0.1
        jdbc1port=3306
        jdbc1user=root
        jdbc1passwd=**
        jdbc1db_name=test

        jdbc2alias=read
        jdbc2host=127.0.0.1
        jdbc2port=3306
        jdbc2user=root
        jdbc2passwd=**
        jdbc2db_name=test
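
One caveat about java.util.Properties, which the loader below relies on: both '=' and ':' are accepted as key-value separators, but '#' starts a comment only at the beginning of a line, so an inline '#' becomes part of the value. That is why the comments above sit on their own lines. A quick check:

      import java.io.StringReader
      import java.util.Properties

      object PropsCheck extends App {
        val pp = new Properties
        pp.load(new StringReader("a=1\nb : 2\nc=3  # not a comment"))
        println(pp.getProperty("a")) //prints 1
        println(pp.getProperty("b")) //prints 2
        println(pp.getProperty("c")) //prints "3  # not a comment"
      }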
2. Main code:

HiveToMysql.scala

      import java.io.{BufferedReader, File, FileInputStream, IOException, InputStreamReader, PrintWriter}
      import java.sql.DriverManager
      import java.util.Properties

      import org.apache.spark.sql.SparkSession

      import scala.collection.mutable.ArrayBuffer

      object HiveToMysql {
        //holder for one MySQL connection config
        case class Database(host: String, port: Int, user: String, passwd: String, db_name: String)

        //read the config file (shipped next to the jar via --files)
        def readDbPropertiesFile(fileName: String, spark: SparkSession, sql: String): Unit = {
          val pp = new Properties
          val fps = new FileInputStream(fileName)
      //    val fps = Thread.currentThread.getContextClassLoader.getResourceAsStream(fileName)
          pp.load(fps)
          parseProperties(pp, spark, sql)
          fps.close()
        }
        //parse the settings and drive the whole flow
        def parseProperties(pp: Properties, spark: SparkSession, sql: String): Unit = {
          val table = pp.getProperty("table")
          val owner = pp.getProperty("owner")
          val lifecycle = pp.getProperty("lifecycle")
          val jdbcalias = pp.getProperty("jdbcalias")
          val airflowpath = pp.getProperty("airflowpath")
          val tableColumn = new ArrayBuffer[String]()

          //scan jdbc1alias, jdbc2alias, ... until one matches jdbcalias
          var dbindex = 1
          while (pp.getProperty("jdbc" + dbindex + "alias") != null && !pp.getProperty("jdbc" + dbindex + "alias").equals(jdbcalias)) {
            dbindex += 1
          }

          val database = Database(pp.getProperty("jdbc" + dbindex + "host"), pp.getProperty("jdbc" + dbindex + "port").toInt,
            pp.getProperty("jdbc" + dbindex + "user"), pp.getProperty("jdbc" + dbindex + "passwd"), pp.getProperty("jdbc" + dbindex + "db_name"))

          val mysqlSelectBuilder = new StringBuilder

          //ask Spark for the result schema of the input SQL; it drives both CREATE TABLE statements
          val schemaList = spark.sql(sql).schema.toList
          for (field <- schemaList) {
            println(field.name + "|" + field.dataType.typeName)
            tableColumn += field.name + "|" + field.dataType.typeName
            mysqlSelectBuilder.append(field.name + ",")
          }
          mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length - 1) //drop the trailing comma

          buildExecuteHiveSql(table, tableColumn, lifecycle, owner)
          buildExecuteMysql(table, tableColumn, database)
          printAirflowJob(airflowpath, table, owner, jdbcalias, mysqlSelectBuilder.toString(), sql)
        }

        //only a skeleton here: our Airflow wrapper is too site-specific to reproduce
        def printAirflowJob(airflowpath: String, table: String, owner: String, jdbcalias: String, mysqlSelect: String, sql: String): Unit = {

          val db = table.substring(0, table.indexOf("."))
          val tableNoDatabase = table.substring(table.indexOf(".") + 1)
          System.out.println(airflowpath + db + "/" + tableNoDatabase)
          if (new File(airflowpath + db + "/" + tableNoDatabase).exists())
            System.out.println("folder exists, please delete the folder " + airflowpath + db + "/" + tableNoDatabase)
          else {
            val dir = new File(airflowpath + db + "/" + tableNoDatabase)
            dir.mkdirs()
            val pw = new PrintWriter(airflowpath + db + "/" + tableNoDatabase + "/" + tableNoDatabase + "_dag.py")

            pw.println("import airflow")
            pw.println("from airflow import DAG")
            //... the rest of the generated DAG (the Hive insert task and the
            //DataX sync task) is omitted here
            pw.println("")
            pw.println("")
            pw.flush()
            pw.close()
          }
        }

        //build the MySQL CREATE TABLE from the schema and execute it over JDBC
        @throws[IOException]
        def buildExecuteMysql(table: String, tableColumn: ArrayBuffer[String], database: Database): Unit = {

          val mysqlSqlBuilder = new StringBuilder
          mysqlSqlBuilder.append("CREATE TABLE " + table.substring(table.indexOf(".") + 1) + " ( \n")
          mysqlSqlBuilder.append("dt varchar(10) DEFAULT NULL," + "\n") //partition date column
          println("column count: " + tableColumn.size)

          for (column <- tableColumn) {
            val fieldAndType = column.split("\\|")

            mysqlSqlBuilder.append(fieldAndType(0) + " ")
            if (fieldAndType(1).contains("integer") || fieldAndType(1).contains("long"))
              mysqlSqlBuilder.append(" bigint(10)")
            else if (fieldAndType(1).contains("float") || fieldAndType(1).contains("double") || fieldAndType(1).contains("decimal"))
              mysqlSqlBuilder.append(" decimal(36,6)")
            else if (fieldAndType(1).contains("boolean"))
              mysqlSqlBuilder.append(" boolean")
            else //string, date, timestamp and anything unmapped
              mysqlSqlBuilder.append(" varchar(36)")

            mysqlSqlBuilder.append(" DEFAULT NULL," + "\n")
          }
          mysqlSqlBuilder.deleteCharAt(mysqlSqlBuilder.length - 2) //strip the comma before the final newline

          mysqlSqlBuilder.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
          System.out.println(mysqlSqlBuilder.toString)

          Class.forName("com.mysql.cj.jdbc.Driver")
          val con = DriverManager.getConnection("jdbc:mysql://" + database.host + ":" + database.port + "/" + database.db_name + "?serverTimezone=UTC", database.user, database.passwd)
          val st = con.createStatement
          st.execute(mysqlSqlBuilder.toString)
          st.close(); con.close()
        }

        //build the Hive CREATE TABLE from the schema and run it through the hive CLI
        @throws[IOException]
        @throws[InterruptedException]
        def buildExecuteHiveSql(table: String, tableColumn: ArrayBuffer[String], lifecycle: String, owner: String): Unit = {
          val hiveSqlBuilder = new StringBuilder
          hiveSqlBuilder.append("CREATE TABLE " + table + " ( \n")
          println("column count: " + tableColumn.size)

          for (column <- tableColumn) {
            val fieldAndType = column.split("\\|")

            if (fieldAndType(1).contains("integer") || fieldAndType(1).contains("long"))
              hiveSqlBuilder.append(fieldAndType(0) + " bigint,")
            else if (fieldAndType(1).contains("float") || fieldAndType(1).contains("double") || fieldAndType(1).contains("decimal"))
              hiveSqlBuilder.append(fieldAndType(0) + " double,")
            else if (fieldAndType(1).contains("boolean"))
              hiveSqlBuilder.append(fieldAndType(0) + " boolean,")
            else //string, date, timestamp and anything unmapped
              hiveSqlBuilder.append(fieldAndType(0) + " string,")
            hiveSqlBuilder.append("\n")
          }
          hiveSqlBuilder.deleteCharAt(hiveSqlBuilder.length - 2) //strip the comma before the final newline

          hiveSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT 'first-level partition' ) \n")
          hiveSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET \n")
          hiveSqlBuilder.append("TBLPROPERTIES ('lifecycle'='" + lifecycle + "','owner'='" + owner + "','parquet.compression'='snappy');")
          System.out.println(hiveSqlBuilder.toString)
          //ProcessBuilder passes arguments directly (no shell), so the statement
          //must not be wrapped in extra quotes
          val process = new ProcessBuilder("hive", "-e", hiveSqlBuilder.toString).redirectErrorStream(true).start

          val br = new BufferedReader(new InputStreamReader(process.getInputStream))
          var line = br.readLine()
          while (line != null) {
            println(line)
            line = br.readLine()
          }
          process.waitFor
        }
        def main(args: Array[String]): Unit = {
          //local-mode alternative for debugging:
          //val sparkconf = new SparkConf().setAppName("test_Spark_sql").setMaster("local[2]")
          //val spark = SparkSession.builder().config(sparkconf).config("spark.driver.host", "localhost").getOrCreate()

          val spark = SparkSession.builder.appName("HiveToMysql").enableHiveSupport().getOrCreate()
          readDbPropertiesFile("HiveToMysql.properties", spark, args(0)) //args(0) carries the input SQL
        }
      }
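
The contains checks above key off Spark's DataType.typeName strings. Here is a quick sketch (field names are made up) of what those strings look like; it runs without a cluster:

      import org.apache.spark.sql.types._

      object TypeNameDemo extends App {
        val schema = StructType(Seq(
          StructField("id", LongType),
          StructField("name", StringType),
          StructField("price", DecimalType(18, 2)),
          StructField("created_at", TimestampType)
        ))
        //prints: id|long, name|string, price|decimal(18,2), created_at|timestamp
        schema.toList.foreach(f => println(s"${f.name}|${f.dataType.typeName}"))
      }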
3. Launch script: HiveToMysql.sh
        #!/bin/bash

        mv bigData.jar .
        mv HiveToMysql.properties .

        # the SQL statement to analyze is read from a file
        sql=`cat /sql`

        spark-submit \
                --class HiveToMysql \
                --master yarn \
                --deploy-mode client \
                --num-executors 1 \
                --executor-memory 4g \
                --executor-cores 1 \
                --driver-memory 1g \
                --name "HiveToMysql" \
                --conf spark.speculation=true \
                --conf spark.speculation.interval=30000 \
                --conf spark.speculation.quantile=0.8 \
                --conf spark.speculation.multiplier=1.5 \
                --conf spark.dynamicAllocation.enabled=false \
                --files HiveToMysql.properties \
                --jars fastjson-1.2.62.jar,mysql-connector-java-8.0.18.jar \
                bigData.jar "$sql"
4. Possible issues
        1. My Scala is rough, so the code is not the easiest to read.
        2. Every generated task gets the same schedule time (this could be made configurable).
        3. The type mapping is crude; adapt it to your business needs (see the sketch below).
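
If the type mapping does need adjusting, pulling it out of the two builder methods into a single pure function keeps the change in one place. A sketch that keeps the same rules as the code above (the varchar fallback for unmapped types matches the fallback added there):

        //maps a Spark typeName such as "long" or "decimal(18,2)" to a MySQL column type
        def mysqlType(sparkTypeName: String): String = sparkTypeName match {
          case t if t.contains("integer") || t.contains("long") => "bigint(10)"
          case t if t.contains("float") || t.contains("double") || t.contains("decimal") => "decimal(36,6)"
          case t if t.contains("boolean") => "boolean"
          case _ => "varchar(36)" //string, date, timestamp, anything else
        }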

Source: https://www.cnblogs.com/wuxiaolong4/p/16462320.html
