ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Dubbo源码解析-Consumer发送请求全过程

2021-11-30 12:32:22  阅读:144  来源: 互联网

标签:Dubbo 调用 invoke class 源码 invocation new Consumer public


前言:

之前的文章已经从调用结构方面从前到后整个梳理了一下全过程。

本篇就从实战调用角度来分析下整个过程,之前是抽象,现在就是实战。

1.示例代码

代码的话跟之前是一样的,笔者在这里再贴一下

1.1 provider

public class ProviderApplication {
    public static void main(String[] args) throws Exception {
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());
        service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        service.export();

        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

1.2 comsumer

public class ConsumerApplication {
    public static void main(String[] args) {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        reference.setScope("remote");
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

2.消费者调用全过程

之前的博客已经分析过主要的步骤,在这里笔者快速过一下

我们就从String message = service.sayHello("dubbo"); 这句调用开始

2.1 service即Proxy0

通过javassist创建的Proxy0继承了Proxy抽象类。简略内容如下所示:

public final class $Proxy0
        extends Proxy
        implements Subject {
    private static Method m1;
    private static Method m2;
    private static Method m3;
    private static Method m0;

    public $Proxy0(InvocationHandler paramInvocationHandler) {
        super(paramInvocationHandler);
    }
	@Override
    public final String sayHello(String paramString) {
        try {
        // 本质上是对InvocationHandler的调用
            return (String) this.h.invoke(this, m3, new Object[]{paramString});
        } catch (Error | RuntimeException localError) {
            throw localError;
        } catch (Throwable localThrowable) {
            throw new UndeclaredThrowableException(localThrowable);
        }
    }
}

Proxy代理类对方法的调用,最终都反映到InvokerInvocationHandler的调用上

2.2 InvokerInvocationHandler.invoke()

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;
	...

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        ...
        // 将请求的所有信息都包装进来,请求方法、接口信息、参数信息    
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // 这里的invoker=MockClusterInvoker
        return invoker.invoke(rpcInvocation).recreate();
    }
}

请求体所有信息都包装在RpcInvocation,交由下一个Invoker(MockClusterInvoker)处理

2.3 ClusterInvoker.invoke()

ClusterInvoker的调用链为:MockClusterInvoker --> FailoverClusterInvoker

MockClusterInvoker主要作用就是进行Mock访问,这个我们后续再仔细说明;

FailoverClusterInvoker意义比较重大,它提供的是一种消费者调用时的集群容错方案,在多个服务提供者情况下,当前消费者调用A提供者失败时,会自动切换到B提供者再次调用,最多重试N次。

当然除了FailoverClusterInvoker,还有其他多种集群容错方案可供选择,这个后续会有系列文章进行说明。

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
 
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 重试次数
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        ...
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                checkInvokers(copyInvokers, invocation);
            }
            // 选择合适的dubbo provider Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 针对这个Invoker发起调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    ...
                }
                return result;
            // 异常,则再次进入循环,进入下一次调用    
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        ...
    }
}

在ClusterInvoker的包装下,完成了Dubbo集群容错与负载均衡策略的实现。后续会更加详细的对这两方面进行介绍。 

2.4 ProtocolFilterWrapper.invoke()

Filter包装类的调用,在真正调用到DubboProtocol之前,会先经过一系列的Filter的调用

public class ProtocolFilterWrapper implements Protocol {
 
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 通过SPI的方式获取所有的Filter(属于Consumer组)
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                   ...
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // 逐个调用Filter
                            asyncResult = filter.invoke(next, invocation);
                        } ...
                };
            }
        }

        return last;
    }
}

似乎每个框架都有Filter层,可以在真正调用前做一些全局操作,后续会有专门的Filter专题来介绍,这里我们知道即可。

2.5 ListenerInvokerWrapper.invoke()

public class ListenerInvokerWrapper<T> implements Invoker<T> {
 
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

}

主要是创建一系列的监听器,后续分析

2.6 同步?异步调用?

public class AsyncToSyncInvoker<T> implements Invoker<T> {

    private Invoker<T> invoker;
	...
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);

        try {
            // 如果是同步调用,则一直等待结果集
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } ...
        return asyncResult;
    }
}

Dubbo为了提高性能,提供了异步调用方式,后续专门介绍。

默认都是同步调用,会一直等待结果集

2.7 DubboInvoker.doInvoker()

最终执行到真正的调用类

public class DubboInvoker<T> extends AbstractInvoker<T> {
	protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // 轮询获取可用client
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 最终通过调用currentClient.send()来进行同步或异步调用
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

2.8 HeaderExchangeChannel.request()

final class HeaderExchangeChannel implements ExchangeChannel {
 
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // 创建request对象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        // 主要内容就是request,包含了接口名、方法名、参数信息
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            // 交由channel发送出去
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

快到尾声了,后续的具体发送工作交由NettyClient执行(默认)

2.9 NettyClient.send()

public class NettyClient extends AbstractClient {
	public void send(Object message, boolean sent) throws RemotingException {
        // 未创建连接的情况下,则先创建连接
        if (needReconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // 最后交由channel发送出去
        channel.send(message, sent);
    }
}

2.10 NettyChannel.send()

final class NettyChannel extends AbstractChannel {
	public void send(Object message, boolean sent) throws RemotingException {
        // whether the channel is closed
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // 最终通过channel发送出去
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // wait timeout ms
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
}

最终,还是通过Netty的channel将请求发送出去了

过程有点长,通过时序图来展示下

 

标签:Dubbo,调用,invoke,class,源码,invocation,new,Consumer,public
来源: https://blog.csdn.net/qq_26323323/article/details/121629709

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有