ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

我眼中的Hudi----数据库之Hudi

2021-01-04 13:01:45  阅读:330  来源: 互联网

标签:OPT 文件 Hudi option val 数据库 ---- spark


数据湖
数据湖或者Hudi是由大数据厂商提出来的。
数据量越大,越需要不同种类的存储,但是并不是所有企业的数据都是适合存储在廉价的HDFS集群之上的。

Apache Hudi让用户可以在Hadoop兼容的基础上存储大量数据,同时它还提供了两种原语操作,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Updae/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时它还提供了写操作的事务保证,查询会处理最后一个提交的快照,并基于此给出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定时间点获取表中已Updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势。

原语:计算机进程的控制通常是由原语完成的。所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可中断。
表设计
在较高的层次上,用于写Hudi表的组件使用了一种受支持的方式嵌入到Apache Spark作业中,它会在支持DFS的存储上,生成代表Hudi表的一组文件。然后,用户可以使用诸如Apache Spark、Presto、Apache Hive之类的查询引擎,查询该表。Hudi表的三个主要组件:
1,有序的时间轴元数据,类似数据库事务日志
2,分层布局的数据文件,实际写入表中的数据
3,多种实现方式的索引,映射包含指定记录的数据集

时间轴
Hudi维护了一条包含在不同的即时时间(instant time)对数据集做的所有instant操作的timeline,从而提供表的即时视图,同时还支持按到达顺序进行数据检索。时间轴包含以下组件:

  • Instant action:在表上的操作类型
  • Instant time:操作开始的一个时间戳,该时间戳会按照开始时间顺序单调递增
  • state:即时状态,任意操作都可以处于以下三种状态
    1,Requested:表示已经安排操作行为,但是尚未开始
    2,Inflight:表示正在执行当前操作
    3,Completed:表示已经完成操作
    时间轴包含的操作类型:
  • commits:原子的写入一张表的操作
  • cleans:后台消除了表中旧版本数据,即表中不在需要的数据
  • delta_commit:增量提交,将一批数据原子写入到MergeOnRead表中,并且只记录到增量日志中
  • compaction:后台协调Hudi中的差异数据
  • rollback:回滚,删除在写入过程中的数据
  • savepoint:将某些文件标记"已经保存",以便清除数据时不会删除它们,一般用于表的还原,可以将数据还原到某个时间点

数据文件
Hudi将表组织成DFS上基本路径下的文件夹结构中,如果表是分区的,则在基本路径下还会有其他的分区,这些分区是包含在该分区数据的文件夹,与Hive表非常类似。
在每个分区内,文件被组织成文件组,由文件ID唯一表示。其中,每个切片包含基本列文件(.parquet)和一组日志文件(.log.*)。Hudi采用了MVCC设计,压缩操作会将日志和基本文件合并以产生新的文件片,而清除操作则将未使用/较旧的文件片删除,以回收DFS上的空间。

索引
Hudi通过索引机制提供高效的upsert操作,该机制会将一个记录键+分区路径组合一致性的映射到一个文件ID。
Hudi当前提供了三种索引实现,来映射一个记录键到包含该记录的文件ID。这将使得我们无需扫描表中的每条记录,就可以显著提高Upsert速度。

表类型
1,Copy On Write表
COW表写数据,直接写入到basefile(*.parquet),而不写入到log文件。所以,COW表的文件片只包含basefile(一个parquet构成一个文件片)
对于Update:该文件ID的最新版本都将被重写一次,并对所有已经更改的记录使用新值
对于insert:记录首先打包到每个分区路径中的最小文件中,直到达到配置的最大大小
仅使用列式存储,例如parquet,仅更新版本号,通过写入过程中执行同步合并来重写文件。
2,Merge On Read表
MOR表写数据时,记录首先会被快速写进日志文件,稍后会使用时间轴上的压缩操作将其与基本文件合并。根据查询是读取日志中的合并快照流,还是变更流,还是仅读取未合并的基础文件,MOR支持多种查询类型。
基于列式存储(parquet)和行式存储(avro)结合的文件进行存储,更新记录到增量文件
基于列式存储(parquet)和行式存储(avro)结合的文件进行存储,更新记录到增量文件,压缩同步和异步生成新版本的文件。
压缩
压缩是一个instant操作,它将一组文件片作为输入,将每个文件切片中的所有日志文件与其basefile文件(parquet文件)合并,以生成新的压缩文件片,并写为时间轴上的一个commit。压缩操作仅适合用于读取合并MOR表类型。

清理
清理是一项基本的instant操作,其执行的目的是删除旧的文件片,并限制表占用的存储空间。清理会在每次写操作之后自动执行。

DFS访问优化
Hudi对表中存储的数据执行了几种密钥管理功能,它能管理在DFS上存储数据的文件大小,计数,以及回收存储空间等。

