ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

通过sparksql读取presto中的数据存到clickhouse

2022-01-08 16:01:15  阅读:243  来源: 互联网

标签:case String val presto sparksql org spark clickhouse


整体结构

在这里插入图片描述

Config

package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser


/**
 * Command-line configuration shared by the profile jobs.
 *
 * Every field has a default, so `Config()` produces an empty configuration
 * that the option parser fills in through `copy`.
 *
 * @param env       runtime environment, "dev" or "prod"
 * @param username  ClickHouse user name
 * @param password  ClickHouse password
 * @param url       ClickHouse JDBC url
 * @param cluster   ClickHouse cluster identifier
 * @param startDate not populated by the parser visible here — presumably a start-of-range date; TODO confirm
 * @param endDate   not populated by the parser visible here — presumably an end-of-range date; TODO confirm
 * @param proxyUser proxy user name
 * @param topK      defaults to 25; not populated by the parser visible here — TODO confirm its meaning
 */
case class Config(
  env: String = "",
  username: String = "",
  password: String = "",
  url: String = "",
  cluster: String = "",
  startDate: String = "",
  endDate: String = "",
  proxyUser: String = "",
  topK: Int = 25
)

/**
 * Companion object that builds a [[Config]] from command-line arguments.
 */
object Config{

    private val logger = LoggerFactory.getLogger("Config")

    /**
     * Parses `args` into a [[Config]].
     *
     * `-e/--env` and `-x/--proxyUser` are always required. When the calling
     * program is `LabelGenerator`, the ClickHouse options `-n/--username`,
     * `-p/--password`, `-u/--url` and `-c/--cluster` are required as well.
     *
     * If the arguments cannot be parsed, an error is logged and the JVM exits
     * with status -1.
     *
     * @param obj  the calling program object; its simple class name (with any
     *             `$` removed) selects the option set
     * @param args raw command-line arguments
     * @return the populated configuration
     */
    def parseConfig(obj:Object,args:Array[String]):Config = {
        // 1. Derive the program name from the caller's class name; a Scala
        //    object's class name ends in '$', which is stripped here.
        val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")

        // 2. Declare the options understood by this program.
        val parser = new OptionParser[Config]("spark sql "+programName) {
            // Header line printed in the usage text.
            head(programName,"v1.0")
            opt[String]('e',"env").required().action((x,config) => config.copy(env = x)).text("dev or prod")
            // proxyUser uses -x so that it does not collide with -n (username) below.
            opt[String]('x',"proxyUser").required().action((x,config) => config.copy(proxyUser = x)).text("proxy username")
            if (programName == "LabelGenerator") {
                logger.info("LabelGenerator")
                opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
                opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
                opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
                opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
            }
        }

        // 3. Parse, starting from an all-defaults Config. sys.exit has type
        //    Nothing, so no null placeholder is needed on the failure path.
        parser.parse(args,Config()).getOrElse {
            logger.error("can not parse args")
            sys.exit(-1)
        }
    }
}

LabelGenerator

package com.fuwei.bigdata.profile

import com.qf.bigdata.profile.conf.Config
import com.qf.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Job that builds the base user-profile labels, stores them in the Hive table
 * `dws_news.user_profile_base` and copies them into ClickHouse.
 */
object LabelGenerator {

    private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

    /**
     * Entry point.
     *
     * The SparkSession is stopped and the cached DataFrame is unpersisted in
     * `finally` blocks, so both are released even when a step throws.
     *
     * @param args command-line arguments, parsed by `Config.parseConfig`
     */
    def main(args: Array[String]): Unit = {
        // Only log WARN and above for loggers under "org".
        Logger.getLogger("org").setLevel(Level.WARN)

        // 1. Parse the command-line arguments.
        val params: Config = Config.parseConfig(LabelGenerator, args)

        // 2. Obtain the SparkSession for the requested environment.
        val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)

