ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021SC@SDUSC Hbase(十四)项目代码分析-HFile写入Cell

2021-12-25 13:02:01  阅读:171  来源: 互联网

标签:cell Writer 2021SC writer Cell IOException SDUSC HFile


2021SC@SDUSC

一、前言

        本文我们来简单介绍下HFile写入Cell的主体流程

二、浅析

        HFile文件Cell写入的发起位置,一个就是Memstore flush时,StoreFlusher的preformFlush()方法:

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * 
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param smallestReadPoint Smallest read point used for the flush.
   */
  protected void performFlush(InternalScanner scanner,
      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
    int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (!kvs.isEmpty()) {
        for (Cell c : kvs) {
          // If we know that this KV is going to be included always, then let us
          // set its memstoreTS to 0. This will help us save space when writing to
          // disk.

          sink.append(c);
        }
        kvs.clear();
      }
    } while (hasMore);
  }

        它会循环地将Cell写入Compactor.CellSink类型的sink。在performFlush()的调用者DefaultStoreFlusher的flushSnapshot()中,首先会调用HStore的createWriterInTmp()生成一个StoreFile.Writer的实例writer,然后将这个writer作为参数sink传入performFlush():

        // Write the map out to the disk
        writer = store.createWriterInTmp(
            cellsCount, store.getFamily().getCompression(), false, true, true);
        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }

那么StoreFile.Writer类型的writer是如何生成的呢?进入到HStore的createWriterInTmp()方法中:


    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())
            .withFilePath(fs.createTempName())
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .build();

在StoreFile.WriterBuilder的build()方法最后,会new一个StoreFile.Writer实例,在其构造方法中,会生成一个HFile.Writer实例writer:

      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();

而这个HFile.Writer实例writer则是通过HFile中WriterFactory获取的:

  /**
   * Returns the factory to be used to create {@link HFile} writers
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }

HBase中包含两种类型的WriterFactoryV2:HFileWriterV2.WriterFactoryV2和HFileWriterV3 .WriterFactoryV3,我们看一下HFileWriterV2.WriterFactoryV2,它create的HFile.Writer实例其实为HFileWriterV2:

    @Override
    public Writer createWriter(FileSystem fs, Path path, 
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, 
          comparator, context);
      }
    }

所以最开始的sink调用append循环写入Cell,实际上最终调用的是HFileWriterV2的append()方法:

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

这个writer的实例化就是我们上面讲到的HFileWriterV2的实例化。接下来,我们重点地看下HFileWriterV2的append()方法:

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    
	byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }
    if (!fsBlockWriter.isWriting()) {
      newBlock();
    }
    fsBlockWriter.write(cell);
    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }
 
    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
    // indefinetly?
    lastCell = cell;
    
    entryCount++;
    
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }

逻辑大体如下:

  1. 从Cell中获取value、voffset和vlength
  2. 调用checkKey()方法,检测给定的Cell,确保key有序,dupKey为true说明key未更改,反之则是已更改
  3. 调用checkValue(),检测value,确保其不为空
  4. 换key时才检测blokc边界,调用checkBlockBoundary()方法
  5. 若需申请新的数据块,调用newBlock()方法申请,判断依据是fsBlockWriter的isWriting()返回为false
  6. 调用fsBlockWriter的write()方法写入Cell
  7. 累加key长度totalKeyLength和Value长度totalValueLength
  8. 如果需要的话,标记该数据块中写入的第一个key
  9. 标记上次写入Cell
  10. entryCount+1
  11. 更新maxMemstoreTS

检测block边界的checkBlockBoundary()方法代码如下:

  /**
   * At a block boundary, write all the inline blocks and opens new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
	if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;
    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

它会判断fsBlockWriter已写入大小,如果fsBlockWriter已写入大小小于hFileContext中定义的块大小,则直接返回,否则说明block已达到或超过阈值,需进行以下操作:

  1. 调用finishBlock()方法,结束上一个block
  2. 调用writeInlineBlocks()方法,写入InlineBlocks
  3. 调用newBlock()方法,申请新块

以上,如有错误,欢迎指正。

标签:cell,Writer,2021SC,writer,Cell,IOException,SDUSC,HFile
来源: https://blog.csdn.net/m0_53145078/article/details/122136929

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有