How Data Is Read from HBase into MapReduce

2021-04-12 09:04:49  Source: Internet

Tags: now, read, LOG, value, trr, MR, HBase, null, lastSuccessfulRow


TableMapReduceUtil.initTableMapperJob sets up the input side of the job. One of the arguments it receives is the input format class, TableInputFormat.class, which it then sets on the job:

job.setInputFormatClass(inputFormatClass);

TableInputFormatBase, the parent class of TableInputFormat, creates a TableRecordReader:

if (trr == null) {
  trr = new TableRecordReader();
}
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setHTable(table);
return trr;
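
The narrowing of the scan to one split can be modelled without HBase. The following is a simplified, JDK-only sketch, not HBase code: a TreeMap stands in for the table, and the names SplitScanSketch, Split and readSplit are hypothetical, chosen only for illustration. It assumes the usual HBase convention that the start row is inclusive, the end row is exclusive, and an empty end row means "scan to the end of the table".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model, not HBase code: a sorted map stands in for the table.
class SplitScanSketch {

  // Hypothetical stand-in for TableSplit. The start row is inclusive and the
  // end row is exclusive; an empty end row means "up to the end of the table".
  static final class Split {
    final String startRow;
    final String endRow;

    Split(String startRow, String endRow) {
      this.startRow = startRow;
      this.endRow = endRow;
    }
  }

  // Mirrors the idea of narrowing the job-wide scan to a single split:
  // only row keys inside [startRow, endRow) are returned, in sorted order.
  static List<String> readSplit(NavigableMap<String, String> table, Split split) {
    NavigableMap<String, String> range = split.endRow.isEmpty()
        ? table.tailMap(split.startRow, true)
        : table.subMap(split.startRow, true, split.endRow, false);
    return new ArrayList<>(range.keySet());
  }

  public static void main(String[] args) {
    NavigableMap<String, String> table = new TreeMap<>();
    for (String row : Arrays.asList("a", "b", "c", "d")) {
      table.put(row, "value-" + row);
    }
    // Two splits that meet at "c": every row is read by exactly one of them.
    System.out.println(readSplit(table, new Split("", "c")));  // [a, b]
    System.out.println(readSplit(table, new Split("c", "")));  // [c, d]
  }
}
```

Because each split gets its own copy of the scan with its own bounds, the record readers of different map tasks do not overlap.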

So the record reader in use is TableRecordReader. It keeps reading forward, row by row:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return this.recordReaderImpl.nextKeyValue();
}

Looking at the implementation class (TableRecordReaderImpl) shows that the key-value pair is the row key and a Result object:

public boolean nextKeyValue() throws IOException, InterruptedException {
  if (key == null) key = new ImmutableBytesWritable();
  // this shows that the value is a Result object
  if (value == null) value = new Result();
  try {
    try {
      // read the next row
      value = this.scanner.next();
      if (logScannerActivity) {
        rowcount ++;
        if (rowcount >= logPerRowCount) {
          long now = System.currentTimeMillis();
          LOG.info("Mapper took " + (now-timestamp)
            + "ms to process " + rowcount + " rows");
          timestamp = now;
          rowcount = 0;
        }
      }
    } catch (IOException e) {
      // do not retry if the exception tells us not to do so
      if (e instanceof DoNotRetryIOException) {
        throw e;
      }
      // try to handle all other IOExceptions by restarting
      // the scanner, if the second call fails, it will be rethrown
      LOG.info("recovered from " + StringUtils.stringifyException(e));
      if (lastSuccessfulRow == null) {
        LOG.warn("We are restarting the first next() invocation," +
            " if your mapper has restarted a few other times like this" +
            " then you should consider killing this job and investigate" +
            " why it's taking so long.");
      }
      if (lastSuccessfulRow == null) {
        restart(scan.getStartRow());
      } else {
        restart(lastSuccessfulRow);
        scanner.next();    // skip presumed already mapped row
      }
      value = scanner.next();
      numRestarts++;
    }
    if (value != null && value.size() > 0) {
      // getRow() returns the row key
      key.set(value.getRow());
      lastSuccessfulRow = key.get();
      return true;
    }

    updateCounters();
    return false;
  } catch (IOException ioe) {
    if (logScannerActivity) {
      long now = System.currentTimeMillis();
      LOG.info("Mapper took " + (now-timestamp)
        + "ms to process " + rowcount + " rows");
      LOG.info(ioe);
      String lastRow = lastSuccessfulRow == null ?
        "null" : Bytes.toStringBinary(lastSuccessfulRow);
      LOG.info("lastSuccessfulRow=" + lastRow);
    }
    throw ioe;
  }
}

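nextKeyValue() is called repeatedly to advance through the rows, and the value is a Result object. Each advance calls this.scanner.next(). The key is set to the row key returned by value.getRow(). In other words, the data reaches the mapper with the row key as the key (K) and the Result object as the value (V).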
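
The restart branch in nextKeyValue() is easier to follow in isolation. The following is a simplified, JDK-only model of that logic, not HBase code: RestartingReaderSketch, FlakyScanner and readAll are hypothetical names, row keys are plain strings, and the scanner is rigged to fail exactly once. It illustrates why the reader reopens the scan at lastSuccessfulRow and then discards one row: that row was already handed to the mapper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Simplified model of the restart logic, not HBase code.
class RestartingReaderSketch {

  // Hypothetical scanner over sorted row keys. The countdown is shared across
  // scanner instances so that a restart does not re-arm the simulated failure.
  static final class FlakyScanner {
    private final Iterator<String> rows;
    private final int[] callsBeforeFailure;

    FlakyScanner(NavigableSet<String> table, String startRow, int[] callsBeforeFailure) {
      this.rows = table.tailSet(startRow, true).iterator();
      this.callsBeforeFailure = callsBeforeFailure;
    }

    // Returns the next row key, or null when the scan is exhausted.
    String next() throws IOException {
      if (callsBeforeFailure[0]-- == 0) {
        throw new IOException("simulated scanner failure");
      }
      return rows.hasNext() ? rows.next() : null;
    }
  }

  // Reads every row, surviving one failure that occurs after
  // callsBeforeFailure successful next() calls.
  static List<String> readAll(NavigableSet<String> table, int callsBeforeFailure) {
    int[] countdown = {callsBeforeFailure};
    List<String> mapped = new ArrayList<>();
    String lastSuccessfulRow = null;
    try {
      FlakyScanner scanner = new FlakyScanner(table, "", countdown);
      while (true) {
        String value;
        try {
          value = scanner.next();
        } catch (IOException e) {
          if (lastSuccessfulRow == null) {
            // Nothing was mapped yet: start again from the beginning.
            scanner = new FlakyScanner(table, "", countdown);
          } else {
            // Reopen at the last mapped row, then skip it to avoid a duplicate.
            scanner = new FlakyScanner(table, lastSuccessfulRow, countdown);
            scanner.next();
          }
          value = scanner.next(); // a second failure here is not retried
        }
        if (value == null) {
          return mapped;
        }
        lastSuccessfulRow = value;
        mapped.add(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    NavigableSet<String> table = new TreeSet<>(Arrays.asList("a", "b", "c", "d"));
    // The third next() call fails; the output still lists each row once.
    System.out.println(readAll(table, 2)); // [a, b, c, d]
  }
}
```

As in the original method, only the first failure is absorbed. If the call right after the restart also fails, the exception propagates to the caller.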

With that in place, an operation like the following inside the Mapper retrieves the contents of a cell:

Cell cell = value.getColumnLatestCell("cf".getBytes(), "line".getBytes());
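
Result.getColumnLatestCell returns the cell with the most recent timestamp for the given column, or null when the row has no such column, so the mapper should check for null before using the cell. The following is a simplified, JDK-only model of that lookup, not HBase code: LatestCellSketch and latestValue are hypothetical names, and a row is represented as a map from "family:qualifier" to its timestamped versions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model, not HBase code: one row as column -> (timestamp -> value).
class LatestCellSketch {

  // Returns the value with the highest timestamp for family:qualifier,
  // or null when the row holds no such column.
  static String latestValue(Map<String, NavigableMap<Long, String>> row,
                            String family, String qualifier) {
    NavigableMap<Long, String> versions = row.get(family + ":" + qualifier);
    if (versions == null || versions.isEmpty()) {
      return null;
    }
    return versions.lastEntry().getValue();
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> versions = new TreeMap<>();
    versions.put(1L, "old text");
    versions.put(2L, "new text");
    Map<String, NavigableMap<Long, String>> row = new HashMap<>();
    row.put("cf:line", versions);

    System.out.println(latestValue(row, "cf", "line"));    // new text
    System.out.println(latestValue(row, "cf", "missing")); // null
  }
}
```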

Source: https://blog.csdn.net/qq_36299025/article/details/115611425