        try {
            // 3. Load the tab-separated phone-prefix file and expose it as a temp view.
            val phoneInfoDF: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt").toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
            phoneInfoDF.createOrReplaceTempView("phone_info")

            // 4.1 Per-user attributes: the last 11 characters of the mobile number,
            //     the e-mail domain, and the device model joined in from user_base_info.
            val userSql =
                """
                  |select
                  |t1.distinct_id as uid,
                  |t1.gender,
                  |t1.age,
                  |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
                  |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
                  |t2.model
                  |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
                  |on t1.distinct_id = t2.uid
                  |""".stripMargin

            val userDF: DataFrame = spark.sql(userSql)
            userDF.createOrReplaceTempView("user_info")

            // 4.2 Resolve the region by matching the first 7 characters of the
            //     mobile number against phone_info.phone.
            val baseFeatureSql =
                """
                  |select
                  |t1.uid,
                  |t1.gender,
                  |t1.age,
                  |t1.email_suffix,
                  |t1.model,
                  |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
                  |from user_info as t1 left join phone_info as t2
                  |on
                  |t2.phone = substring(t1.mobile,0,7)
                  |""".stripMargin

            // 4.3 Make sure the Hive target table exists.
            spark.sql(
                """
                  |create table if not exists dws_news.user_profile_base(
                  |uid string,
                  |gender string,
                  |age string,
                  |email_suffix string,
                  |model string,
                  |region string
                  |)
                  |stored as parquet
                  |""".stripMargin)

            // 4.4 Run the feature query; the result is used twice (Hive write and
            //     ClickHouse write), so it is cached and unpersisted afterwards.
            val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
            baseFeaturedDF.cache()

            try {
                // Overwrite the Hive table with the query result.
                baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")

                // 5. (Re)create the ClickHouse table; meta is (column names, placeholders).
                val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF,params)

                // 6.1 Parameterised insert statement for ClickHouse.
                val insertCHSql =
                    s"""
                       |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
                       |""".stripMargin

                logger.warn(insertCHSql)

                // 6.2 Insert into ClickHouse one partition at a time.
                baseFeaturedDF.foreachPartition(partition => {
                    TableUtils.insertBaseFeaturedTable(partition,insertCHSql,params)
                })
            } finally {
                baseFeaturedDF.unpersist()
            }
        } finally {
            // 7. Release Spark resources whether or not the job succeeded.
            spark.stop()
        }
        logger.info("job has success")

    }
}

ClickHouseUtils

package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

/**
 * ClickHouse connection helper and Spark-to-ClickHouse type-name conversion.
 */
object ClickHouseUtils {

    /**
     * Builds a ClickHouse data source.
     *
     * @param username ClickHouse user
     * @param password ClickHouse password
     * @param url      ClickHouse JDBC url
     * @return a data source configured with the given credentials
     */
    def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
        // Load the JDBC driver class before creating the data source.
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
        val properties = new ClickHouseProperties()
        properties.setUser(username)
        properties.setPassword(password)
        new ClickHouseDataSource(url, properties)
    }

    /**
     * Converts a comma-separated list of "name sparkType" pairs into the
     * matching list of "name ClickHouseType" pairs, e.g.
     * `"age int,gender string"` becomes `"age Int32,gender String"`.
     *
     * Blank entries are skipped, so an empty input yields an empty string.
     * The input is split on commas, so types that themselves contain a comma
     * (such as `decimal(10,2)`) cannot be expressed in this format.
     *
     * @param dfCol comma-separated "name type" column definitions
     * @return the same columns with ClickHouse type names
     * @throws IllegalArgumentException if an entry has no type part
     */
    def df2TypeName2CH(dfCol: String): String = {
        dfCol.split(",")
            .map(_.trim)
            .filter(_.nonEmpty)
            .map { column =>
                // Split into at most two parts: the name and the rest (the type).
                column.split("\\s+", 2) match {
                    case Array(fName, fType) => fName + " " + dfType2chType(fType.trim)
                    case _ =>
                        throw new IllegalArgumentException(
                            s"column definition '$column' is not of the form '<name> <type>'")
                }
            }
            .mkString(",")
    }

    /**
     * Maps a Spark SQL type name to a ClickHouse type name, ignoring case.
     * Unrecognised names map to `String`.
     *
     * @param fieldType Spark type name, e.g. "int", "bigint", "string"
     * @return the ClickHouse type name
     */
    def dfType2chType(fieldType: String):String = {
        fieldType.toLowerCase() match {
            case "string" => "String"
            // "int" is what IntegerType.simpleString returns; "integer" is its typeName.
            case "int" | "integer" => "Int32"
            case "long" | "bigint" => "Int64"
            case "double" => "Float64"
            case "float" => "Float32"
            case "timestamp" => "Datetime"
            case _ => "String"
        }
    }
}

