ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

dubbo之NIO服务器-抽象API

2022-02-05 17:03:09  阅读:131  来源: 互联网

标签:dubbo NIO void API Channel RemotingException public channel


写在前面

再java社区中,比较优秀的NIO框架有netty(netty3.x,netty4.x),mina,grizzly,dubbo基于dubbo:\\协议和thrift:\\协议实现了自己的NIO服务器,当然底层会直接使用现有的NIO框架(毕竟重复造轮子的成本还是比较高的),那么到底选择哪种NIO框架呢?dubbo的做法是让用户自己选择,具体的做法是先提供API层,然后针对具体的NIO框架提供具体的实现,如下:

API层:
    dubbo-remoting-api
实现:
    dubbo-remoting-netty
    dubbo-remoting-netty4
    dubbo-remoting-mina
    dubbo-remoting-grizzly
    dubbo-remoting-p2p

然后再配合Dubbo SPI机制 就可以让用户自由选择使用了。本文我们一起来看下dubbo-remoting-api部分。

当我们使用netty来编写网络通信程序时,一般需要如下的四个类:

Server:用来启动服务端的类。
ServerHandler:服务端用来接收处理客户端消息的类。
Client:用来启动客户端的类。
ClientHandler:客户端用来处理服务端消息的类。

dubbo也与此类似,对应的类如下:

Server->com.alibaba.dubbo.remoting.Server
ServerHandler->com.alibaba.dubbo.remoting.ChannelHandler
Client->com.alibaba.dubbo.remoting.Client
ClientHandler->com.alibaba.dubbo.remoting.ChannelHandler

但是实际上,因为dubbo使用的是自定义的协议,如dubbo://,如协议编码接口com.alibaba.dubbo.remoting.Codec2,消息分发接口com.alibaba.dubbo.remoting.Dispatcher,本文涉及到的类图如下:

在这里插入图片描述

1:Endpoint

Endpoint的英文翻译是端点,可以理解为和外界沟通的门户,用来进一步封装通信连接,源码如下:

public interface Endpoint {
    // 获取URL地址
    URL getUrl();
    // 获取ChannelHandler
    ChannelHandler getChannelHandler();
    // 获取本地网络地址
    InetSocketAddress getLocalAddress();
    // 发送消息
    void send(Object message) throws RemotingException;
    // 发送消息
    void send(Object message, boolean sent) throws RemotingException;
    // 关闭通道
    void close();
    // 优雅方式关闭通道
    void close(int timeout);
    void startClose();
    // 是否关闭
    boolean isClosed();
}

1.1:Channel

接口签名是public interface Channel extends Endpoint{},继承了Endpoint接口,因此Channel也是一个Endpoint,源码如下:

public interface Channel extends Endpoint {
    // 获取通道远端地址
    InetSocketAddress getRemoteAddress();
    // 是否连接
    boolean isConnected();
    // 是否有某个属性
    boolean hasAttribute(String key);
    // 获取属性
    Object getAttribute(String key);
    // 设置属性
    void setAttribute(String key, Object value);
    // 删除属性
    void removeAttribute(String key);
}

该接口是通讯的载体,不同的NIO框架有不同的实现,如com.alibaba.dubbo.remoting.transport.netty4.NettyChannel

1.2:Client

客户端接口,继承了Endpoint接口,因此Client是一个Endpoint,源码如下:

public interface Client extends Endpoint, Channel, Resetable {
    // 重新连接服务端
    void reconnect() throws RemotingException;

    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);
}

1.3:Server

服务端接口,继承了Endpoint接口,因此Server是一个Endpoint,源码如下:
源码如下:

public interface Server extends Endpoint, Resetable {
    // 是否绑定本地端口,即已经启动服务,可以被客户端连接和接收消息
    boolean isBound();
    // 获得客户端的连接们
    Collection<Channel> getChannels();
    // 获取指定地址的客户端连接
    Channel getChannel(InetSocketAddress remoteAddress);
    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);
}

1.3.1:Resetable

可重置接口,用于服务端根据传入的URL参数重置内部的相关属性,源码如下:

public interface Resetable {
    // 根据传入的URL参数重置内部相关属性
    void reset(URL url);
}

1.4:ChannelHandler

负责通道的逻辑处理,比如从通道中获取消息,发送消息,可以理解为通道的包装器,源码如下:

@SPI
public interface ChannelHandler {
    void connected(Channel channel) throws RemotingException;
    void disconnected(Channel channel) throws RemotingException;
    void sent(Channel channel, Object message) throws RemotingException;
    void received(Channel channel, Object message) throws RemotingException;
    void caught(Channel channel, Throwable exception) throws RemotingException;
}

