ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

hadoop 2.6 yarn Records实现分析

2021-02-18 15:33:54  阅读:179  来源: 互联网

标签:String hadoop private clazz Records static yarn new Class


 

转自: https://blog.csdn.net/houzhizhen/article/details/51372058

 

Records在Yarn中原代码中主要用于RPC通訊,如以下语句生成一个新应用程序的請求,GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);

Records⾥的代码非常简单,调用newRecord时调用factory.newRecordInstance()方法,源代码如下:

public class Records {
  // The default record factory
  private static final RecordFactory factory =
      RecordFactoryProvider.getRecordFactory(null);
 
  public static <T> T newRecord(Class<T> cls) {
    return factory.newRecordInstance(cls);
  }
}

 

fractory是RecordFactory类型的,是一个接口,只有一个方法,代码如下:

public interface RecordFactory {
  public <T> T newRecordInstance(Class<T> clazz);
}

 

这个factory是RecordFactoryProvider.getRecordFactory(null)来初始化的。RecordFactoryProvider的源代码如下:

public class RecordFactoryProvider {
  private static Configuration defaultConf;
  
  static {
    defaultConf = new Configuration();
  }
  
  private RecordFactoryProvider() {
  }
  
  public static RecordFactory getRecordFactory(Configuration conf) {
    if (conf == null) {
      //Assuming the default configuration has the correct factories set.
      //Users can specify a particular factory by providing a configuration.
      conf = defaultConf;
    }
    String recordFactoryClassName = conf.get(
        YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
        YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
    return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
  }
  
  private static Object getFactoryClassInstance(String factoryClassName) {
    try {
      Class<?> clazz = Class.forName(factoryClassName);
      Method method = clazz.getMethod("get", null);
      method.setAccessible(true);
      return method.invoke(null, null);
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException(e);
    } catch (NoSuchMethodException e) {
      throw new YarnRuntimeException(e);
    } catch (InvocationTargetException e) {
      throw new YarnRuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new YarnRuntimeException(e);
    }
  }
}

在方法⾥,由于传递的参数是null,所以conf = defaultConf,默认的DEFAULT_IPC_RECORD_FACTORY_CLASS 是    "org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl";调用getFactoryClassInstance("org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl")。

 

在getFactoryClassInstance方法⾥.,直接调用它的静态get方法,没有参数。RecordFactoryPBImpl的源代码如下: 1 public class RecordFactoryPBImpl implements Record 2

 3   private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
 4   private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
 5  
 6   private static final RecordFactoryPBImpl self = new RecordFactoryPBImpl();
 7   private Configuration localConf = new Configuration();
 8   private ConcurrentMap<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<Class<?>, Constructor<?>>();
 9  
10   private RecordFactoryPBImpl() {
11   }
12   
13   public static RecordFactory get() {
14     return self;
15   }
16   
17   @SuppressWarnings("unchecked")
18   @Override
19   ....
}

 

RecordFactoryPBImpl是一个单例模式,get方法返回静态的final 变量 self.

 

在newRecordInstance(Class<T> clazz)方法⾥,首先判断cache是否能直接得到此类的构造器Constructor,如果得到,就直接调用constructor.newInstance();方法初始化一个对象,然后把此对象返回;如果缓冲器⾥没有,先调用getPBImplClassName(clazz)得到PBImpl的类实现,如getPBImplClassName(clazz);传递的是GetNewApplicationRequest,所在包是org.apache.hadoop.yarn.api.protocolrecords,按照规則,实现类是GetNewApplicationRequestPBImpl,所在包是org.apache.hadoop.yarn.api.protocolrecords.impl.pb, getPBImplClassName()方法返回的内容是org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl。

接着调用localConf.getClassByName("org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl")来返回class对象。然后得到它的constructor对象,放到cache⾥。

public <T> T newRecordInstance(Class<T> clazz) {
    
    Constructor<?> constructor = cache.get(clazz);
    if (constructor == null) {
      Class<?> pbClazz = null;
      try {
        pbClazz = localConf.getClassByName(getPBImplClassName(clazz));
      } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Failed to load class: ["
            + getPBImplClassName(clazz) + "]", e);
      }
      try {
        constructor = pbClazz.getConstructor();
        constructor.setAccessible(true);
        cache.putIfAbsent(clazz, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnRuntimeException("Could not find 0 argument constructor", e);
      }
    }
    try {
      Object retObject = constructor.newInstance();
      return (T)retObject;
    } catch (InvocationTargetException e) {
      throw new YarnRuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new YarnRuntimeException(e);
    } catch (InstantiationException e) {
      throw new YarnRuntimeException(e);
    }
  }
 
  private String getPBImplClassName(Class<?> clazz) {
    String srcPackagePart = getPackageName(clazz);
    String srcClassName = getClassName(clazz);
    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
    return destPackagePart + "." + destClassPart;
  }
  
  private String getClassName(Class<?> clazz) {
    String fqName = clazz.getName();
    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
  }
  
  private String getPackageName(Class<?> clazz) {
    return clazz.getPackage().getName();
  }

  

标签:String,hadoop,private,clazz,Records,static,yarn,new,Class
来源: https://www.cnblogs.com/acSzz/p/14411996.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有