查询

  • 快照查询:查询操作将查询最新快照的表数据。如果是Merge On Read类型的表,它将动态合并最新文件版本的基本数据和增量数据用于查询。如果是Copy On Write类型的表,它直接查询parquet表,同时提供upsert/delete操作。
  • 增量查询,查询只能看到写入表的新数据
  • 优化读查询,查询将查看给定提交/压缩操作的最新快照

Hudi的特点
1,近实时摄取
将外部数据(例如事件日志,数据库,外部源)如何摄取到Hadoop Data Lake是一个众所周知的问题。在大多数Hadoop部署中,经常会以零碎的方式,使用多种摄取工具解决,这些数据对整个组织是最具有价值的。
对于RDBMS关系型的摄入,Hudi提供了更快的Upsert操作。例如,可以通过MySQL binLog的形式或者Sqoop导入到hdfs上对应的Hudi表中,这样操作比Sqoop批量合并job和复杂合并工作流更加快速高效。
对于NoSQL的数据库,这种可以存储十亿行的数据库,完全采用批量加载是不可行的,并且如果摄取数据要跟上通常较高的更新量,则需要更有效的方法。
即时对于像Kafka这样不可变数据库源,Hudi也会在HDFS上强制执行最小文件大小,从而通过整体解决Hadoop领域中小文件过多的问题,改善NameNode的运行状况。对于事件流尤为重要,因为事件流通常较大,并且如果管理不善,可能严重损害Hadoop集群。
在所有来源中,Hudi都增加了急需的功能,即通过提交概念将新数据原子推送给消费者,避免摄入数据失败。
2,近实时分析
Hadoop上交互式的SQL解决方案有Presto和Spark SQL。将数据的更新事件缩短至几分钟,Hudi可以提供多种高效的替代方案,并可以对存储在DFS中的多个大小表进行实时分析。
3,增量处理管道
4,DFS的数据分散
Hudi可以像Kafka一样,用于数据分散,将每个管道的数据输出到Hudi表中,然后将其递增尾部以获取新数据并写入到服务存储中。

HUDI增删改查部分
将集群配置文件复制到,项目resource源码包下,使得本地环境可以访问hadoop集群
如:core-site.xml,hdfs-site.xml,hive-site.xml,yarn-site.xml

Hudi写入数据到HDFS

  // 不带分区写入
  def insert(): Unit = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val insertData = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
    insertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // table name 设置
      .option(HoodieWriteConfig.TABLE_NAME, "test")
      .mode(SaveMode.Overwrite)
      // 写入路径设置
      .save("/tmp/hudi")
  }
    //带分区写入
  def insertWithPartition(): Unit = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder().appName("hudi.insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val frame: DataFrame = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
      .withColumn("uuid", col("uid"))
      .withColumn("hudipartition", concat_ws("/", col("uid"), col("province")))
    frame.write.format("org.apache.hudi")
      //设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      //设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      //设置表名
      .option("hoodie.table.name", "testTable")
      //设置分区
      .option("hoodie.datasource.write.partition.field", "hudipartition")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudiOne")
  }

查询Hudi

  //查询hudi
  def query() = {
    val basepath = "/tmp/hudi2"
    val spark = SparkSession.builder().appName("query insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]").getOrCreate()
    val tripsSnapshotDF = spark
      .read.format("org.apache.hudi")
      .load(basepath + "/*/*")
    tripsSnapshotDF.show()
  }
    //增量查询
  def incrementalQuery() = {
    val beginTime = 20201212130000l
    val spark = SparkSession.builder().appName("query insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]").getOrCreate()
    val frame: DataFrame = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      //设置开始查询的时间戳 不需要设置结束时间戳
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .load("/tmp/hudiOne")
    frame.show()
    println(frame.count())
  }

修改Hudi上的数据

 //修改Hdfs上的Hudi数据,根据uid生成一份最新的修改数据
  def updateData() = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder().appName("hudi.insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val frame: DataFrame = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
      .withColumn("uuid", col("uid"))
      .withColumn("hudipartition", concat_ws("/", col("uid"), col("province")))
    frame.write.format("org.apache.hudi")
      //设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      //设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      //设置表名
      .option("hoodie.table.name", "testTable")
      //设置分区
      .option("hoodie.datasource.write.partition.field", "hudipartition")
      .mode(SaveMode.Append)
      .save("/tmp/hudiOne")
  }

Hudi集成hive
将编译好的hudi-adoop-mr包复制到hive lib下

  def hiveSync() = {
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val upsertData = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
    upsertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      // 分区列设置
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "age")
      // 设置要同步的hive库名
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "one")
      // 设置要同步的hive表名
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
      // 设置数据集注册并同步到hive
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      // 设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      // 设置要同步的分区列名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "age") //hive 表同步的分区列
      // 设置jdbc 连接同步
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://192.168.10.111:10000")
      // hudi表名称设置
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
      // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save("/tmp/hudi");
  }

标签:OPT,文件,Hudi,option,val,数据库,----,spark
来源: https://blog.csdn.net/qq_43057549/article/details/112060541

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有