搜索
您的当前位置:首页关于TCP 和 Netty 拆包 粘包

关于TCP 和 Netty 拆包 粘包

来源:爱问旅游网

Netty权威指南第2版学习笔记

 

1. TCP 粘包/拆包问题的解决之道

 

 

1. Tcp连接的握手时延 与延迟确认算法

 

1) 请求新的TCP连接时,客户端要向服务器发送一个小的TCP分组(通常是40~60个字节),这个分组中设置了一个特殊的syn标记,说明这是一个连接请求。

2) 如果服务器接受了连接,就会对一些连接参数进行计算,并向客户端回送一个tcp分组,这个分组中的synack标记都被置位,说明连接请求已被接受

3) 最后,客户端向服务器回送一条确认消息(ack),通知它连接已成功建立。现代的tcp栈都允许客户端在这个确认分组中发送数据

 

由于因特网自身无法确保可靠的分组传输(因特网路由器超负荷的话,可以随意丢弃分组)所以TCP实现了自己的确认机制来确保数据的成功传输

 

每个TCP段都有一个序列号和数据完整性校验和,每个段的接收者收到完好的段时,都会向发送者回送小的确认分组。如果发送者没有在指定的窗口时间内收到确认信息,发送者就认为分组已经被破坏或损毁,并重发数据

 

由于确认报文很小,所以TCP允许在发送相同方向的输出数据分组中对其进行"捎带",TCP将返回的确认信息与输出的数据分组结合在一起,可以更有效地利用网络。为了增加确认报文找到同向传输数据分组的可能性,很多TCP栈都实现了一种 "延迟确认" 算法。 延迟确认算法会在一个特定的窗口时间(通常是100~200毫秒)内将输出确认存放在缓冲区中,以寻找能够捎带它的输出数据分组。如果在那个时间段内没有输出数据分组,就将确认信息放在单独的分组中传送

 

 

 

 

2. TCP慢启动

TCP数据传输的性能还取决于TCP连接的使用期(age).TCP连接会随着时间进行自我"调谐",起初会限制连接的最大速度,如果数据成功传输,会随着时间的推移提高传输的速度。这种调谐被称为TCP 慢启动(slow start),用于防止因特网的突然过载和拥塞

 

TCP 慢启动限制了一个TCP端点在任意时刻可以传输的分组数。简单来说,每成功接收一个分组,发送端就有了发送另外两个分组的权限。如果某个HTTP事务有大量数据要发送,是不能一次将所有分组都发送出去的。必须发送一个分组,等待确认; 然后可以发送两个分组,每个分组都必须被确认,这样就可以发送四个分组了,以此类推。这种方式被称为"打开拥塞窗口"

 

 

 

3. Nagle算法与 TCP_NODELAY

tcp 有一个数据流接口,应用程序可以通过它将任意尺寸的数据放入TCP栈中,即使一次只放一个字节也可以,但是,每个TCP段中都至少装载了40个字节的标记和首部,所以如果TCP发送了大量包含少量数据的分组,网络性能就会严重下降

   发送大量单字节分组的行为称为“发送端傻窗口综合症”,这种行为效率很低,违返社会道德

   Nagle算法试图在发送一个分组之间,将大量TCP数据绑定在一起,以提高网络效率 RFC 896 对此算法进行了描述

   Nagle算法鼓励发送全迟寸(LAN上最大尺寸的分组大约是1500字节,在因特网上是几百字节)的段。只有当所有其他分组都被确认之后,Nagle算法才允许发送非全尺寸的分组。如果其他分组仍然在传输过程中,就将那部分数据缓存起来,只有当挂起分组被确认,或者缓存中积累了足够发送一个全尺寸分组的数据时,才会将缓存的数据发送出去

 

HTTP应用程序常常会在自己的栈中设置参数TCP_NODELAY,禁用Nagle算法,提高性能。如果要这么做的话,一定要确保会向TCP写入大块的数据,这样就不会产生一堆小分组了

 

 

 

 

4. 一些tcp 常见概念

MTU: Maxitum Transmission Unit 最大传输单元 

      一般是1500bytes
MSS: Maxitum Segment Size 最大分段大小 ,TCP段中的最大数据区的大小

     ipv4中最大是1460

 