比如netty4提供的具体实现com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler,内部会使用netty4的API执行具体操作。

2:Transporter

网络传输接口,源码如下:

@SPI("netty")
public interface Transporter {
    // 绑定Server,使用自适应方法加载对应的Server的Transport实现类
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    // 连接服务器,使用自适应方法加载对应的Client的Transport实现类
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

使用该类来获取服务端和服务端对应的类Server和Client。

3:Transporters

这是Transport门面类 ,因为不同的NIO框架的获取Server和Client的逻辑是不相同的,且还具有一定的复杂度,不同的NIO框架的具体实现就是不同的子系统,因此这里使用了门面设计模式来简化客户端的使用。

源码如下:

// Transporter门面,简化客户端使用各种不同的Transporter的复杂度
public class Transporters {

    static {
        Version.checkDuplicate(Transporters.class);
        Version.checkDuplicate(RemotingException.class);
    }

    private Transporters() {
    }
    
    // 获取Server的门面方法,会根据url参数的不同获取不同的Transporter最终获取Server
    public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url), handler);
    }

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        // 创建使用Channel进行通信的ChannelHandler,如果是多个则会创建ChannelHandlerDispatcher
        // 内部会循环调用每一个ChannelHandler
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    // 获取Client的门面方法,会根据url参数的不同获取不同的Transporter最终获取Client
    public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
        return connect(URL.valueOf(url), handler);
    }

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        // 2022-02-04 21:22:35
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}

2022-02-04 21:22:35处是通过dubbo的自适应机制 来动态获取底层NIO框架对应的Transporter。

4:Codec2

消息编解码顶层接口,源码如下:

@SPI
public interface Codec2 {
    // public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive
    // 类以获取具体子类调用对应方法
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    // public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive
    // 类以获取具体子类调用对应方法
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;

    // 处理拆包,粘包策略枚举
    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }
}

4.1:Codec

老版本的Codec2,源码如下;

@Deprecated
@SPI
public interface Codec {
    Object NEED_MORE_INPUT = new Object();

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, OutputStream output, Object message) throws IOException;

    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, InputStream input) throws IOException;
}

使用了类com.alibaba.dubbo.remoting.transport.codec.CodecAdapter进行适配。

4.2:Decodeable

可编码接口:

public interface Decodeable {
    void decode() throws Exception;
}

5:Dispatcher

消息分发接口:

@SPI(AllDispatcher.NAME)
public interface Dispatcher {
    // 分发消息到线程池中,使用了Adaptive机制
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // 最后2个参数是为了兼容老版本配置而保留
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

6:RemotingException

源码如下:

public class RemotingException extends Exception {

    private static final long serialVersionUID = -3160452149606778709L;
    // 本地地址
    private InetSocketAddress localAddress;
    // 远端地址
    private InetSocketAddress remoteAddress;

    public RemotingException(Channel channel, String msg) {
        this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
                msg);
    }

    public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message) {
        super(message);

        this.localAddress = localAddress;
        this.remoteAddress = remoteAddress;
    }

    public RemotingException(Channel channel, Throwable cause) {
        this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
                cause);
    }

    public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, Throwable cause) {
        super(cause);

        this.localAddress = localAddress;
        this.remoteAddress = remoteAddress;
    }

    public RemotingException(Channel channel, String message, Throwable cause) {
        this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
                message, cause);
    }

    public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message,
                             Throwable cause) {
        super(message, cause);

        this.localAddress = localAddress;
        this.remoteAddress = remoteAddress;
    }

    public InetSocketAddress getLocalAddress() {
        return localAddress;
    }

    public InetSocketAddress getRemoteAddress() {
        return remoteAddress;
    }
}

RemotingException有两个子类异常,分别是ExecutionException,TimeoutException,分别如下:

public class ExecutionException extends RemotingException {
    private static final long serialVersionUID = -2531085236111056860L;
    private final Object request;
    // 省略各种构造函数
}
public class TimeoutException extends RemotingException {
    // 代表是客户端的常量
    public static final int CLIENT_SIDE = 0;
    // 代表是服务端的常量
    public static final int SERVER_SIDE = 1;
    private static final long serialVersionUID = 3122966731958222692L;
    // 阶段
    private final int phase;
    
    // 省略各种构造函数
}

标签:dubbo,NIO,void,API,Channel,RemotingException,public,channel
来源: https://blog.csdn.net/wang0907/article/details/122765202

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有