
One-Click MySQL-to-Warehouse Sync (Scheduled with Airflow)



Product managers often ask me to "sync table *** to Hive for analysis". (After doing it enough times it gets tedious, so I wrote a tool.)

I: Background, Features, and Workflow

1. Background:
    1. The warehouse stores data in Hive; datax moves the data and Airflow schedules the jobs.
    2. The data-product team did wrap datax so that a MySQL table sync can be clicked together in a UI,
        but the process is too involved: the Hive table still has to be created by hand, and edits are
        not supported. Hence the idea of writing my own tool.
2. Features
    Driven by a single config file, the tool creates the Hive table and generates the Airflow job.
3. Workflow
    1. Configure the MySQL connection.
    2. From the MySQL column types, derive the matching Hive schema and create the table
        (the type mapping is sketched right after this list).
    3. Generate the Airflow job (read the MySQL table, call datax, repair the partitions).
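
The heart of step 2 is the MySQL-to-Hive type mapping. Here is a minimal, self-contained sketch of that mapping (class and method names are illustrative; the full tool in Part II inlines the same logic into its two builders):

        public class TypeMappingSketch {
            // map a raw MySQL type string such as "decimal(30,5)" to a Hive type
            static String toHiveType(String mysqlType) {
                String t = mysqlType.toLowerCase();
                // check char/text/datetime/time/timestamp before plain date:
                // "datetime".contains("date") would otherwise match the date branch
                if (t.contains("char") || t.contains("mediumtext")
                        || t.contains("datetime") || t.contains("time") || t.contains("timestamp")) return "string";
                if (t.contains("int")) return "bigint";    // bigint/int/smallint/tinyint
                if (t.contains("binary")) return "binary"; // binary/varbinary
                if (t.contains("date")) return "date";
                if (t.contains("double") || t.contains("float") || t.contains("decimal")) return "double";
                return "string"; // anything unmapped falls back to string
            }

            public static void main(String[] args) {
                System.out.println(toHiveType("datetime"));      // string
                System.out.println(toHiveType("decimal(30,5)")); // double
                System.out.println(toHiveType("varchar(40)"));   // string
            }
        }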

II: Code

1. The config file:

MysqlToHive.properties

        # must match the alias of one of the connections configured below
        jdbcalias:ptx_read
        # the table to sync
        table:be_product
        # prefix of the generated Hive table
        hive_prefix:ods.ods_product_
        # suffix marking the table as incremental or full
        hive_suffix:_dd
        # owner of the Airflow job
        owner=xiaolong.wu
        # lifecycle of the Hive table in days; the data platform purges expired data
        lifecycle=180

        # directory the Airflow DAG files are generated into
        airflowpath=/airflow/dags/ods/
        # base path datax writes the Hive data to
        s3path=s3://path

        # several MySQL connections can be configured, so one entry does not have
        # to be edited back and forth; note that Java properties files do not
        # support inline comments, so comments sit on their own lines
        jdbc1alias : hive
        jdbc1host : 127.0.0.1
        jdbc1port : 3306
        jdbc1user : root
        jdbc1passwd : **
        jdbc1db_name : test

        # this alias matches jdbcalias above
        jdbc2alias : ptx_read
        jdbc2host : 127.0.0.1
        jdbc2port : 3306
        jdbc2user : root
        jdbc2passwd : **
        jdbc2db_name : test
2. Main code:

MysqlToHive.java

        import java.io.*;
        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.Properties;

        class Database { // holds one MySQL connection's settings; plain getters/setters, not the interesting part
            private String alias;
            private String host;
            private int port;
            private String user;
            private String passwd;
            private String db_name;

            public String getAlias() {return alias;}
            public void setAlias(String alias) {this.alias = alias;}
            public String getHost() {return host;}
            public void setHost(String host) {this.host = host;}
            public int getPort() {return port;}
            public void setPort(int port) {this.port = port;}
            public String getUser() {return user;}
            public void setUser(String user) {this.user = user;}
            public String getPasswd() {return passwd;}
            public void setPasswd(String passwd) {this.passwd = passwd;}
            public String getDb_name() {return db_name;}
            public void setDb_name(String db_name) {this.db_name = db_name;}

            @Override
            public String toString() {
                return "Database{" +"alias='" + alias + '\'' +", host='" + host + '\'' +", port=" + port +
                        ", user='" + user + '\'' +", passwd='" + passwd + '\'' +", db_name='" + db_name + '\'' +'}';
            }
        }

        public class MysqlToHive {
            public static String jdbcalias;
            public static String table;
            public static String hive_prefix;
            public static String hive_suffix;
            public static String owner;
            public static String lifecycle;
            public static String airflowpath;
            public static String s3path;


            public static Database database = new Database();
            public static List<Database> databasesList = new ArrayList<Database>();
            public static List<String> mysqlTableColumn = new ArrayList<String>();

            public static void parseProperties(Properties pp){
                jdbcalias = pp.getProperty("jdbcalias");
                table = pp.getProperty("table");
                hive_prefix = pp.getProperty("hive_prefix");
                owner = pp.getProperty("owner");
                lifecycle = pp.getProperty("lifecycle");
                airflowpath = pp.getProperty("airflowpath");
                s3path = pp.getProperty("s3path");
                hive_suffix = pp.getProperty("hive_suffix");

                int dbindex = 1; // walk the jdbc1*, jdbc2*, ... entries and pick the one whose alias matches jdbcalias
                while(pp.getProperty("jdbc"+dbindex+"alias")!= null){
                    Database databaseItem = new Database();
                    databaseItem.setDb_name(pp.getProperty("jdbc"+dbindex+"db_name"));
                    databaseItem.setHost(pp.getProperty("jdbc"+dbindex+"host"));
                    databaseItem.setAlias(pp.getProperty("jdbc"+dbindex+"alias"));
                    databaseItem.setPasswd(pp.getProperty("jdbc"+dbindex+"passwd"));
                    databaseItem.setPort(Integer.parseInt(pp.getProperty("jdbc"+dbindex+"port")));
                    databaseItem.setUser(pp.getProperty("jdbc"+dbindex+"user"));
                    System.out.println(databaseItem.toString());
                    databasesList.add(databaseItem);

                    if(databaseItem.getAlias().equals(jdbcalias)){
                        database =databasesList.get(dbindex-1);
                    }
                    dbindex++;
                }
            }
            // load the properties file from the classpath
            public static  void readDbPropertiesFile (String fileName){
                Properties pp = new Properties();
                try {
                    InputStream fps = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName);
                    pp.load(fps);
                    parseProperties(pp);
                    fps.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // connect to MySQL and fetch the table's column metadata for the Hive DDL below
            public static  void readTableFormatted () throws Exception {
                String sql="show full fields from " + table;

                Connection con=null;
                Statement st=null;
                ResultSet rs=null;
                Class.forName("com.mysql.cj.jdbc.Driver");
                System.out.println("jdbc:mysql://"+database.getHost()+":"+database.getPort()+"/"+database.getDb_name()+"?serverTimezone=UTC");
                con= DriverManager.getConnection("jdbc:mysql://"+database.getHost()+":"+database.getPort()+"/"+database.getDb_name()+"?serverTimezone=UTC", database.getUser(),database.getPasswd());
                st=con.createStatement();
                rs=st.executeQuery(sql);
                while (rs.next()) {
                    // store each column as Field|Type|Comment (field and type lower-cased)
                    mysqlTableColumn.add(rs.getString("Field").toLowerCase() + "|" + rs.getString("Type").toLowerCase() + "|" + rs.getString("Comment"));
                }
                rs.close();
                st.close();
                con.close();
            }
            // extracts the display length from a type such as varchar(40); currently unused
            public static int getmysqlLength(String type) {
                return Integer.parseInt(type.substring(type.indexOf("(") + 1, type.indexOf(")")));
            }
            // map each MySQL type to a Hive type, build the CREATE TABLE statement, and run it
            public static  void buildExecuteHiveSql () throws IOException, InterruptedException {
                StringBuilder hiveSqlBuilder = new StringBuilder();

                hiveSqlBuilder.append("CREATE TABLE "+hive_prefix+table+hive_suffix+" ( \n");
                for (int i = 0; i < mysqlTableColumn.size(); i++) {
                    String []fieldAndType= mysqlTableColumn.get(i).split("\\|");

                    hiveSqlBuilder.append(fieldAndType[0] + " ");
                    if(fieldAndType[1].contains("bigint") || fieldAndType[1].contains("int") || fieldAndType[1].contains("smallint") || fieldAndType[1].contains("tinyint")){
                        hiveSqlBuilder.append("bigint");
                    }
                    else if(fieldAndType[1].contains("binary") || fieldAndType[1].contains("varbinary") ){
                        hiveSqlBuilder.append("binary");
                    }
                    else if(fieldAndType[1].contains("date") ){
                        hiveSqlBuilder.append("date");
                    }
                    else if(fieldAndType[1].contains("double") || fieldAndType[1].contains("float") || fieldAndType[1].contains("decimal")){
                        hiveSqlBuilder.append("double");
                    }
                    else if(fieldAndType[1].contains("char") || fieldAndType[1].contains("varchar") || fieldAndType[1].contains("mediumtext")
                            || fieldAndType[1].contains("datetime") || fieldAndType[1].contains("time") || fieldAndType[1].contains("timestamp")){
                        hiveSqlBuilder.append("string");
                    }
                    String comment = "";
                    if(fieldAndType.length==3){comment = fieldAndType[2];};
                    hiveSqlBuilder.append(" comment '"+comment+ "' ,");
                    hiveSqlBuilder.append("\n");
                }
                hiveSqlBuilder.deleteCharAt(hiveSqlBuilder.length()-2); // drop the comma after the last column (the newline stays)
                hiveSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT '(一级分区)' ) \n");
                hiveSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET \n");
                hiveSqlBuilder.append("TBLPROPERTIES ('lifecycle'='"+lifecycle+"','owner'='"+owner+"','parquet.compression'='snappy');");
                System.out.println(hiveSqlBuilder.toString());

                // ProcessBuilder does not go through a shell, so the SQL is passed as-is;
                // wrapping it in literal quotes would make the quotes part of the argument
                Process process = new ProcessBuilder("hive", "-e", hiveSqlBuilder.toString()).redirectErrorStream(true).start();
                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                }
                process.waitFor();
            }
            // build the MySQL select statement and the column-list JSON used by the Airflow/datax job
            public static  void printAirflowJobGetSelectSql (StringBuilder mysqlSelectBuilder,StringBuilder hiveTypeBuilder){
                mysqlSelectBuilder.append("select ");
                for (int i = 0; i < mysqlTableColumn.size(); i++) {
                    String []fieldAndType= mysqlTableColumn.get(i).split("\\|");
                    mysqlSelectBuilder.append(fieldAndType[0] + ",");
                    hiveTypeBuilder.append("{\"name\":\""+fieldAndType[0]+"\",\"type\":\"");
                    if(fieldAndType[1].contains("bigint") || fieldAndType[1].contains("int") || fieldAndType[1].contains("smallint") || fieldAndType[1].contains("tinyint")){
                        hiveTypeBuilder.append("bigint");
                    }
                    else if(fieldAndType[1].contains("binary") || fieldAndType[1].contains("varbinary") ){
                        hiveTypeBuilder.append("binary");
                    }
                    else if(fieldAndType[1].contains("date") ){
                        hiveTypeBuilder.append("date");
                    }
                    else if(fieldAndType[1].contains("double") || fieldAndType[1].contains("float") || fieldAndType[1].contains("decimal")){
                        hiveTypeBuilder.append("double");
                    }
                    else if(fieldAndType[1].contains("char") || fieldAndType[1].contains("varchar") || fieldAndType[1].contains("mediumtext")
                            || fieldAndType[1].contains("datetime") || fieldAndType[1].contains("time") || fieldAndType[1].contains("timestamp")){
                        hiveTypeBuilder.append("string");
                    }
                    hiveTypeBuilder.append("\"},");
                }
                hiveTypeBuilder.deleteCharAt(hiveTypeBuilder.length()-1);       // drop the trailing comma
                mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length()-1); // drop the trailing comma
                mysqlSelectBuilder.append(" from " + table);
            }
            // generate the Airflow DAG file under the configured path, i.e. the scheduled job itself
            // most of this relied on company-internal wrappers, so only the skeleton is kept
            public static  void printAirflowJob () throws FileNotFoundException {

                String db = hive_prefix.substring(0,hive_prefix.indexOf("."));
                String odsTableName = hive_prefix.substring(hive_prefix.indexOf(".")+1);
                if(new File(airflowpath+odsTableName+table+hive_suffix).exists()){ // check the actual target directory
                    System.out.println("folder exists, please delete the folder "+airflowpath+odsTableName+table+hive_suffix);
                }
                else{
                    StringBuilder mysqlSelectBuilder = new StringBuilder();
                    StringBuilder hiveTypeBuilder = new StringBuilder();
                    printAirflowJobGetSelectSql(mysqlSelectBuilder,hiveTypeBuilder);

                    File dir = new File(airflowpath+odsTableName+table+hive_suffix);
                    dir.mkdirs();
                    PrintWriter pw = new PrintWriter(airflowpath+odsTableName+table+hive_suffix+"/"+odsTableName+table+hive_suffix+"_dag.py");
                    pw.println("import airflow");

                    pw.println("job_name='"+hive_prefix+table+hive_suffix+"'");
                    pw.println("job_owner='"+owner+"'");
                    pw.println("default_job_retries=1");
                    pw.println("default_job_retry_delay=timedelta(minutes=5)");
                    pw.println("default_start_date=airflow.utils.dates.days_ago(1)");
                    pw.println("dag_schedule_interval='12 1 * * *'");
                    pw.println("");
                    pw.println("");
                    pw.println("hive_table_name = job_name");
                    pw.println("");
                    pw.println("default_args = {");
                    pw.println("    'owner': job_owner,");
                    pw.println("    'depends_on_past': False,");
                    pw.println("    'start_date': default_start_date,");
                    pw.println("    'email': default_email_to,");
                    pw.println("    'email_on_failure': False,");
                    pw.println("    'email_on_retry': False,");
                    pw.println("    'retries': default_job_retries,");
                    pw.println("    'retry_delay': default_job_retry_delay,");
                    pw.println("    'pool': default_pool,");
                    pw.println("    'priority_weight': 10");
                    pw.println("}");
                    pw.println("");

                }

            }
            public static void main(String[] args) throws Exception {
                readDbPropertiesFile("MysqlToHive.properties");
                readTableFormatted();
                buildExecuteHiveSql();
                printAirflowJob();
            }

        }
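
For a concrete feel of printAirflowJobGetSelectSql's output, suppose the synced table had just two columns, id bigint and name varchar(40) (hypothetical names). The two builders would then hold roughly:

        select id,name from be_product
        {"name":"id","type":"bigint"},{"name":"name","type":"string"}

The first line is the query the Airflow job runs against MySQL; the JSON fragment is presumably fed to datax as its column specification.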

III: Example Run

1. MySQL sample:
        CREATE TABLE mysql_column_test (
            bigint_test bigint(10),
            int_test int(10),
            smallint_test smallint(10),
            binary_test binary(20),
            varbinary_test varbinary(20),
            decimal_test decimal(30,5),
            double_test double(30,5),
            float_test float(30,5),
            char_test char(40),
            varchar_test varchar(40),
            mediumtext_test mediumtext,
            date_test date,
            datetime_test datetime,
            time_test time,
            timestamp_test timestamp DEFAULT CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

        insert into mysql_column_test(bigint_test,int_test,smallint_test,binary_test,varbinary_test,decimal_test,double_test,float_test,char_test,varchar_test,
            mediumtext_test,date_test,datetime_test,time_test) values(1,2,3,UNHEX('4'),UNHEX('5'),6.1,7.1,8.1,9.1,'10','11','2022-01-01','2020-09-14 23:18:17',CURRENT_TIMESTAMP);
2. Running the code: copy the code into a new file, compile, and run it (MysqlToHive.properties must be on the classpath, e.g. the current directory):
        javac -cp mysql-connector-java-8.0.18.jar MysqlToHive.java
        java -cp .:mysql-connector-java-8.0.18.jar MysqlToHive
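
With table set to mysql_column_test (and the prefix/suffix from the sample config), the run should emit DDL along these lines; note that datetime, time, and timestamp columns land as string, while the plain date column keeps Hive's date type:

        CREATE TABLE ods.ods_product_mysql_column_test_dd ( 
        bigint_test bigint comment '' ,
        int_test bigint comment '' ,
        smallint_test bigint comment '' ,
        binary_test binary comment '' ,
        varbinary_test binary comment '' ,
        decimal_test double comment '' ,
        double_test double comment '' ,
        float_test double comment '' ,
        char_test string comment '' ,
        varchar_test string comment '' ,
        mediumtext_test string comment '' ,
        date_test date comment '' ,
        datetime_test string comment '' ,
        time_test string comment '' ,
        timestamp_test string comment '' 
        ) PARTITIONED BY ( dt string COMMENT '(first-level partition)' ) 
        ROW FORMAT DELIMITED STORED AS PARQUET 
        TBLPROPERTIES ('lifecycle'='180','owner'='xiaolong.wu','parquet.compression'='snappy');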

Source: https://www.cnblogs.com/wuxiaolong4/p/16462217.html
