ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

dubbo接口方法重载且入参未显式指定序列化id导致ClassCastException分析

2022-01-16 22:35:29  阅读:142  来源: 互联网

标签:dubbo 解码 参未 header 序列化 id channel


问题描述&模拟

线上登录接口,通过监控查看,有类型转换异常,具体报错如下图

image-20220108220128374

此报错信息是dubbo consumer端显示,且登录大部分是正常,有少量部分会报类型转换异常,同事通过更换方法名+显示指定序列化id解决此问题,但是产生这个问题的真正原因是什么呢?没有指定序列化id吗?还是dubbo方法重载问题?为什么服务端不显示此错误信息呢?,下面根据错误模拟下情况。

线上运行情况说明,报错的这台客户端部署在容器内,jdk版本

image-20220108220823552

服务方是混跑,有虚拟机和容器,容器的jdk版本相同,虚拟机jdk版本

image-20220108220912474

一开始认为是由于没有显示指定序列化id导致容器调用虚拟机的服务,由于jvm版本不一致导致的解码问题,但是分析和试验后,发现并非如此,模拟情况如下:

定义一个dubbo服务,方法重载且入参不显示指定序列化id,代码如下

//定义dubbo服务
public interface ProductService {
	Result<ProductVO> findProduct(String data);
	Result<ProductVO> findProduct(ProductDTO product);
}

//入参
@Data
public class ProductDTO  implements Serializable {
    //不显示指定序列化id
	private Integer productId;
	private String sn;
	private String code;
}

//出参
@Data
public class ProductVO implements Serializable{
	private static final long serialVersionUID = 4529782262922750326L;
	private Integer productId;
	private String productName;
}

dubbo客户端调用ProductService.findProduct(ProductDTO product),并使用jdk1.8.0_202版本,服务方使用jdk1.8.0_73版本,经过试验(jmeter压测),发现并未出现类型转换异常,现在通过代码分析来排除。

分析&dubbo provider处理请求流程

采用逆序方法,使用arthas进行反编译dubbo生成的代理类,ProductService生成的代理类是Wrapper2,内容如下

public Object invokeMethod(Object object, String name, Class[] classArray, Object[] objectArray)
			throws InvocationTargetException {
		ProductService productService;
		try {
			productService = (ProductService) object;
		} catch (Throwable throwable) {
			throw new IllegalArgumentException(throwable);
		}
		try {
			if ("findProduct".equals(name) && classArray.length == 1
					&& classArray[0].getName().equals("java.lang.String")) {
				return productService.findProduct((String) objectArray[0]);
			}
			if ("findProduct".equals(name) && classArray.length == 1
					&& classArray[0].getName().equals("org.pangu.dto.ProductDTO")) {
				return productService.findProduct((ProductDTO) objectArray[0]);
			}
		} catch (Throwable throwable) {
			throw new InvocationTargetException(throwable);
		}
		throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(name)
				.append("\" in class org.pangu.api.ProductService.").toString());
	}
}

通过查看反编译后的代码,得知dubbo方法重载,会根据方法类型和参数个数找到对应的目标方法执行。对于我这个线上问题,参数是ProductDTO,如果调用的是findProduct(String data),说明classArray[0]即参数类型是String类型,那么参数类型是如何得来的呢?根据自己之前写的dubbo流程分析,查看源码,在com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker#invoke(Invocation invocation),代码内容如下

image-20220108223056936

方法名称+方法类型+方面参数都封装在Invocation内,接着查找Invocation的来源,在DubboProtocol的匿名内部类DubboProtocol$1内发现,具体是reply(ExchangeChannel channel, Object message)方法内,参数message就是Invocation。

image-20220108225859703

接着看哪里调用DubboProtocol$1.reply(ExchangeChannel channel, Object message)方法,在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest(ExchangeChannel channel, Request req)方法内,com.alibaba.dubbo.remoting.exchange.Request.getData()获取此Invocation,即DecodeableRpcInvocation,那么接着看Request 以及Request.mData的来源;

接着向上找,在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received(Channel channel, Object message)的入参message就是Request ;