这个最大传输单元(MTU)实际上和链路层协议有着密切的关系,EthernetII帧的结构DMAC+SMAC+Type+Data+CRC由于以太网传输电气方面的限制,每个以太网帧都有最小的大小64bytes最大不能超过1518bytes

 

 

TCP 中有一个OPTIONS这里有一个MSS的选项。建立tcp连接的两端在三次握手时会协商tcp mss大小,具体如下:pc1 发出syn报文,其中option选项填充的mss字段一般为1460,同样www server收到syn报文后,会发送synack报文应答,option选项填充的mss字段也为1460;协商双方会比较synsyn+ack报文中mss字段大小,选择较小的mss作为发送tcp分片的大小。通过比较,此次协商双方的tcp mss都是1460

 

 

MSSTCP协议在实现的时候往往用MTU值代替(需要减去IP数据包包头的大小20BytesTCP数据段的包头20Bytes)所以往往MSS1460。通讯双方会根据双方提供的MSS值得最小值确定为这次连接的最大MSS值。这是在IPV4的协议中,而在IPV6协议中一般情况下MSS的值为1440,这是因为,IPv6中的IP头的大小是40bytes,比IPV4的大20bytes.

 

TCP流是分段的,由ip分组传送(或 IP 数据报),tcp的数据是通过名为ip分组(ip数据报)的小数据块来发送的。

 

一个IP分组首部(通常为20字节)

一个TCP段首部(通常为20字节)

一个TCP数据块(0个或多个字节)

 

 

TCP 连接是通过4个值来识别的

 

 

 

 

 

 

 

 

 

 