SparkUtils(这个连接以后可以通用)

package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

/**
 * Factory for Hive-enabled SparkSessions.
 */
object SparkUtils {
    private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

    /**
     * Creates (or reuses) a Hive-enabled SparkSession for the given environment.
     *
     * "prod" only appends `_prod` to the application name. "dev" runs on
     * `local[*]`, appends `_dev` and sets `spark.sql.hive.metastore.jars` to
     * `maven`. Any other value logs an error and exits the JVM with status -1.
     *
     * @param env     "prod" or "dev"
     * @param appName base application name; the environment suffix is appended
     * @return the SparkSession
     */
    def getSparkSession(env:String,appName:String):SparkSession = {
        // Settings shared by every environment.
        val sharedConf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.sql.hive.metastore.version", "1.2.1")
            .set("spark.sql.cbo.enabled", "true")
            .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
            .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")

        // Both environments build the session the same way once the conf is ready.
        def hiveSession(sparkConf: SparkConf): SparkSession =
            SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

        if (env == "prod") {
            hiveSession(sharedConf.setAppName(appName+"_prod"))
        } else if (env == "dev") {
            hiveSession(
                sharedConf
                    .setMaster("local[*]")
                    .setAppName(appName+"_dev")
                    .set("spark.sql.hive.metastore.jars","maven"))
        } else {
            logger.error("not match env")
            sys.exit(-1)
        }
    }

}

TableUtils

package com.fuwei.bigdata.profile.utils

import com.qf.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

/**
 * Helpers that store data read from Hive into ClickHouse.
 *
 * @author lifuwei
 * @since 2022-01-07
 */
object TableUtils {

    private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

    /** ClickHouse database that holds the profile table. */
    val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news"
    /** ClickHouse table that receives the base profile rows. */
    val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"

    // A batch is sent once it holds this many rows ...
    private val BatchSize = 2000
    // ... or once this many milliseconds have passed since the previous send.
    private val MaxBatchIntervalMs = 3000L

