ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rocketmq传输协议

2021-10-25 17:58:33  阅读:144  来源: 互联网

标签:body 协议 header public 传输 length new headerData rocketmq


传输协议

在这里插入图片描述

可见传输内容主要可以分为以下4部分:

(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容

NettyDecoder

构造函数

定义获取1个网络包的格式

# FRAME_MAX_LENGTH=16777216
public NettyDecoder() { 
    super(FRAME_MAX_LENGTH, 0, 4, 0, 4); 
}

在这里插入图片描述

decode

public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }
            ByteBuffer byteBuffer = frame.nioBuffer();
            return RemotingCommand.decode(byteBuffer); // decode
        } catch (Exception e) {
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }

RemotingCommand

decode

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit(); // 获取byteBuffer的总长度
        int oriHeaderLen = byteBuffer.getInt(); // 序列化类型(8byte) &  消息头长度(24byte):共同占用一个int类型  详见markProtocolType
        int headerLength = getHeaderLength(oriHeaderLen); //获取消息头的长度,这里和0xFFFFFF(6个F)做与运算,也就是24位,把序列化协议去掉了

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData); // 从byteBuffer里面读取headerLength长度的数据到headerData
        // 获取序列化类型  有1:rocketmq和0:json协议
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        // body的长度 = 总长度的值 - 总长度占用字节数:4 - 头长度
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData); // 从byteBuffer里面读取body数据
        }
        cmd.body = bodyData; // 获取body数据

        return cmd;
    }

NettyEncoder

@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader(); // 获取Header数据 放入out
            out.writeBytes(header); // 放入header数据
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body); //获取body数据 放入out
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

RemotingCommand

encodeHeader

public ByteBuffer encodeHeader(final int bodyLength) {
	// 1> header length size
	int length = 4;

	// 2> header data length
	byte[] headerData;
	headerData = this.headerEncode();

	length += headerData.length; // 总长度4byte + headerData.length

	// 3> body data length
	length += bodyLength; // length += length(总长度4byte + headerData.length) + bodyLength
	// 存放(总长度4byte + header长度4byte(虚化列类型和header长度) + headerData.length ,一句话就是不包括body数据
	ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // +4 是因为 length不包括消息总长度

	// length  消息总长度
	result.putInt(length);

	// header length  header长度4byte(虚化列类型和header长度)
	result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

	// header data header的字节数据
	result.put(headerData);
	//  limit = position; position = 0; mark = -1;
	result.flip();

	return result;
}

headerEncode

把RemotingCommand转成二进制数据

private byte[] headerEncode() {
	this.makeCustomHeaderToNet();
	if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { // rocketmq协议 -> byte[]
		return RocketMQSerializable.rocketMQProtocolEncode(this);
	} else { // todo json方式  obj -> jsonString -> byte[]
		return RemotingSerializable.encode(this);  //把RemotingCommand自己传入进去,转成json的二进制数据
	}
}

NettyRemotingClient

start()

public void start() {

	Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast(
					defaultEventExecutorGroup,
					new NettyEncoder(), // 这个
					new NettyDecoder(), // 这个
					new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
					new NettyConnectManageHandler(),
					new NettyClientHandler());
			}
		});

	........
}

NettyRemotingServer

public void start() {
        
	ServerBootstrap childHandler =
	this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
		.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline()
					.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
					.addLast(defaultEventExecutorGroup,
						encoder, // 这里
						new NettyDecoder(), // 这里
						new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
						connectionManageHandler,
						serverHandler
					);
			}
		});

}

自定义RocketMQ的消息协议

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作
在这里插入图片描述

自定义协议实现

详见RocketMQSerializable的rocketMQProtocolEncode、rocketMQProtocolDecode

标签:body,协议,header,public,传输,length,new,headerData,rocketmq
来源: https://blog.csdn.net/kq1983/article/details/120954462

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有