封包序号(Sequence Number4字节

由于 TCP 封包必须要带入 IP 封包当中,所以如果 TCP 数据太大时(大于 IP 封包的容许程度), 就得要进行分段。这个 Sequence Number 就是记录每个封包的序号,可以让收受端重新将 TCP 的数据组合起来。

序号字段的值则指的是本报文段所发送的数据的第一个字节的序号。

 

确认号(Acknowledge Number4字节

为了确认主机端确实有收到我们 client 端所送出的封包数据,我们 client 端当然希望能够收到主机方面的响应,那就是这个 Acknowledge Number 的用途了。 当 client 端收到这个确认码时,就能够确定之前传递的封包已经被正确的收下了。这个号是期望收到对方的下一个报文段的数据的第一个字节的序号

 

数据偏移(Data Offset4比特

4比特。给出头部占32比特的数目。没有任何选项字段的TCP头部长度为20字节;最多可以有60字节的TCP头部。

 

保留字段(Reserved) 占6比特

保留为今后使用,但目前应置为0

 

状态控制码(CodeControl Flag

标志位字段(UAPRSF):占6比特。各 比特的含义如下:

URG:紧急比特(urgent,URG1时,表明紧急指针字段有效,代表该封包为紧急封包。它告诉系统此报文段中有紧急数据,应尽快传送(相当于高优先级的数据), 且上图中的 Urgent Pointer 字段也会被启用。

ACK:确认比特(Acknowledge)。只有当ACK1时确认号字段才有效,代表这个封包为确认封包。当ACK0时,确认号无效。

PSH:(Push function)若为1时,代表要求对方立即传送缓冲区内的其他对应封包,而无需等缓冲满了才送。

RST:复位比特(Reset) ,RST1时,表明TCP连接中出现严重差错(如由于主机崩溃或其他原因),必须释放连接,然后再重新建立运输连接。

SYN:同步比特(Synchronous)SYN置为1,就表示这是一个连接请求或连接接受报文,通常带有 SYN 标志的封包表示『主动』要连接到对方的意思。。

FIN:终止比特(Final),用来释放一个连接。当FIN1时,表明此报文段的发送端的数据已发送完毕,并要求释放运输连接。

 

滑动窗口(Window) 占2字节

窗口字段用来控制对方发送的数据量,可以告知对方目前本身有的缓冲器容量(Receive Buffer) 还可以接收封包。当 Window=0 时,代表缓冲器已经额满,所以应该要暂停传输数据。单位为字节。TCP连接的一端根据设置的缓存空间大小确定自己的接收窗口大小,然后通知对方以确定对方的发送窗口的上限,这个值是本机期望一次接收的字节数

 

 

16位窗口大小:TCP流量控制,字节数,起始于确认序列号指明的值,接收端期望收到的字节,最大为65535.

16位检验和:包括计算TCP首部和数据综合的二进制反码和检验和。

 

紧急指针(Urgent Pointer)  2字节

这个字段是在 Code 字段内的 URG = 1 时才会产生作用。可以告知紧急数据所在的位置(紧急指针指出在本报文段中的紧急数据的最后一个字节的序号)

 

选项(Options) 长度可变。

TCP首部可以有多达40字节的可选信息,用于把附加信息传递给终点,或用来对齐其它选项。目前此字段仅应用于表示接收端可以接收的最大数据区段容量,若此字段不使用, 表示可以使用任意数据区段的大小。 这个字段较少使用。

TCP首部的主要选项:

最大报文段长度MSS(Maximum Segment Size)TCP报文段中的数据字段的最大长度。MSS告诉对方TCP:“我的缓存所能接收的报文段的数据字段的最大长度是MSS个字节。”

 

填充字段(Padding)

如同 IP 封包需要有固定的 32bits 表头一样, Options 由于字段为非固定, 所以也需要 Padding 字段来加以补齐才行。同样也是 32 bits 的整数。

这是为了使整个首部长度是4字节的整数倍。

 

5. TCP 拆包 粘包

 

一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP 拆包和粘包问题

 

服务端一次接收到了两个数据包,D1D2是粘合在一起,称为TCP粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理

服务端分两次读取到了两个数据包,第一次读取到了完整的D1包,和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包

 

 

 

 

 

Java 中的Socket Options 可设置的相关值

SO_SNDBUF:    值是integer,代表send buffer的大小,以字节为单位

               默认这个值是OS dependent,可以在socket boundconnected

               之前被设置, OS的不同,可以在socket bound之后改变.

               不允许设置为负值

 

SO_RCVBUF:     值是个integer,代表socket receive buffer有多个字节,

                默认这个值是OS dependent,可以在socket boundconnected

                之前被设置, OS的不同,可以在socket bound之后改变.

                不允许设置为负值

 

 

 

粘包、拆包发生原因

 

1、要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。

 

2、待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

 

3、要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。

 

4、接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

 

 

TCP粘包和拆包的解决策略

 

1. 消息定长。例如100字节,不够的可以通过补0填充。

2. 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议,可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开

3. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度)

4. 其它复杂的协议,如RTMP协议等。

 

 

 

5.1 处理固定长度

 

public static final int MAX_LENGTH = 32;

 

public FixLengthWrapper(String msg) {

ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); byteBuffer.put(msg.getBytes());

byte[] fillData = new byte[MAX_LENGTH - msg.length()];

byteBuffer.put(fillData);

 data = byteBuffer.array();

}

public FixLengthWrapper(byte[] msg) {

ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); byteBuffer.put(msg);

byte[] fillData = new byte[MAX_LENGTH - msg.length];

byteBuffer.put(fillData);

 data = byteBuffer.array();

}

 

可以看到客户端在发送数据前首先把数据封装为长度为32bytes的数据包,这个长度是根据目前实际数据包长度来规定的,这个长度必须要大于所有可能出现的数据包的长度,这样才不会出现把数据“截断”的情况

 

private static void processByFixLength(SocketChannel socketChannel) throws IOException {

        while (socketChannel.read(byteBuffer) > 0) {

                byteBuffer.flip();

               while (byteBuffer.remaining() >= FixLengthWrapper.MAX_LENGTH) {

                       byte[] data = new byte[FixLengthWrapper.MAX_LENGTH];

                       byteBuffer.get(data, 0, FixLengthWrapper.MAX_LENGTH);

                        System.out.println(new String(data) + " <---> " + number++);

                }

                byteBuffer.compact();

          }

}

 

5.2 添加长度首部

这种方式的处理较上面提到的方式稍微复杂一点。在发送端需要给待发送的数据添加固定的首部,然后再发送出去,然后在接收端需要根据这个首部的长度信息进行数据包的组合或拆分,发送端程序如下:

 

 

//发送端

String msg = "hello world " + number++;  

//add the head represent the data length

socketChannel.write(ByteBuffer.wrap(new PacketWrapper(msg).getBytes()));

 

//添加长度首部的工具类

public class PacketWrapper {

 

 private int length;

 private byte[] payload;

 

 public PacketWrapper(String payload) {

     this.payload = payload.getBytes();

     this.length = this.payload.length;

 }

 

 public PacketWrapper(byte[] payload) {

     this.payload = payload;

     this.length = this.payload.length;

 }

 

 public byte[] getBytes() {

     ByteBuffer byteBuffer = ByteBuffer.allocate(this.length + 4);

     byteBuffer.putInt(this.length);

     byteBuffer.put(payload);

     return byteBuffer.array();

 }

 

 public String toString() {

     StringBuilder sb = new StringBuilder();

     for (byte b : getBytes()) {

         sb.append(String.format("0x%02X ", b));

     }

     return sb.toString();

 }

}

 

 

 

从程序可以看到,发送端在发送数据前首先给待发送数据添加了代表长度的首部,首部长为4bytes(即int型长度),这样接收端在收到这个数据之后,首先需要读取首部,拿到实际数据长度,然后再继续读取实际长度的数据,即实现了组包和拆包的操作。程序如下:

 

 

 

private static void processByHead(SocketChannel socketChannel) throws IOException {

 

    while (socketChannel.read(byteBuffer) > 0) {

        // 保存bytebuffer状态

        int position = byteBuffer.position();

        int limit = byteBuffer.limit();

        byteBuffer.flip();

        // 判断数据长度是否够首部长度

        if (byteBuffer.remaining() < 4) {

            byteBuffer.position(position);

            byteBuffer.limit(limit);

            continue;

        }

        // 判断bytebuffer中剩余数据是否足够一个包

        int length = byteBuffer.getInt();

        if (byteBuffer.remaining() < length) {

            byteBuffer.position(position);

            byteBuffer.limit(limit);

            continue;

        }

        // 拿到实际数据包

        byte[] data = new byte[length];

 

        byteBuffer.get(data, 0, length);

        System.out.println(new String(data) + " <---> " + number++);

        byteBuffer.compact();

    }

}

 

 

这里需要提醒各位同学一个问题,由于我在测试的时候采用的是一台机器连续发送数据来模拟高并发的场景,所以在测试的时候会发现服务器端收到的数据包的个数经常会小于包的序号,好像发生了丢包。但经过仔细分析可以发现,这种情况是因为TCP发送缓存溢出导致的丢包,也就是这个数据包根本没有发出来。也就是说,发送端发送数据过快,导致接收端缓存很快被填满,这个时候接收端会把通知窗口设置为0从而控制发送端的流量,这样新到的数据只能暂存在发送端的发送缓存中,当发送缓存溢出后,就出现了我上面提到的丢包,这个问题可以通过增大发送端缓存来缓解这个问题,

 

socketChannel.socket().setSendBufferSize(102400);  

当然这个话题不在本文的讨论范围,如果有兴趣的同学可以参阅《TCP/IP详解卷一》中的拥塞窗口一章

 

关于源码说明,源码默认是把粘包和拆包处理这一部分注释掉了,分别位于NIOTcpServerNIOTcpClient文件中,需要测试粘包和拆包处理程序的同学需要把这一段注释给去掉。

 

 

 

5.3 利用LineBasedFrameDecoder 解决Tcp 粘包问题

 

package com.phei.netty.frame.correct;

 

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LineBasedFrameDecoder;

import io.netty.handler.codec.string.StringDecoder;

 

/**

 * @author lilinfeng

 * @date 2014214

 * @version 1.0

 */

public class TimeServer {

 

    public void bind(int port) throws Exception {

// 配置服务端的NIO线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

    ServerBootstrap b = new ServerBootstrap();

    b.group(bossGroup, workerGroup)

    .channel(NioServerSocketChannel.class)

    .option(ChannelOption.SO_BACKLOG, 1024)

    .childHandler(new ChildChannelHandler());

    // 绑定端口,同步等待成功

    ChannelFuture f = b.bind(port).sync();

 

    // 等待服务端监听端口关闭

    f.channel().closeFuture().sync();

} finally {

    // 优雅退出,释放线程池资源

    bossGroup.shutdownGracefully();

    workerGroup.shutdownGracefully();

}

    }

 

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel arg0) throws Exception {

    arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));

    arg0.pipeline().addLast(new StringDecoder());

    arg0.pipeline().addLast(new TimeServerHandler());

}

    }

 

    /**

     * @param args

     * @throws Exception

     */

    public static void main(String[] args) throws Exception {

int port = 8080;

if (args != null && args.length > 0) {

    try {

port = Integer.valueOf(args[0]);

    } catch (NumberFormatException e) {

// 采用默认值

    }

}

new TimeServer().bind(port);

    }

}

 

 

