ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

motan系列4——服务调用

2021-12-28 20:33:15  阅读:175  来源: 互联网

标签:调用 系列 getName request private new motan method


1、服务调用方式

  调用motan服务,可以在setter方法或field标注 @MotanReferer 注解引入要调用的服务接口,如下作用于field:

@MotanReferer(basicReferer = "ad-commonBasicRefererConfigBean", application = "ad-filter", version = "1.1.0")
private AdCommonRPC adCommonRPC;

2、@MotanReferer 的解析

  被 @MotanReferer 标注的 setter 方法或 field 会被motan在启动时扫描,并为其创建动态代理,并将动态代理的实例赋值给这个 field。远程服务的调用都是在这个代理中实现的。

  在前面介绍服务注册时,说到了对于 @MotanService 的解析是在 AnnotationBean 中做的,同样对于 @MotanReferer 的解析也是在这里面。AnnotationBean 实现 BeanPostProcessor ,在后置处理方法中处理 @MotanService,在前置处理方法中处理 @MotanReferer。

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (!isMatchPackage(bean)) {
            return bean;
        }
        Class<?> clazz = bean.getClass();
        if (isProxyBean(bean)) {
            clazz = AopUtils.getTargetClass(bean);
        }
        Method[] methods = clazz.getMethods();
        for (Method method : methods) {
            String name = method.getName();
            if (name.length() > 3 && name.startsWith("set")
                    && method.getParameterTypes().length == 1
                    && Modifier.isPublic(method.getModifiers())
                    && !Modifier.isStatic(method.getModifiers())) {
                try {
                    MotanReferer reference = method.getAnnotation(MotanReferer.class);
                    if (reference != null) {
                        Object value = refer(reference, method.getParameterTypes()[0]);
                        if (value != null) {
                            method.invoke(bean, new Object[]{value});
                        }
                    }
                } catch (Exception e) {
                    throw new BeanInitializationException("Failed to init remote service reference at method " + name
                            + " in class " + bean.getClass().getName(), e);
                }
            }
        }


        Field[] fields = clazz.getDeclaredFields();
        for (Field field : fields) {
            try {
                if (!field.isAccessible()) {
                    field.setAccessible(true);
                }
                MotanReferer reference = field.getAnnotation(MotanReferer.class);
                if (reference != null) {
                    //调用 refer 方法初始化并创建动态代理,field 指向代理对象
                    Object value = refer(reference, field.getType());
                    if (value != null) {
                        field.set(bean, value);
                    }
                }
            } catch (Exception e) {
                throw new BeanInitializationException("Failed to init remote service reference at filed " + field.getName()
                        + " in class " + bean.getClass().getName(), e);
            }
        }
        return bean;
    }

  类似于服务注册的过程,MotanReferer 是通过 RefererConfigBean 类来管理配置、注册中心、URL、HA、LoadBalance、Proxy等资源的。

  先来看下  RefererConfigBean 的继承结构。   

                                         

  其中,注册中心、URL、Protocol、HA、LoadBalance策略等都是在 RefererConfig 的 clusterSupports 中管理的,ClusterSupport 是处理订阅服务的,后面再具体看。

  来继续看这个refer方法,这个方法中首先将 @MotanReferer 注解中的配置信息解析到 RefererConfigBean 中,然后依然是调用 afterPropertiesSet() 方法做一些校验,最后调用 RefererConfigBean 的 getRef() 方法,各个组件的初始化以及Proxy都在这里创建。 

private <T> Object refer(MotanReferer reference, Class<?> referenceClass) {
        
        RefererConfigBean<T> referenceConfig = referenceConfigs.get(key);
        if (referenceConfig == null) {
            referenceConfig = new RefererConfigBean<T>();

              /**
                * 缺省 ...,这里就是配置的封装
               **/

                try {
                    //类似服务注册,也是在这一步进行配置的校验
                    referenceConfig.afterPropertiesSet();
                } catch (RuntimeException e) {
                    throw (RuntimeException) e;
                } catch (Exception e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
            }
            referenceConfigs.putIfAbsent(key, referenceConfig);
            referenceConfig = referenceConfigs.get(key);
        }
        //这一步是核心方法,初始化和创建代理对象
        return referenceConfig.getRef();
    }

