以服务于中国广大创业者为己任,立志于做最好的创业网站。

标签云创业博客联系我们

导航菜单

对app的定义 app概念

  

  当RPC框架使用Netty进行通信时,实际上是将数据转换成ByteBuf进行传输。   

  

  怎么改造?可以直接将请求参数或响应结果序列化成字节数组发送出去吗?   

  

  答:不能直接连载传输,会有粘包拆包问题。   

  

  #粘包拆包?是什么   

  

  RPC通信用的是TPC(别问我为什么不用UDP),TCP是“流”协议。流是一长串没有边界的二进制数据。三氯苯酚   

  

  作为传输层协议,它并不知道上层业务数据的具体含义,而是根据TCP缓冲区的实际情况来划分数据包,因此在业务上被认为是一个完整的数据包,可能会被TCP阻塞。   

  

  拆分成多个包进行传输,也有可能将多个小数据包打包成一个大数据包进行传输,这就是所谓的TCP拆包和卡包问题。   

  

  直接序列化发送就可以了,但是接收方已经收到了一个包,不知道一个完整的包从哪里开始到哪里结束,所以无法解析。   

  

  #粘包拆包's解决方案。   

  

  因为底层的TCP,   

  

  无法理解上层业务数据,无法保证数据包不会在底层拆分重组。这个问题只能通过上层应用协议栈设计来解决。目前业界主流协议的解决方案如下:   

  

  1.消息定长:消息长度是固定的,例如,每条消息的长度固定为200字节。如果没有足够的空间来填充空格,接收器每次将占用200字节。   

  

  2.使用特殊分隔符分段:例如,在每个消息的末尾添加回车换行符作为消息分隔符,当接收者阅读回车换行符.时,消息被分段   

  

  3.消息分为消息头和消息体,消息头包含消息的长度。当接收者从消息头获得消息长度时,它知道剩余的消息有多少字节。   

  

  4.更复杂的定制应用层协议。   

  

  #解码.   

  

  在网络通信中,将数据转换为消息的过程称为编码,将消息转换为数据的过程称为解码.   

  

  在Netty中,编译解码的过程放在PipeLine中。在前面的介绍中,我们知道每个管道都唯一地绑定到一个通道。   

  

  管道只对应一个通道,所以通道。   

  

  读取时,会解析中的数据。如果它不是一个完整的数据包,解析将失败。保存数据包,下次再用数据包组装解析。在完整的数据包被解析之前,数据包不会被传递下去。   

  

  #解码.   

  

  Netty提供了几种解码设备,即:   

  

  1.linebasedframecodec:由按行.分包   

  

  2.基于分隔符的帧编码器:由特殊分隔符.分包   

  

  3.FixedLengthFrameCoder:使用定长's消息进行分包。   

  

  4.LengthFieldBasedframeCoder:消息分为消息头和消息体,消息头包含消息的长度。   

  

  在RPC的场景中,让我们分析一下应该选择哪个解码:   

  

  1.linebasedframecode:在按行,分包显然是不可行的,因为我们的请求响应数据可能包含换行符。   

  

  2.分隔符BasedframeCoder:遵循特殊分隔符,是不可接受的,因为RPC框架是一个通用的场景,请求响应数据中的所有内容都可能包含在内,请求响应数据中可能存在任何特殊分隔符。这将导致分包错误。   

  

  3.FixedLengthFrameCoder:使用定长消息显然更不合适。在RPC框架这样的一般场景下,固定长度太短,可能不够。如果太长,会造成资源的极大浪费。   

  

  4.LengthFieldBasedframeCoder:将消息分成头和正文的方法在大多数网络通信场景中使用。Ccx-rpc采用了这种解码协议,并定义了自己的一组私有协议(如下所述)。   

  

  #编码.   

  

  Netty   

  

  提供了个常用的抽象编码器:MessageToByteEncoder,编码器不像解码   

