ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

实时数据湖-Merge On Read

2021-01-08 10:34:10  阅读:987  来源: 互联网

标签:Hudi record Read 实时 next Merge base key arrayWritable


Hudi

按照我的理解,我们一般所说的 MOR 与 Hudi 中的 MOR 不同,我们强调的是 query,而 Hudi 中指的是 table type。Hudi 中真正对应的我们的是视图(query type) 中的近实时视图(Snapshot Queries):

在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件(例如parquet)和增量文件(例如avro)来提供近实时数据集(几分钟的延迟)。

当前支持 Hudi 近实时视图的查询引擎如下图
在这里插入图片描述

Hudi 实时视图相关逻辑

实现类:HoodieRealtimeRecordReader -> RealtimeCompactedRecordReader

我们可以从 testReaderWithNestedAndComplexSchema() 这个测试方法中看到完整的 Snapshot Queries 的过程:

  1. 构建 HoodieRealtimeFileSplit,其中包含了 baseFilePath 、logFilePath、
    hadoop-conf
  2. 构建 HoodieRealtimeRecordReader,参数是上一步初始化好的 split,初始化过程中 scan() 方法会将
    split 中的 logFile 读取到 Map 中,key 为 _hoodie_record_key
  3. 在 HoodieRealtimeRecordReader.RealtimeCompactedRecordReader 的 next
    方法中会对 base 和 delta 进行 merge, 根据 _hoodie_record_key 使用 delta 数据替换
    base 中的数据

以下是核心方法 RealtimeCompactedRecordReader.next

// next 方法本身遍历的是 parquet 文件,即每条 base 数据读出来后,都去 deltaRecordMap 中 contains 下,这里的 HOODIE_RECORD_KEY 就发挥重要作用了,看看有没有对应的 delta 数据
  @Override
  public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
    // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
    // with a new block of values
    // 此处的 parquetReader,为之前构造 HoodieRealtimeRecordReader 中传入的 ParquetRecordReader
    boolean result = this.parquetReader.next(aVoid, arrayWritable);
    if (!result) {
      // if the result is false, then there are no more records
      return false;
    }
    if (!deltaRecordMap.isEmpty()) {
      //  Right now, we assume all records in log, have a matching base record. (which
      // would be true until we have a way to index logs too)
      // return from delta records map if we have some match.
      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
      if (deltaRecordMap.containsKey(key)) {
        // (NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
        // deltaRecord may not be a full record and needs values of columns from the parquet
        Option<GenericRecord> rec;
        if (usesCustomPayload) {
          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
        } else {
          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
        }
        if (!rec.isPresent()) {
          // If the record is not present, this is a delete record using an empty payload so skip this base record
          // and move to the next record
          return next(aVoid, arrayWritable);
        }
        GenericRecord recordToReturn = rec.get();
        if (usesCustomPayload) {
          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
          // the writerSchema with only the projection fields
          recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
        }
        // we assume, a later safe record in the log, is newer than what we have in the map &
        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
        // schema, we use writerSchema to create the arrayWritable from the latest generic record
        // 这里的 aWritable 我个人理解为 avroWritable
        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
        // log 中的 delta 数据
        Writable[] replaceValue = aWritable.get();
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
        }
        // parquet 中的 base 数据
        Writable[] originalValue = arrayWritable.get();
        try {
          // Sometime originalValue.length > replaceValue.length.
          // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
          // 复制并且覆盖 base 的 parquet 数据
          System.arraycopy(replaceValue, 0, originalValue, 0,
              Math.min(originalValue.length, replaceValue.length));
          arrayWritable.set(originalValue);
        } catch (RuntimeException re) {
          LOG.error("Got exception when doing array copy", re);
          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
          throw new RuntimeException(errMsg, re);
        }
      }
    }
    return true;
  }

查询引擎重点介绍 Presto

Presto 集成 Hudi 的逻辑可以用一句话来概括:

使用 HoodieParquetRealtimeInputFormat 中的 HoodieRealtimeRecordReader 读取
HoodieRealtimeFileSplit

引出两点:

  1. 加载HoodieParquetRealtimeInputFormat。 因为Presto使用其原生的ParquetPageSource而不是InputFormat的记录读取器,Presto将只显示基本Parquet文件,而不显示来自Hudi日志文件的实时更新,后者是avro数据(本质上与普通的读优化Hudi查询相同)。所以需要在 Presto 中使用 Hudi 已经实现好的HoodieParquetRealtimeInputFormat,当前是使用注解方式实现的,即在 HoodieParquetRealtimeInputFormat 类上添加注解@UseRecordReaderFromInputFormat
  2. 重建 Hudi FileSplit。从 HiveSplit 的额外元数据重新创建 Hudi 切片,实现类
    HudiRealtimeSplitConverter

下面两张图片分别是 presto 和 hudi 构造 HoodieRealtimeRecordReader 的方法,可以看出是基本相同的

在这里插入图片描述
在这里插入图片描述
下图是发挥通知引擎作用的 Hudi 枚举
在这里插入图片描述
综上,Hudi 对于 MOR表 所支持的视图-近实时查询做了基础的工作,以注解方式对外提供 HoodieParquetRealtimeInputFormat 供外部的查询引擎进行集成,Presto 也正是通过了 @UseFileSplitsFromInputFormat 注解来加载 Hudi 的 RecordReader,通过修改了 HiveUtil 相关的少部分逻辑做到了支持 Hudi MOR 表的近实时查询。

Iceberg

经过测试,iceberg 写入的时候是没有所谓的 log(avro) 或者 名为 delta 的增量文件,Iceberg 对于每批数据都是直接写到 parquet 文件中去的,故现状是没有所谓的 MOR 需实现。若考虑后期 Iceberg 支持 Row-Level 的 Upsert 和 Delete ,那么可能就有实现 MOR 的必要,这取决于Iceberg 的 Row-Level Delete的 实现方式,目前社区这一工作尚未完成。
Iceberg Row-level Delete

Row-level update和delete通常有Copy-on-Write和Merge-on-Read两种方案。其中Copy-on-Write把生成新数据文件的压力集中于写入的时候,适合对读有较高要求的场景;而Merge-on-Read把合并最终结果的压力放在读取的时候,适合于快速写入的场景。
我们在内部已经实现了基于Copy-on-Write的方式。同时也将Iceberg作为Spark 3.0的V2 Data Source和multi-catalog,和Spark进行了集成,用户可以方便的通过Spark SQL进行update、delete和merge into等DML操作,以及建表删表等DDL操作。

标签:Hudi,record,Read,实时,next,Merge,base,key,arrayWritable
来源: https://blog.csdn.net/wuleidaren/article/details/112345927

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有