public class TimeServerHandler extends ChannelHandlerAdapter {

 

    private int counter;

 

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg)

    throws Exception {

String body = (String) msg;

System.out.println("The time server receive order : " + body

+ " ; the counter is : " + ++counter);

String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(

System.currentTimeMillis()).toString() : "BAD ORDER";

currentTime = currentTime + System.getProperty("line.separator");

ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

ctx.writeAndFlush(resp);

    }

 

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

ctx.close();

    }

}

 

 

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

 

import java.util.logging.Logger;

 

/**

 * @author lilinfeng

 * @date 2014214

 * @version 1.0

 */

public class TimeClientHandler extends ChannelHandlerAdapter {

 

    private static final Logger logger = Logger

    .getLogger(TimeClientHandler.class.getName());

 

    private int counter;

 

    private byte[] req;

 

    /**

     * Creates a client-side handler.

     */

    public TimeClientHandler() {

req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))

.getBytes();

    }

 

    @Override

    public void channelActive(ChannelHandlerContext ctx) {

ByteBuf message = null;

for (int i = 0; i < 100; i++) {

    message = Unpooled.buffer(req.length);

    message.writeBytes(req);

    ctx.writeAndFlush(message);

}

    }

 

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg)

    throws Exception {

String body = (String) msg;

System.out.println("Now is : " + body + " ; the counter is : "

+ ++counter);

    }

 

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