器需要考虑粘包拆包,只需要将数据转换成协议规定的二进制格式发送即可。

  

# ccx-rpc 的自定义协议

  

前面提到 ccx-rpc 使用了消息头+消息体 的方式制定私有协议。其格式如下:

  

0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 +---+---+-------+---+---+---+---+-----------+---------+--------+---+---+---+---+---+---+---+---+ | magic |version| full length |messageType|serialize|compress| RequestId | +---+---+-------+---+---+---+---+-----------+---------+--------+---+---+---+---+---+---+---+---+ | | | body | | | | ... ... | +----------------------------------------------------------------------------------------------+2B magic(魔数)1B version(版本)4B full length(消息长度)1B messageType(消息类型)1B serialize(序列化类型)1B compress(压缩类型)8B requestId(请求的Id)body(object类型数据)

  

# 字段解释

  

1\. magic(魔数)

  

是通信双方协商的一个暗号,2 个字节,定义在 MessageFormatConst.MAGIC。

  

魔数的作用是用于服务端在接收数据时先解析出魔数做正确性对比。如果和协议中的魔数不匹配,则认为是非法数据,可以直接关闭连接或采取其他措施增强系统安全性。

  

注意 :这只是一个简单的校验,如果有安全性方面的需求,需要使用其他手段,例如 SSL/TLS。

  

魔数的思想在很多场景中都有体现,如 Java Class 文件开头就存储了魔数 OxCAFEBABE,在 JVM 加载 Class

  

文件时首先就会验证魔数对的正确性。

  

2\. version(版本)

  

为了应对业务需求的变化,可能需要对自定义协议的结构或字段进行改动。不同版本的协议对应的解析方法也是不同的。所以在生产级项目中强烈建议预留协议版本这个字段。

  

3\. full length(消息长度)

  

记录了整个消息的长度,这个字段是报文分包的关键。

  

4\. messageType(消息类型)

  

消息类型包括,普通请求、普通响应、心跳 ping、心跳 pong。解码器可以根据消息类型来确定解析的类型。

  

消息类型的定义如下:

  

public enum MessageType { / * 普通请求 */ REQUEST((byte) 1), / * 普通响应 */ RESPONSE((byte) 2), / * 心跳 ping 请求 */ HEARTBEAT_PING((byte) 3), / * 心跳 pong 响应 */ HEARTBEAT_PONG((byte) 4), ; private final byte value;}

  

6\. serialize(序列化类型)

  

通过这个类型来确定使用哪种序列化方式,将字节流序列化成对应的对象。

  

序列化类型定义如下:

  

public enum SerializeType { PROTOSTUFF((byte) 1, "protostuff");}

  

7\. compress(压缩类型)

  

序列化的字节流,还可以进行压缩,使得体积更小,在网络传输更快,但是同时会消耗 CPU 资源。

  

如果使用压缩效果好的序列化器,可以考虑不适用压缩。

  

压缩类型的定义如下:

  

public enum CompressType { / * 伪压缩器,啥事不干。有一些序列化工具压缩已经做得很好了,无需再压缩 */ DUMMY((byte) 0, "dummy"), GZIP((byte) 1, "gzip"); private final byte value; private final String name;}

  

8\. requestId(请求的Id)

  

每个请求分配好请求Id,这样响应数据的时候,才能对的上。使用 8 字节的 long 类型,可以支持更多的请求。

  

9\. body

  

body 里面放具体的数据,通常来说是请求的参数、响应的结果,再经过序列化、压缩后的字节数组。

  

# ccx-rpc 的编码器 RpcMessageEncoder#

  

RpcMessage 是通用的消息结构体,请求参数和响应结果都会封装成这个结构。

  

编码器相对比较简单,按照协议定义的长度和值进行设置,例如请求Id是8字节的Long,那就

  

out.writeLong(rpcMessage.getRequestId())。

  

有个细节:消息长度事先不知道 body 的长度,可以先跳过。当然也可以先把 body 解析出来算长度。

  