继续向上找,com.alibaba.dubbo.remoting.transport.DecodeHandler#received(Channel channel, Object message)的入参就是Request ,其中会对Request.mData即Invocation进行解码(默认在IO线程已经解码过,这里实际并不会再执行解码DecodeableRpcInvocation#hasDecoded=true)。

image-20220108230753355

继续向上找,com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run()线程,message属性就是Request,那么接着只能找ChannelEventRunnable是如何创建并提交的

image-20220108230921910

继续向上找,在com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received(Channel channel, Object message)方法内创建ChannelEventRunnable并提交到线程池执行。

继续向上找,在com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(Channel channel, Object message),入参message就是Request

继续向上找,com.alibaba.dubbo.remoting.transport.MultiMessageHandler.received(Channel channel, Object message)

image-20220109003230986

继续向上找,com.alibaba.dubbo.remoting.transport.AbstractPeer.received(Channel ch, Object msg)

继续向上找,com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler.channelRead(ChannelHandlerContext ctx, Object msg),看到这个就说明是netty的work线程,NettyServerHandler是个inbound & outbound事件

dubbo service netty启动添加的inbound&outbound即pipeline chain[HeadContext InternalDecoder InternalEncoder NettyServerHandler TailContext],说明前面肯定有执行InternalDecoder 的channelRead事件。此时入参message就是Request。

下面着重分析InternalDecoder 的channelRead事件,执行堆栈依次为:

InternalDecoder(io.netty.handler.codec.ByteToMessageDecoder).channelRead(ChannelHandlerContext ctx, Object msg)
InternalDecoder(io.netty.handler.codec.ByteToMessageDecoder).callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
DecodeableRpcInvocation.decode()
DecodeableRpcInvocation.decode(Channel channel, InputStream input)

InternalDecoder是netty pipeline的inboud事件,执行的是channelRead,具体逻辑在InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)内,代码如下

image-20220109010146098

接着触发下一个inbound的channelRead动作,传入的就是Request了,代码说明如下

image-20220109010534969

接着看DubboCountCodec.decode(Channel channel, ChannelBuffer buffer),这里进行解码

//com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode(Channel channel, ChannelBuffer buffer)
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();//获取读位置
    MultiMessage result = MultiMessage.create();//MultiMessage是Request的集合
    do {
        Object obj = codec.decode(channel, buffer);//使用DubboCodec进行解码,下面根据解码结果进行不同处理
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//说明发生了tcp粘包,退出循环
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);//把obj即Request添加到集合MultiMessage
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();//设置新的buffer读位置,继续使用DubboCodec进行解码
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {//如果MultiMessage只有一个元素,则说明本次没有发生粘包
        return result.get(0);//返回Request
    }
    return result;//返回MultiMessage,在后续的MultiMessagehandler内获取Request的集合遍历处理
}

接着看DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)解码过程,如何对dubbo协议解码的,先看下dubbo协议的报文结构

接着看代码,对着报文结构进行解码

//DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);//把缓冲区字节存放到header
    return decode(channel, buffer, readable, header);
}

//DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
        || readable > 1 && header[1] != MAGIC_LOW) {//非魔数,说明非dubbo报文的开头,说明发生了tcp拆包/粘包
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {//为什么是小于16呢?因为dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)+bodyLenght(4)就是16字节了,小于16字节,肯定发生了拆包,本次接收到的数据并没有body
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);//12的原因是dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)等于12,从12位后取4位,转换为int,就是body的长度
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {//可读取数少于bodylen+16,说明tcp拆包,需要继续进网络读取
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);//解码body内容
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

接着看解码dubbo body,在com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

//com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {//是响应,编码
        //省略
    } else {//请求,解码
        // decode request.
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) {//心跳
                data = decodeHeartbeatData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
            } else if (req.isEvent()) {//事件
                data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(
                    Constants.DECODE_IN_IO_THREAD_KEY,
                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//默认是在netty work线程进行解码
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();//解码dubbo body,解码结果保存在DecodeableRpcInvocation
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                                                      new UnsafeByteArrayInputStream(readMessageData(is)), proto);//否则在业务线程ChannelEventRunnable进行解码
                }
                data = inv;
            }
            req.setData(data);//把Invocation保存到Request.mData
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}