    /**
     * Inserts every row of one partition into ClickHouse using batched writes.
     *
     * Only string, long and integer columns are bound; any other column type
     * is logged as an error and its placeholder is left unset. The statement
     * and the connection are closed even when an insert fails.
     *
     * @param partition   rows of one partition
     * @param insertCHSql insert statement with one `?` per column, in schema order
     * @param params      supplies the ClickHouse username, password and url
     */
    def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
        // 1. Open a connection to ClickHouse.
        val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
        val connection: ClickHouseConnection = dataSource.getConnection
        try {
            val ps: PreparedStatement = connection.prepareStatement(insertCHSql)
            try {
                var batchCount = 0
                var lastBatchTime = System.currentTimeMillis()

                // Sends the pending batch and resets the counters.
                def flush(now: Long): Unit = {
                    ps.executeBatch()
                    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(now - lastBatchTime)/1000} s")
                    batchCount = 0
                    lastBatchTime = now
                }

                partition.foreach { row =>
                    // 2. Bind each column to its placeholder; JDBC indices are 1-based.
                    row.schema.fields.zipWithIndex.foreach { case (field, pos) =>
                        val index = pos + 1
                        field.dataType match {
                            case StringType => ps.setString(index,row.getAs[String](pos))
                            case LongType => ps.setLong(index,row.getAs[Long](pos))
                            case IntegerType => ps.setInt(index,row.getAs[Int](pos))
                            case _ => logger.error(s"type is err,${field.dataType}")
                        }
                    }
                    // 3. Queue the row.
                    ps.addBatch()
                    batchCount += 1

                    // 4. Send when the batch is full or has been waiting too long.
                    val now = System.currentTimeMillis()
                    if (batchCount >= BatchSize || now - lastBatchTime > MaxBatchIntervalMs) {
                        flush(now)
                    }
                }

                // 5. Send whatever is still queued after the last row.
                if (batchCount > 0) {
                    flush(System.currentTimeMillis())
                }
            } finally {
                ps.close()
            }
        } finally {
            // 6. Always release the connection.
            connection.close()
        }
    }

    /**
     * Recreates the ClickHouse table for the given DataFrame and returns what
     * is needed to build an insert statement for it.
     *
     * The database is created if it does not exist; the table is dropped if
     * it exists and created again as a MergeTree table ordered by `uid`, with
     * one column per DataFrame field.
     *
     * @param baseFeaturedDF DataFrame whose schema defines the table columns
     * @param params         supplies the ClickHouse username, password and url
     * @return (comma-separated column names, comma-separated `?` placeholders)
     */
    def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config ):(String,String)= {
        val fields = baseFeaturedDF.schema.fields

        // Column names and matching placeholders, e.g. "uid,gender" and "?,?".
        val fieldNames = fields.map(_.name).mkString(",")
        val placeholders = fields.map(_ => "?").mkString(",")

        // Column definitions with ClickHouse types, e.g. "uid String,gender String".
        val chCol = fields
            .map(f => f.name + " " + ClickHouseUtils.dfType2chType(f.dataType.simpleString))
            .mkString(",")

        val createCHDataBaseSql =
            s"""
               |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
               |""".stripMargin

        val dropCHTableSql =
            s"""
               |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
               |""".stripMargin

        // ClickHouse tables need an engine; MergeTree is used here.
        val createCHTableSql =
            s"""
               |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
               |ENGINE = MergeTree()
               |ORDER BY(uid)
               |""".stripMargin

        val dataSource:ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username,params.password,params.url)
        val connection: ClickHouseConnection = dataSource.getConnection
        try {
            // Order matters: create the database, drop the old table, create the new one.
            Seq(createCHDataBaseSql, dropCHTableSql, createCHTableSql).foreach { sql =>
                logger.warn(sql)
                val ps: PreparedStatement = connection.prepareStatement(sql)
                try {
                    ps.execute()
                } finally {
                    ps.close()
                }
            }
        } finally {
            connection.close()
        }
        logger.info("success!!!!!!!!!")
        (fieldNames,placeholders)
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fuwei.bigdata</groupId>
    <artifactId>user-profile</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.12</scala.version>
        <play-json.version>2.3.9</play-json.version>
        <maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
        <scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
        <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
        <spark.version>2.4.5</spark.version>
        <scope.type>compile</scope.type>
        <json.version>1.2.3</json.version>
        <!--compile provided-->
    </properties>

    <dependencies>

        <!--json 包-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.6</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.11</artifactId>
            <version>4.0.0-RC2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark-bundle_2.11</artifactId>
            <version>0.5.2-incubating</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.aggregate</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>

    </dependencies>

    <repositories>

        <!-- Aliyun public mirror. HTTPS is used because Maven 3.8.1 and later
             block plain-HTTP repositories by default. -->
        <repository>
            <id>alimaven</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <updatePolicy>never</updatePolicy>
            </releases>
            <snapshots>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven-assembly-plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-archetype-plugin</artifactId>
                <version>2.2</version>
            </plugin>
        </plugins>
    </build>
</project>

测试

##1. 将core-site.xml、yarn-site.xml、hive-site.xml拷贝到工程resources目录下
##2. clean and package
##3. hive metastore服务必须开
##4. yarn/hdfs必须要开
##5. clickhouse/chproxy也要打开
##6. 编写提交jar包的spark脚本
${SPARK_HOME}/bin/spark-submit \
    --jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.executor.heartbeatInterval=120s \
    --conf spark.network.timeout=600s \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --name log2hudi \
    --conf spark.task.cpus=1 \
    --conf spark.executor.cores=4 \
    --conf spark.sql.shuffle.partitions=50 \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 1G \
    --executor-memory 3G \
    --num-executors 1 \
    --class com.qf.bigdata.profile.LabelGenerator \
    /data/jar/user-profile.jar \
    -e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1

##7. 通过clickhouse-client去测试
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert

标签:case,String,val,presto,sparksql,org,spark,clickhouse
来源: https://blog.csdn.net/li1579026891/article/details/122381593

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有