HUDI preCombinedField Summary (Part 2): Source Code Analysis

2022-02-22 16:05:37

Tags: HUDI preCombinedField orderingVal return record source-code key new public


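Preface

In the previous post, "HUDI preCombinedField Summary", I already summarized preCombinedField once. My understanding of the source code was not deep enough at the time, so that analysis was incomplete. Now that I understand the source better, this post supplements that summary.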

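Comparison with the stored value

The previous summary concluded:

DF: the write always overwrites and updates the record, whether or not the new record's ts is greater than the stored record's ts.
SQL: the record is updated only when the new ts is greater than or equal to the stored ts; if it is smaller, the record is not updated.

Here is the reason. In Spark SQL, PAYLOAD_CLASS_NAME defaults to ExpressionPayload, and ExpressionPayload extends DefaultHoodieRecordPayload: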

class ExpressionPayload(record: GenericRecord,
                        orderingVal: Comparable[_])
  extends DefaultHoodieRecordPayload(record, orderingVal) {

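needUpdatingPersistedRecord in DefaultHoodieRecordPayload implements the comparison with the stored value; its implementation is analyzed below.

For Spark DF in Hudi 0.9.0, PAYLOAD_CLASS_NAME defaults to OverwriteWithLatestAvroPayload. It is the parent class of DefaultHoodieRecordPayload and does not compare against the stored value.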
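
The difference between the two defaults can be sketched without any Hudi dependency. The class below is only an illustration: Row, overwriteWithLatest and orderingAware are hypothetical names, and the two methods merely imitate the merge behaviour described above rather than reproduce the Hudi classes.

```java
/** Simplified stand-ins for the two merge behaviours; these are not the Hudi classes. */
final class MergeSemanticsSketch {

  /** A row reduced to its ordering value (ts) and one data column. */
  static final class Row {
    final long ts;
    final String data;

    Row(long ts, String data) {
      this.ts = ts;
      this.data = data;
    }
  }

  /** OverwriteWithLatestAvroPayload-like: the stored row is never consulted. */
  static Row overwriteWithLatest(Row stored, Row incoming) {
    return incoming;
  }

  /** DefaultHoodieRecordPayload-like: keep the stored row only if its ts is strictly larger. */
  static Row orderingAware(Row stored, Row incoming) {
    return stored.ts > incoming.ts ? stored : incoming;
  }

  public static void main(String[] args) {
    Row stored = new Row(10L, "stored");
    Row late = new Row(5L, "late arrival");
    System.out.println(overwriteWithLatest(stored, late).data); // prints "late arrival"
    System.out.println(orderingAware(stored, late).data);       // prints "stored"
  }
}
```

A late-arriving row with a smaller ts replaces the stored row under the first rule and is dropped under the second, which matches the DF and SQL behaviour summarized earlier.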

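Implementation of the stored-value comparison

A brief look at the source code. First, the configuration key for the field used in the comparison is: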

 HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.ordering.field"

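Its default value is ts. So the ordering field and preCombineField are not the same setting, but because they share the same default and both are implemented in the payload class, they feel like the same thing, which is why they are summarized together here.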

HoodieMergeHandle

When an upsert merges records into an existing (small) file, Hudi goes through the write method of HoodieMergeHandle:

/**
   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
   */
  public void write(GenericRecord oldRecord) {
    // Key of the stored (old) record
    String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
    boolean copyOldRecord = true;
    if (keyToNewRecords.containsKey(key)) { // the incoming records contain this key, so run the merge logic
      // If we have duplicate records that we are updating, then the hoodie record will be deflated after
      // writing the first record. So make a copy of the record to be merged
      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
      try {
        // Calls combineAndGetUpdateValue on the configured PAYLOAD_CLASS
        Option<IndexedRecord> combinedAvroRecord =
            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
              useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
                config.getPayloadConfig().getProps());

        if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
          // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
          copyOldRecord = true;
        } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
          /*
           * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
           * write the the combined new
           * value
           *
           * We no longer need to copy the old record over.
           */
          copyOldRecord = false;
        }
        writtenRecordKeys.add(key);
      } catch (Exception e) {
        throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
            + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
      }
    }

    if (copyOldRecord) {
      // this should work as it is, since this is an existing record
      try {
        fileWriter.writeAvro(key, oldRecord);
      } catch (IOException | RuntimeException e) {
        String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
                key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
        LOG.debug("Old record is " + oldRecord);
        throw new HoodieUpsertException(errMsg, e);
      }
      recordsWritten++;
    }
  }

The combineAndGetUpdateValue method

Here is combineAndGetUpdateValue in DefaultHoodieRecordPayload:

 
  /**
   * currentValue is the current value, i.e. the record already in storage. Call site in HoodieMergeHandle:
   * Option<IndexedRecord> combinedAvroRecord =
   *             hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
   *               useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
   *                 config.getPayloadConfig().getProps());
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
    // recordBytes holds the serialized bytes of the incoming record
    if (recordBytes.length == 0) {
      return Option.empty();
    }
    // Deserialize recordBytes into an Avro GenericRecord
    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

    // Null check is needed here to support schema evolution. The record in storage may be from old schema where
    // the new ordering column might not be present and hence returns null.
    // If the stored record does not need to be updated, return the stored record
    if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
      return Option.of(currentValue);
    }

    /*
     * We reached a point where the value is disk is older than the incoming record.
     */
    eventTime = updateEventTime(incomingRecord, properties);

    /*
     * Now check if the incoming record is a delete record.
     */
    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
  }
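
The control flow of this method can be condensed into a small stand-alone sketch. Everything in it is an assumption made for illustration: Row is a hypothetical stand-in for the Avro record, a null incoming row models an empty payload (recordBytes.length == 0), and the deleted flag models _hoodie_is_deleted.

```java
import java.util.Optional;

/** Illustrative model of the three possible outcomes; not the Hudi implementation. */
final class CombineFlowSketch {

  static final class Row {
    final long ts;
    final boolean deleted;
    final String data;

    Row(long ts, boolean deleted, String data) {
      this.ts = ts;
      this.deleted = deleted;
      this.data = data;
    }
  }

  static Optional<Row> combineAndGetUpdateValue(Row current, Row incoming) {
    if (incoming == null) {
      // Empty payload: nothing to write.
      return Optional.<Row>empty();
    }
    if (current.ts > incoming.ts) {
      // The stored row is newer, so it is kept.
      return Optional.of(current);
    }
    // The incoming row wins; a delete marker yields an empty result.
    return incoming.deleted ? Optional.<Row>empty() : Optional.of(incoming);
  }

  public static void main(String[] args) {
    Row current = new Row(10L, false, "stored");
    System.out.println(combineAndGetUpdateValue(current, new Row(5L, false, "late")).get().data);  // prints "stored"
    System.out.println(combineAndGetUpdateValue(current, new Row(12L, false, "new")).get().data);  // prints "new"
    System.out.println(combineAndGetUpdateValue(current, new Row(12L, true, "gone")).isPresent()); // prints "false"
  }
}
```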

recordBytes is assigned in the parent class BaseAvroPayload. When writing data, we first build a GenericRecord record, pass it to the payload as a constructor argument, then build a List<HoodieRecord> and call HoodieJavaWriteClient.upsert(List<HoodieRecord> records, String instantTime):

  public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
    this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
    this.orderingVal = orderingVal;
    if (orderingVal == null) {
      throw new HoodieException("Ordering value is null for record: " + record);
    }
  }

needUpdatingPersistedRecord

The comparison with the stored value happens here:

  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
                                                IndexedRecord incomingRecord, Properties properties) {
    /*
     * Combining strategy here returns currentValue on disk if incoming record is older.
     * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
     * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
     * in disk is returned (to be rewritten with new commit time).
     *
     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
     * and need to be dealt with separately.
     */
    // ts value of the stored record
    Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
        properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
    // ts value of the incoming record
    Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
        properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), false);
    // Return true (overwrite the stored record) if the stored value is null or less than or equal to the incoming value; otherwise do not update
    return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
  }
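
The edge cases of this comparison (a missing stored value and equal values) are easy to check in isolation. The sketch below copies only the final return expression into a hypothetical helper class; it assumes both ordering values are of the same Comparable type.

```java
/** Isolates the return expression of needUpdatingPersistedRecord for experimentation. */
final class OrderingCheckSketch {

  @SuppressWarnings({"unchecked", "rawtypes"})
  static boolean needUpdatingPersistedRecord(Comparable persistedOrderingVal, Comparable incomingOrderingVal) {
    return persistedOrderingVal == null || persistedOrderingVal.compareTo(incomingOrderingVal) <= 0;
  }

  public static void main(String[] args) {
    System.out.println(needUpdatingPersistedRecord(null, 5L)); // prints "true": no stored ordering value
    System.out.println(needUpdatingPersistedRecord(4L, 5L));   // prints "true": incoming is newer
    System.out.println(needUpdatingPersistedRecord(5L, 5L));   // prints "true": a tie still updates
    System.out.println(needUpdatingPersistedRecord(6L, 5L));   // prints "false": stored is newer
  }
}
```

The tie case is what makes the SQL rule "greater than or equal to" rather than "greater than".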

Default value of PAYLOAD_ORDERING_FIELD_PROP_KEY

In HoodieMergeHandle above, the properties argument is config.getPayloadConfig().getProps().
getPayloadConfig returns a HoodiePayloadConfig, and HoodiePayloadConfig defines ts as the default value of PAYLOAD_ORDERING_FIELD_PROP_KEY:

  public HoodiePayloadConfig getPayloadConfig() {
    return hoodiePayloadConfig;
  }

 public class HoodiePayloadConfig extends HoodieConfig {

  public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
      .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
      .defaultValue("ts")
      .withDocumentation("Table column/field name to order records that have the same key, before "
          + "merging and writing to storage."); 

Pre-combine implementation

First, pre-combining is implemented by the method OverwriteWithLatestAvroPayload.preCombine:

public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
    implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

  public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
    if (oldValue.recordBytes.length == 0) {
      // use natural order for delete record
      return this;
    }
    // If the old value's orderingVal is greater than this orderingVal, return the old value; otherwise return this, i.e. the record with the larger ordering value
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
      // pick the payload with greatest ordering value
      return oldValue;
    } else {
      return this;
    }
  }

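So both Spark SQL and Spark DF implement pre-combining by default: ExpressionPayload extends DefaultHoodieRecordPayload, which in turn extends OverwriteWithLatestAvroPayload, so any of these three payloads can pre-combine. What matters is how the payload is constructed.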
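
A minimal sketch of the preCombine rule shown above, including what happens when the one-argument constructor fixes the ordering value to 0. Payload here is a hypothetical stand-in that stores a string instead of Avro bytes and uses a long ordering value for simplicity.

```java
/** Hypothetical miniature of the preCombine rule; not the Hudi class. */
final class PreCombineSketch {

  static final class Payload {
    final String data;
    final long orderingVal;

    Payload(String data, long orderingVal) {
      this.data = data;
      this.orderingVal = orderingVal;
    }

    /** Mirrors the one-argument constructor: every payload gets ordering value 0. */
    Payload(String data) {
      this(data, 0L);
    }

    /** Returns the payload with the greater ordering value; a tie keeps this. */
    Payload preCombine(Payload oldValue) {
      return oldValue.orderingVal > orderingVal ? oldValue : this;
    }
  }

  public static void main(String[] args) {
    Payload older = new Payload("ts=1", 1L);
    Payload newer = new Payload("ts=2", 2L);
    System.out.println(older.preCombine(newer).data); // prints "ts=2"
    System.out.println(newer.preCombine(older).data); // prints "ts=2"

    Payload a = new Payload("a");
    Payload b = new Payload("b");
    System.out.println(a.preCombine(b).data); // prints "a": with equal ordering values the caller always wins
    System.out.println(b.preCombine(a).data); // prints "b"
  }
}
```

With real ordering values the result does not depend on which side calls preCombine; with the constant 0 it depends only on call order, so no meaningful pre-combine takes place.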

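Constructing the payload

From the code above, OverwriteWithLatestAvroPayload has two constructors, taking one and two arguments. The one-argument constructor cannot pre-combine meaningfully, because preCombine compares orderingVal and that constructor fixes it to 0. So the two-argument constructor must be used, where orderingVal is the value of the preCombineField and record is one row. Both Spark SQL and Spark DF eventually call HoodieSparkSqlWriter.write, and the payload is constructed inside that write method: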

// Convert to RDD[HoodieRecord]
// First convert the df to RDD[GenericRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
  org.apache.hudi.common.util.Option.of(schema))
// Decide whether pre-combining is needed
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
  operation.equals(WriteOperationType.UPSERT) ||
  parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
    HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
  val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
  val hoodieRecord = if (shouldCombine) { // pre-combining is needed
    // Read the PRECOMBINE_FIELD value from the record; if the value is missing an exception is thrown, because the pre-combine field must not be null
    val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
      .asInstanceOf[Comparable[_]]
    // Then construct the payload for PAYLOAD_CLASS_NAME via reflection
    DataSourceUtils.createHoodieRecord(processedRecord,
      orderingVal, keyGenerator.getKey(gr),
      hoodieConfig.getString(PAYLOAD_CLASS_NAME))
  } else {
    // Without pre-combining the payload is also built via reflection, but without the orderingVal argument
    DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
  }
  hoodieRecord
}).toJavaRDD()

As the comments in the source above show, when pre-combining is needed the PRECOMBINE_FIELD value orderingVal is first read from the record and then the payload is constructed, i.e.

new OverwriteWithLatestAvroPayload(record, orderingVal)

The payload is now constructed. So where does the pre-combine actually happen?

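Calling preCombine

Take upsert on a COW table as the example, i.e. HoodieJavaCopyOnWriteTable.upsert (the final deduplicateRecords shown below is the Spark implementation, SparkWriteHelper):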

// HoodieJavaCopyOnWriteTable
  @Override
  public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                       String instantTime,
                                                       List<HoodieRecord<T>> records) {
    return new JavaUpsertCommitActionExecutor<>(context, config,
        this, instantTime, records).execute();
  }

// JavaUpsertCommitActionExecutor
   @Override
  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
  }

// AbstractWriteHelper
public HoodieWriteMetadata<O> write(String instantTime,
                                      I inputRecords,
                                      HoodieEngineContext context,
                                      HoodieTable<T, I, K, O> table,
                                      boolean shouldCombine,
                                      int shuffleParallelism,
                                      BaseCommitActionExecutor<T, I, K, O, R> executor,
                                      boolean performTagging) {
    try {
      // De-dupe/merge if needed
      I dedupedRecords =
          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

      Instant lookupBegin = Instant.now();
      I taggedRecords = dedupedRecords;
      if (performTagging) {
        // perform index loop up to get existing location of records
        taggedRecords = tag(dedupedRecords, context, table);
      }
      Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

      HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
      result.setIndexLookupDuration(indexLookupDuration);
      return result;
    } catch (Throwable e) {
      if (e instanceof HoodieUpsertException) {
        throw (HoodieUpsertException) e;
      }
      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
    }
  }

    public I combineOnCondition(
      boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
    return condition ? deduplicateRecords(records, table, parallelism) : records;
  }


  /**
   * Deduplicate Hoodie records, using the given deduplication function.
   *
   * @param records     hoodieRecords to deduplicate
   * @param parallelism parallelism or partitions to be used while reducing/deduplicating
   * @return Collection of HoodieRecord already be deduplicated
   */
  public I deduplicateRecords(
      I records, HoodieTable<T, I, K, O> table, int parallelism) {
    return deduplicateRecords(records, table.getIndex(), parallelism);
  }


// SparkWriteHelper  
  @Override
  public JavaRDD<HoodieRecord<T>> deduplicateRecords(
      JavaRDD<HoodieRecord<T>> records, HoodieIndex<T, ?, ?, ?> index, int parallelism) {
    boolean isIndexingGlobal = index.isGlobal();
    return records.mapToPair(record -> {
      HoodieKey hoodieKey = record.getKey();
      // If index used is global, then records are expected to differ in their partitionPath
      // Get the record's key
      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
      // Emit (key, record)
      return new Tuple2<>(key, record);
    }).reduceByKey((rec1, rec2) -> {
      @SuppressWarnings("unchecked")
      // For records with the same key, preCombine returns the one with the larger preCombineField value
      T reducedData = (T) rec2.getData().preCombine(rec1.getData());
      HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

      return new HoodieRecord<T>(reducedKey, reducedData);
    }, parallelism).map(Tuple2::_2);
  }     

This is how pre-combining is implemented.
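
The effect of the mapToPair/reduceByKey step can be imitated on a plain list. The sketch below is a single-threaded approximation under stated assumptions: Rec, preCombine and deduplicate are hypothetical names, the key is a plain string, and records are reduced in list order, whereas Spark gives no ordering guarantee for reduceByKey.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Single-threaded imitation of the deduplication step; not the Hudi code. */
final class DedupSketch {

  static final class Rec {
    final String key;
    final long orderingVal;
    final String data;

    Rec(String key, long orderingVal, String data) {
      this.key = key;
      this.orderingVal = orderingVal;
      this.data = data;
    }
  }

  /** Same rule as preCombine: the greater ordering value wins, a tie keeps self. */
  static Rec preCombine(Rec self, Rec other) {
    return other.orderingVal > self.orderingVal ? other : self;
  }

  /** Groups by key and reduces each group, like rec2.getData().preCombine(rec1.getData()). */
  static Map<String, Rec> deduplicate(List<Rec> batch) {
    Map<String, Rec> byKey = new LinkedHashMap<>();
    for (Rec rec : batch) {
      // Map.merge passes (existing value, new value) to the remapping function.
      byKey.merge(rec.key, rec, (rec1, rec2) -> preCombine(rec2, rec1));
    }
    return byKey;
  }

  public static void main(String[] args) {
    List<Rec> batch = Arrays.asList(
        new Rec("a", 1L, "a-v1"),
        new Rec("a", 3L, "a-v3"),
        new Rec("a", 2L, "a-v2"),
        new Rec("b", 1L, "b-v1"));
    Map<String, Rec> deduped = deduplicate(batch);
    System.out.println(deduped.get("a").data); // prints "a-v3"
    System.out.println(deduped.get("b").data); // prints "b-v1"
  }
}
```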

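Changing the ordering field

Finally, how the ordering field used for the stored-value comparison is changed. With Spark SQL and Spark DF there is no need to set it explicitly, because by default it changes together with preCombineField. Here is how the code keeps the two in sync.
Both SQL and DF eventually call HoodieSparkSqlWriter.write: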

// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
  null, path, tblName,
  mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
  .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
                                                       String tblName, Map<String, String> parameters) {
    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
  }

  public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
      String tblName, Map<String, String> parameters) {
    boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
    boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
        .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
    boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
    boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE().key()));
    // insert/bulk-insert combining to be true, if filtering for duplicates
    boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
        .withPath(basePath).withAutoCommit(false).combineInput(combineInserts, true);
    if (schemaStr != null) {
      builder = builder.withSchema(schemaStr);
    }

    return builder.forTable(tblName)
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
            .withInlineCompaction(inlineCompact).build())
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withInlineClustering(inlineClusteringEnabled)
            .withAsyncClustering(asyncClusteringEnabled).build())
         // The ordering field is set to the value of PRECOMBINE_FIELD here, so by default it changes together with PRECOMBINE_FIELD
        .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
            .build())
        // override above with Hoodie configs specified as options.
        .withProps(parameters).build();
  }  
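
The resolution order in createHoodieConfig (the default, then the PRECOMBINE_FIELD value, then the user-supplied parameters via withProps) can be modelled with plain java.util.Properties. This is only a sketch: resolve is a hypothetical helper, and the key string hoodie.datasource.write.precombine.field is what I believe DataSourceWriteOptions.PRECOMBINE_FIELD resolves to in this version, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Models the order in which the ordering field gets its value; not the Hudi builder. */
final class OrderingFieldConfigSketch {

  static final String ORDERING_KEY = "hoodie.payload.ordering.field";
  // Assumed key of DataSourceWriteOptions.PRECOMBINE_FIELD.
  static final String PRECOMBINE_KEY = "hoodie.datasource.write.precombine.field";

  static Properties resolve(Map<String, String> parameters) {
    Properties props = new Properties();
    // 1. Default defined in HoodiePayloadConfig.
    props.setProperty(ORDERING_KEY, "ts");
    // 2. withPayloadOrderingField(PRECOMBINE_FIELD value).
    String preCombineField = parameters.get(PRECOMBINE_KEY);
    if (preCombineField != null) {
      props.setProperty(ORDERING_KEY, preCombineField);
    }
    // 3. withProps(parameters): explicit options override everything above.
    props.putAll(parameters);
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> parameters = new HashMap<>();
    parameters.put(PRECOMBINE_KEY, "update_time");
    System.out.println(resolve(parameters).getProperty(ORDERING_KEY)); // prints "update_time"

    parameters.put(ORDERING_KEY, "event_ts");
    System.out.println(resolve(parameters).getProperty(ORDERING_KEY)); // prints "event_ts"
  }
}
```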

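If you really want to change the default, i.e. use a field different from PRECOMBINE_FIELD, set the ordering field explicitly (ts below stands for whichever field you want). Because withProps(parameters) is applied last in createHoodieConfig, an explicitly supplied value overrides the one derived from PRECOMBINE_FIELD.

In SQL: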

set hoodie.payload.ordering.field=ts;

DF:

.option("hoodie.payload.ordering.field", "ts")
or
.option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts")

Source: https://blog.csdn.net/dkl12/article/details/123070348