3、Proxy的创建  

  上面将配置封装到 RefererConfigBean 之后,通过 getRef() 方法中实际调用 initRef() 方法来创建Proxy的。

public synchronized void initRef() {
    // ... 校验 interface 和 protocols 是否非空
    checkInterfaceAndMethods(interfaceClass, methods);

    clusterSupports = new ArrayList<>(protocols.size());
    List<Cluster<T>> clusters = new ArrayList<>(protocols.size());
    String proxy = null;

    ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);

    // 解析注册中心地址
    List<URL> registryUrls = loadRegistryUrls();
    // 解析本机IP
    String localIp = getLocalHostAddress(registryUrls);
    for (ProtocolConfig protocol : protocols) {
        Map<String, String> params = new HashMap<>();
        params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
        params.put(URLParamType.version.getName(), URLParamType.version.getValue());
        params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));

        collectConfigParams(params, protocol, basicReferer, extConfig, this);
        collectMethodConfigParams(params, this.getMethods());

        String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
        URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, path, params);
        // 初始化ClusterSupport
        ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);

        clusterSupports.add(clusterSupport);
        clusters.add(clusterSupport.getCluster());
         
        if (proxy == null) {
            // 获取创建proxy的方式,默认是JDK动态代理
            String defaultValue = StringUtils.isBlank(serviceInterface) ? URLParamType.proxy.getValue() : MotanConstants.PROXY_COMMON;
            proxy = refUrl.getParameter(URLParamType.proxy.getName(), defaultValue);
        }
    }
    // 创建代理
    ref = configHandler.refer(interfaceClass, clusters, proxy);

    initialized.set(true);
}

  先来看下 ClusterSupport,这个类封装了下面这些信息,用于对集群特性的支持,这个下一节分析:

    private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
    private Cluster<T> cluster;
    private List<URL> registryUrls;
    private URL url;
    private Class<T> interfaceClass;
    private Protocol protocol;
    private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();

  其中 Cluster 默认使用的是 ClusterSpi,可以看到默认加载就是”default“就是 ClusterSpi(原理参考motan SPI机制)。里面包括高可用和负载均衡策略。

@SpiMeta(name = "default")
public class ClusterSpi<T> implements Cluster<T> {

    private HaStrategy<T> haStrategy;

    private LoadBalance<T> loadBalance;

    private List<Referer<T>> referers;

    private AtomicBoolean available = new AtomicBoolean(false);

    private URL url;

}

  我们继续看又调用了 configHandler.refer 方法,默认情况下,这个proxy参数的值是"jdk",即使用JDK自身的动态代理功能创建代理,ConfigHandler 也是一个扩展点,默认使用的是 SimpleConfigHandler。

@SpiMeta(name = "default")
public class SimpleConfigHandler implements ConfigHandler {

    @Override
    public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
        return proxyFactory.getProxy(interfaceClass, clusters);
    }

  代理工厂这里又是一个扩展点,默认使用的是JDK动态代理 JdkProxyFactory,因为 proxyType 默认是jdk。

@SpiMeta(name = "jdk")
public class JdkProxyFactory implements ProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clz, List<Cluster<T>> clusters) {
        return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new RefererInvocationHandler<>(clz, clusters));
    }
}

  到这里,@MotanReferer 的解析,代理对象的创建就完成了,实际的调用是委托给 RefererInvocationHandler。

4、RPC调用

  RefererInvocationHandler 代理了目标接口,那么接口的每个方法调用都会走到这个代理类中。所以接下来主要关注代理是如何完成RPC调用的,主要看它的 invoke 方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 省略 local method 部分

    DefaultRequest request = new DefaultRequest();
    request.setRequestId(RequestIdGenerator.getRequestId());
    request.setArguments(args);
    String methodName = method.getName();
    boolean async = false; // 异步调用支持,暂不关注
    if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
        methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
        async = true;
    }
    request.setMethodName(methodName);
    request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
    request.setInterfaceName(interfaceName);

    return invokeRequest(request, getRealReturnType(async, this.clz, method, methodName), async);
}

  先将请求数据封装成 DefaultRequest,包含:interfaceName、methodName、arguments、retries、rpcProtocolVersion等信息。然后调用 invokeRequest() 方法:

Object invokeRequest(Request request, Class returnType, boolean async) throws Throwable {
    RpcContext curContext = RpcContext.getContext();
    // 省略 初始化 RpcContext 

    // 当 referer配置多个protocol的时候,比如A,B,C,
    // 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
    for (Cluster<T> cluster : clusters) {
        // 如果开关处于关闭状态,不会去调用这个远程机器
        String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
        Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
        if (switcher != null && !switcher.isOn()) {
            continue;
        }

        request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
        request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
        // 带上client的application和module
        request.setAttachment(URLParamType.application.getName(), cluster.getUrl().getApplication());
        request.setAttachment(URLParamType.module.getName(), cluster.getUrl().getModule());

        Response response = null;
        boolean throwException = Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(), URLParamType.throwException.getValue()));
        try {
            MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_INVOKE);
            // 执行调用
            response = cluster.call(request);
            if (async) {
                // 省略异步调用的支持    
            } else {
                Object value = response.getValue();
                if (value != null && value instanceof DeserializableObject) {
                    try {
                        value = ((DeserializableObject) value).deserialize(returnType);
                    } catch (IOException e) {
                        LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType, e);
                        throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
                    }
                }
                return value;
            }
        } catch (RuntimeException e) {
            // 异常处理,包括处理是否向上游服务抛出
        }
    }
    throw new MotanServiceException("Referer call Error: cluster not exist, interface=" + interfaceName + " " + MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);
}

  真正执行调用的是 cluster.call(),这里先处理可高用,默认是 failOver(还有FailFast),然后再通过负载均衡获取要通信的服务地址。

public Response call(Request request) {
    if (available.get()) {
        try {
            // haStrategy是通过SPI来管理的,默认的HA策略是 failover
            // 即调用失败时,自动尝试其他服务器
            return haStrategy.call(request, loadBalance);
        } catch (Exception e) {
            return callFalse(request, e);
        }
    }
    return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}

  然后由 haStrategy.call()

public Response call(Request request, LoadBalance<T> loadBalance) {
    // refer列表,这里就是负载均衡的处理
    List<Referer<T>> referers = selectReferers(request, loadBalance);
    if (referers.isEmpty()) {
        throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s, loadbalance:%s", request,
                loadBalance));
    }
    URL refUrl = referers.get(0).getUrl();
    // 这里是配置中配置的 retries 重试次数,默认:0
    int tryCount =
            refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
                    URLParamType.retries.getIntValue());
    // 如果有问题,则设置为不重试
    if (tryCount < 0) {
        tryCount = 0;
    }

    for (int i = 0; i <= tryCount; i++) {
        Referer<T> refer = referers.get(i % referers.size());
        try {
            request.setRetries(i);
            return refer.call(request); // RPC
        } catch (RuntimeException e) {
            // 对于业务异常,直接抛出
            if (ExceptionUtil.isBizException(e)) {
                throw e;
            } else if (i >= tryCount) {
                throw e;
            }
            LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
        }
    }

    throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!");
}

  然后,refer.call()

public Response call(Request request) {
    if (!isAvailable()) {
        throw new MotanFrameworkException(this.getClass().getSimpleName() + " call Error: node is not available, url=" + url.getUri()
                + " " + MotanFrameworkUtil.toString(request));
    }
    // 增加目标server的连接数,用于loadBalance
    incrActiveCount(request);
    Response response = null;
    try {
        response = doCall(request); // do rpc
        return response;
    } finally {
        // 调用完要将目标server的连接数-1
        decrActiveCount(request, response);
    }
}

  最后 doCall() 就是通过 Netty 去调用服务了。

 

标签:调用,系列,getName,request,private,new,motan,method
来源: https://www.cnblogs.com/jing-yi/p/15742750.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有