// 释放资源

logger.warning("Unexpected exception from downstream : "

+ cause.getMessage());

ctx.close();

    }

}

 

 

public class TimeClient {

 

    public void connect(int port, String host) throws Exception {

// 配置客户端NIO线程组

EventLoopGroup group = new NioEventLoopGroup();

try {

    Bootstrap b = new Bootstrap();

    b.group(group).channel(NioSocketChannel.class)

    .option(ChannelOption.TCP_NODELAY, true)

    .handler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch)

throws Exception {

    ch.pipeline().addLast(

    new LineBasedFrameDecoder(1024));

    ch.pipeline().addLast(new StringDecoder());

    ch.pipeline().addLast(new TimeClientHandler());

}

    });

 

    // 发起异步连接操作

    ChannelFuture f = b.connect(host, port).sync();

 

    // 当代客户端链路关闭

    f.channel().closeFuture().sync();

} finally {

    // 优雅退出,释放NIO线程组

    group.shutdownGracefully();

}

    }

 

    /**

     * @param args

     * @throws Exception

     */

    public static void main(String[] args) throws Exception {

int port = 8080;

if (args != null && args.length > 0) {

    try {

port = Integer.valueOf(args[0]);

    } catch (NumberFormatException e) {

// 采用默认值

    }

}

new TimeClient().connect(port, "127.0.0.1");

    }

}

 

LineBasedFrameDecoder StringDecoder

 

LineBasedFrameDecoder 的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有"\n" "\r\n" ,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。 它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流

 

StringDecoder 的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Hander.

 

LineBasedFrameDecoder +StringDecoder 的组合就是按行切换的文本解码器,可以用来解决tcp的粘包和拆包

 

 

5.4 分隔符解码器DelimiterBasedFrameDecoder

 略

 

5.5 定长解码器 FixedLengthFrameDecoder

 略

6. 参考文章

Netty权威指南 第2

 

 

 

 

因篇幅问题不能全部显示,请点此查看更多更全内容

Top