接着看DecodeableRpcInvocation解码dubbo body

//com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode()
@Override
public void decode() throws Exception {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);//解码
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
            }
            request.setBroken(true);
            request.setData(e);
        } finally {
            hasDecoded = true;//解码后置位已经解码,这样在ChannelEventRunnable线程内就不会再进行解码
        }
    }
}

//com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(com.alibaba.dubbo.remoting.Channel, java.io.InputStream)
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
        .deserialize(channel.getUrl(), input);//根据序列化标识获取反序列对象,dubbo spi的自适应

    String dubboVersion = in.readUTF();//从输入流读取dubbo version
    request.setVersion(dubboVersion);
    setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

    setAttachment(Constants.PATH_KEY, in.readUTF());//从输入流读path
    setAttachment(Constants.VERSION_KEY, in.readUTF());//从输入流读版本

    setMethodName(in.readUTF());//从输入流读 调用的目标方法名
    try {
        Object[] args;
        Class<?>[] pts;
        String desc = in.readUTF();//从输入流读 参数描述符,即参数的类型 比如[Ljava/lang/String
        if (desc.length() == 0) {//dubbo调用方法不存在入参
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {//dubbo调用方法存在入参
            pts = ReflectUtils.desc2classArray(desc);//类型描述符转换为类型,比如[Ljava/lang/String => Ljava.lang.String
            args = new Object[pts.length];//参数长度
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readObject(pts[i]);//从输入流读取参数,这里是readObject,执行反序列化
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        setParameterTypes(pts);//把参数类型保存到Invocation对象,即parameterTypes属性上

        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);//从输入流读取隐式参数并解码
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            attachment.putAll(map);
            setAttachments(attachment);
        }
        //decode argument ,may be callback
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

从解码dubbo body看出,从输入流解码获取调用的目标方法名称、方法类型、方法入参、隐式参数都保存到Invocation对象(即DecodeableRpcInvocation),其中读取入参和隐式参数使用到了序列化解码(需要使用到序列化id),而从输入流获取方法名称+参数类型并没有使用对象的反序列化。

dubbo provider处理接收总结

dubbo prodiver端从网络到dubbo业务线程池调用以及如何解码流程分析完,现在总结下:

image-20220109225136447

dubbo provider接收并处理consumer请求分两步

1.网络通信,在io线程上解码,解码结果保存到Request。

2.IO线程调起dubbo业务线程,传入解码结果Request,通过Invoker调用目标方法,传入要执行目标方法的对象、方法名、参数类型、参数进行调用目标方法。

该问题分析

解决2个问题

问题1:为什么在服务端报错ClassCastException,在服务端没有任何error日志呢?只有在客户端才有error日志

由于在dubbo代理类Wrapper2调用目标方法导致ClassCastException,异常被捕捉封装为InvocationTargetException向上抛,接着在com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker#invoke内异常被捕捉,封装为RpcResult,继而在ExceptionFilter内异常信息被封装为RuntimeException返回客户端。这中间并没有日志打印,因此不产生error日志,所以服务端看不到。

问题2:dubbo方法重载会导致问题吗?

结论,基本不会,dubbo的动态代理类WrapperX会根据Invocation的methodName+参数类型+参数进行调用目标方法,因此不会。网上有个大佬说dubbo方法重载在某种情况会导致问题,但是他写的语句有些不通顺且凌乱,而且蓝绿是流量隔离的,不会调错,我认为他的举例不合适,感兴趣的可以参考dubbo同名方法的问题及思考

问题3:是否是未显式指定序列化id导致的呢?

经过前面分析,是由于判断参数类型是String(本来应该是DTO类型),导致执行目标方法时候把参数转换为String导致的异常,参数类型来源于Invocation对象(即RpcInvocation.parameterTypes),而Invocation来源于Request.mData,而Request是网络通信解码得来,其中在com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(com.alibaba.dubbo.remoting.Channel, java.io.InputStream)String desc = in.readUTF();从输入流读取字节流并解码为参数类型描述符,这个地方并不涉及到对象的序列化和反序列化。

看客户端编码代码InternalEncoder,编码参数类型代码如下图

image-20220111002311123

而客户端发送建立Request是在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int),而Invocation对象是在dubbo调用的入口InvokerInvocationHandler内(new RpcInvocation(method, args)封装方法名+参数创建Invocation对象,继而参数类型就保存在了Invocation对象。

这样分析得来,不显示指定序列化id并不会导致这个问题

排除了jdk版本、不显示指定序列化ID等原因,具体是什么原因导致的dubbo方法重载导致调用ClassCastException呢?线上预发环境和生产网络是互通,是否是预发环境同事手工部署的应用只有入参String的方法呢(未和生产同步版本)?同事也记不清了,也无法查,这个问题暂时是无法知道答案了。

据我猜测,问题可能出现是预发环境部署的服务没有和生产版本同步(缺少findProduct(ProductDTOdata)导致),我们预发和生成网络是互通的,应该是生产客户端调用到了预发环境服务,而预发环境部署的此服务没有findProduct(ProductDTOdata)。

为什么需要显示指定序列化id

rpc调用使用的tcp通信,需要把对象转换为二进制流进行发送(编码)和接收(解码),那么就需要有套规则需要把内存中的java对象转换为二进制流,序列化就是做这个事情的。

在使用原生序列化的时候,serialVersionUID起到了一个类似版本号的作用,在反序列化的时候判断serialVersionUID如果不相同,会抛出InvalidClassException。

如果在使用原生序列化方式的时候官方是强烈建议指定一个serialVersionUID的,如果没有指定,在序列化过程中,jvm会自动计算出一个值作为serialVersionUID,由于这种运行时计算serialVersionUID的方式依赖于jvm的实现方式,如果序列化和反序列化的jvm实现方式不一样可能会导致抛出异常InvalidClassException,所以强烈建议指定serialVersionUID。

不显示指定序列化ID实际会导致问题吗?

定义一个dubbo的入参,不显示指定序列化id,客户端运行不变更,服务端入参进行增加或删除字段(类结构发生变化),发现均能正常请求,并非像网上所说的不显示指定序列化id情况下rpc参数类结构变化,并没有导致什么问题,当然我只是在jdk8版本下进行了此测试(当然现在都是jdk8),这样情况下,实际使用过程中,不显示指定序列化id好像也不会影响什么呢

网上有说法,不显示指定序列化id会导致一种情况出现问题:举个例子:比如该入参没有显示指定序列化id,后面有个需求需要在这个入参增加个字段,而且看没有显示指定序列化id,顺手就增加了个序列化id,这样线上运行的客户端应用由于引用的还是旧jar,新的服务部署上去,就会发送序列化失败(客户端jvm生成的序列化id和服务端显示指定的序列化id不同),好像这种情况是无法避免的。但是我经过测试,不显示指定序列化id情况下 对dubbo参数进行增加字段、删除字段、增加方法等都不会造成反序列化问题(jdk8, dubbo2.6.8下测试),请求均正常。验证结果说明jvm生成序列化id和类的结构没有关系。可以参考别人测试结果,和我测试结果相同。

那么是否就可以大胆的不指定序列化id呢?还是建议不要,鬼知道jvm生成序列化id的实现方式呢,不指定万一线上哪天出现幺蛾子。

验证了半天,得到一个不指定序列化id也没关系的实际验证结论,但是又不敢完全放心大胆不显示指定序列化id,抓狂。。。

最终结论

根据实际验证(jdk8, dubbo2.6.8下测试),不显示指定序列化id时,dubbo的传输对象在增加字段、删除字段、增加方法等都不会造成反序列化问题,但是还是强烈建议显示指定序列化id,万一jvm生成序列化id不兼容了呢

结尾

分析了这么长,最终也没找到这个问题的产生原因,但是对dubbo的通信层又加深了理解,下面一篇记录下总结的dubbo通信层

标签:dubbo,解码,参未,header,序列化,id,channel
来源: https://www.cnblogs.com/zhangyjblogs/p/15811482.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有