代码如下:

  

@Overrideprotected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) { // 2B magic code(魔数) out.writeBytes(MessageFormatConst.MAGIC); // 1B version(版本) out.writeByte(MessageFormatConst.VERSION); // 4B full length(消息长度). 总长度先空着,后面填。 out.writerIndex(out.writerIndex() + MessageFormatConst.FULL_LENGTH_LENGTH); // 1B messageType(消息类型) out.writeByte(rpcMessage.getMessageType()); // 1B codec(序列化类型) out.writeByte(rpcMessage.getSerializeType()); // 1B compress(压缩类型) out.writeByte(rpcMessage.getCompressTye()); // 8B requestId(请求的Id) out.writeLong(rpcMessage.getRequestId()); // 写 body,返回 body 长度 int bodyLength = writeBody(rpcMessage, out); // 当前写指针 int writerIndex = out.writerIndex(); out.writerIndex(MessageFormatConst.MAGIC_LENGTH + MessageFormatConst.VERSION_LENGTH); // 4B full length(消息长度) out.writeInt(MessageFormatConst.HEADER_LENGTH + bodyLength); // 写指针复原 out.writerIndex(writerIndex);}

  

写 body 的方法抽了出来,因为涉及到了消息类型、序列化、压缩等步骤,比较长。代码如下:

  

private int writeBody(RpcMessage rpcMessage, ByteBuf out) { byte messageType = rpcMessage.getMessageType(); // 如果是 ping、pong 心跳类型的,没有 body,直接返回头部长度 if (messageType == MessageType.HEARTBEAT_PING.getValue() || messageType == MessageType.HEARTBEAT_PONG.getValue()) { return 0; } // 序列化类型 SerializeType serializeType = SerializeType.fromValue(rpcMessage.getSerializeType()); if (serializeType == null) { throw new IllegalArgumentException("codec type not found"); } // 根据序列化类型获得序列化器 Serializer serializer = ExtensionLoader.getLoader(Serializer.class).getExtension(serializeType.getName()); // 压缩类型 CompressType compressType = CompressType.fromValue(rpcMessage.getCompressTye()); // 根据压缩类型获得压缩器 Compressor compressor = ExtensionLoader.getLoader(Compressor.class).getExtension(compressType.getName()); // 使用序列化器对数据进行序列化 byte[] notCompressBytes = serializer.serialize(rpcMessage.getData()); // 序列化完之后进行压缩 byte[] compressedBytes = compressor.compress(notCompressBytes); // 写 body out.writeBytes(compressedBytes); return compressedBytes.length;}

  

从上面的代码和注释可以看出,写 body 的流程如下:

  

1. 判断消息类型,如果是心跳的,则不用写 body

  

2. 根据序列化类型 获得序列化器

  

3. 根据压缩类型 获得压缩器

  

4. 使用序列化器对数据 进行序列化

  

5. 序列化完的数据再 进行压缩 。如果获取不到压缩器,则不压缩,这里抽象成一个伪序列化器DummyCompressor ,少点特殊化代码。

  

public class DummyCompressor implements Compressor { @Override public byte[] compress(byte[] bytes) { return bytes; } @Override public byte[] decompress(byte[] bytes) { return bytes; }}

  

6. 压缩完的数据,就可以通过 out.writeBytes(compressedBytes) 写到输出流啦

  

# ccx-rpc 的解码器 RpcMessageDecoder

  

# LengthFieldBasedFrameDecoder

  

ccx-rpc 的解码器 RpcMessageDecoder 继承 Netty 自带的

  

LengthFieldBasedFrameDecoder,其完整的构造函数定义如下:

  

public LengthFieldBasedFrameDecoder( ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { // 忽略 ...}

  

构造函数的参数非常多,我们来一一解释一下:

  

1. byteOrder:在各种计算机体系结构中,对于字节、字等的存储机制有所不同。如果不达成一致的规则,通信双方将无法进行正确的编/译码从而导致通信失败。默认值是:ByteOrder.BIG_ENDIAN。

  

2. maxFrameLength:指定包的最大长度,如果超过,直接丢弃

  

3. lengthFieldOffset:描述长度的字段(我们叫length)在哪个位置(前面有几个字节)

  

4. lengthFieldLength:length 字段本身的长度(几个字节)

  

5. lengthAdjustment:包的总长度调整。

  

这个参数比较难理解,我们先假设 lengthFieldOffset = 3,lengthFieldLength=4,我们存的长度是 10。

  

那么lengthFieldOffset、lengthFieldLength可以拿到长度结束的偏移量(lengthFieldEndOffset)是 7。

  

这个长度10,Netty 认为是 length 字段后的长度,所以 Netty

  

在计算消息总长度frameLength的时候,会再加上lengthFieldEndOffset:frameLength +=

  

lengthFieldEndOffset。

  

如果我们本来存的长度就是 length 字段后的长度,那这个结果就是对的了。但是我们长度存的就是总长度,这么一加,就相当于多加了一个

  

lengthFieldEndOffset 了!!!

  

由于协议的定义没有谁对谁错,也不能强制要人家就那么设置,所以 Netty 还提供了一个长度调整参数 lengthAdjustment 给我们,

  

frameLength += lengthAdjustment。

  

因为多加了 lengthFieldEndOffset,那我们把这个它减回去,所以大部分的时候,这个参数就是个负数。

  

6. initialBytesToStrip:之前的几个参数,已经足够识别出整个数据包了。但是很多时候,调用者只关心包的内容,包的头部完全可以丢弃掉,initialBytesToStrip 就是用来告诉 Netty,识别出整个数据包之后,截掉 initialBytesToStrip 之前的数据。

  

7. failFast:参数一般设置为 true。当这个参数为 true 时,Netty 一旦读到 length 字段,并判断 length 超过 maxFrameLength,就立即抛出异常。false 表示只有当真正读取完所有的字节之后,才会抛出异常。一般不用修改,否则可能会内存溢出。

  

# RpcMessageDecoder 构造函数

  

下面来看看,ccx-rpc 是如何使用这几个参数的吧,上代码:

  

public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder { public RpcMessageDecoder() { super( // 最大的长度,如果超过,会直接丢弃 MAX_FRAME_LENGTH, // 描述长度的字段[4B full length(消息长度)]在哪个位置:在 [2B magic(魔数)]、[1B version(版本)] 后面 MAGIC_LENGTH + VERSION_LENGTH, // 描述长度的字段[4B full length(消息长度)]本身的长度,也就是 4B 啦 FULL_LENGTH_LENGTH, // LengthFieldBasedFrameDecoder 拿到消息长度之后,还会加上 [4B full length(消息长度)] 字段前面的长度 // 因为我们的消息长度包含了这部分了,所以需要减回去 -(MAGIC_LENGTH + VERSION_LENGTH + FULL_LENGTH_LENGTH), // initialBytesToStrip: 去除哪个位置前面的数据。因为我们还需要检测 魔数 和 版本号,所以不能去除 0); }}

  

解码的方法先使用父类 LengthFieldBasedFrameDecoder 的 decode 方法得到完整的报文数据:

  

@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { Object decoded = super.decode(ctx, in); if (decoded instanceof ByteBuf) { ByteBuf frame = (ByteBuf) decoded; if (frame.readableBytes() >= HEADER_LENGTH) { try { return decodeFrame(frame); } catch (Exception ex) { log.error("Decode frame error.", ex); } finally { frame.release(); } } } return decoded;}

  

注意:如果解码报错,需要调用 frame.release() 来释放。

  

# 自定义协议解码

  

# 1\. 初步读出字段并做基础检查

  

接下来就是自定义协议的解码方法 decodeFrame,返回值就是业务的消息结构体 RpcMessage。

  

/ * 业务解码 */private RpcMessage decodeFrame(ByteBuf in) { readAndCheckMagic(in); readAndCheckVersion(in); int fullLength = in.readInt(); byte messageType = in.readByte(); byte codec = in.readByte(); byte compress = in.readByte(); long requestId = in.readLong(); RpcMessage rpcMessage = RpcMessage.builder() .serializeType(codec) .compressTye(compress) .requestId(requestId) .messageType(messageType) .build(); //...}

  

第一步:检查魔数,比较简单,就是把前两位字节读出来,跟我们的魔数进行对比,不一样就抛出异常。

  

/ * 读取并检查魔数 */private void readAndCheckMagic(ByteBuf in) { byte[] bytes = new byte[MAGIC_LENGTH]; in.readBytes(bytes); for (int i = 0; i < bytes.length; i++) { if (bytes[i] != MAGIC[i]) { throw new IllegalArgumentException("Unknown magic: " + Arrays.toString(bytes)); } }}

  

第二步:检查版本,目前来说版本的逻辑还很简单。后续如果版本不一样,可能解码的方式还不一样。

  

/ * 读取并检查版本 */private void readAndCheckVersion(ByteBuf in) { byte version = in.readByte(); if (version != VERSION) { throw new IllegalArgumentException("Unknown version: " + version); }}

  

第三步:读出其他字段,并初步构造出 RpcMessage

  

# 2\. 不需要解析 body 的情况

  

正常来说我们接下来需要解析 body 了,但是有几种情况是不需要解析的。那就是心跳类型的请求、body 长度 0 的情况。

  

if (messageType == MessageType.HEARTBEAT_PING.getValue()) { rpcMessage.setData(PING_DATA); return rpcMessage;}if (messageType == MessageType.HEARTBEAT_PONG.getValue()) { rpcMessage.setData(PONG_DATA); return rpcMessage;}int bodyLength = fullLength - HEADER_LENGTH;if (bodyLength == 0) { return rpcMessage;}

  

# 3\. 解析 body

  

拿到 body 之后,应该先要解压再反序列化,跟编码时的先序列化再压缩相反。代码如下:

  

byte[] bodyBytes = new byte[bodyLength];in.readBytes(bodyBytes);CompressType compressType = CompressType.fromValue(compress);// 根据压缩类型找出压缩器Compressor compressor = ExtensionLoader.getLoader(Compressor.class).getExtension(compressType.getName());// 进行解压byte[] decompressedBytes = compressor.decompress(bodyBytes);SerializeType serializeType = SerializeType.fromValue(codec);if (serializeType == null) { throw new IllegalArgumentException("unknown codec type:" + codec);}// 根据序列化类型找出序列化器Serializer serializer = ExtensionLoader.getLoader(Serializer.class).getExtension(serializeType.getName());// 根据消息类型获取消息体结构Class clazz = messageType == MessageType.REQUEST.getValue() ? RpcRequest.class : RpcResponse.class;// 反序列化Object object = serializer.deserialize(decompressedBytes, clazz);rpcMessage.setData(object);return rpcMessage;

  

# 总结

  

上文介绍了 TCP 中的粘包拆包问题,并且介绍了 Netty 提供的解决方案。着重介绍了 ccx-rpc 选择的

  

LengthFieldBasedFrameDecoder,他的构造参数比较多,一次看不懂没关系,多看几遍,尝试 debug 一下代码,也许就豁然开朗了。

  

最后介绍了 ccx-rpc 的自定义协议和编解码器,大家在自定义协议的时候,可以不用跟我的一样,不过大体上的思想是一样的,希望同学们能活学活用。

  

> ccx-rpc 代码已经开源

  

> Github:https://github.com/chenchuxin/ccx-rpc

  

> Gitee:https://gitee.com/imccx/ccx-rpc

  

> 作者:小新是也

  

> 出处:https://www.cnblogs.com/chenchuxin/p/15227253.html