当前位置 : 主页 > 网络编程 > net编程 >

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#

来源:互联网 收集:自由互联 发布时间:2023-10-08
十年河东,十年河西,莫欺少年穷 学无止境,精益求精 接着上篇博客:​​Tcp/ip及Udp编程之Socket.Core 开源框架---如何理解TCP粘包原理及解决方案​​ Socket.Core框架中,Pack模式是push模式

十年河东,十年河西,莫欺少年穷

学无止境,精益求精

接着上篇博客:​​Tcp/ip及Udp编程之Socket.Core 开源框架---如何理解TCP粘包原理及解决方案​​

Socket.Core框架中,Pack模式是push模式和pull模式的结合体,其处理分包的思路是:pack客户端及pack服务端在发送报文时,自动为数据报文增加4字节标题头,然后通过标题头及报文长度进行切包处理,但该框架在切包时,不会接收所有的完整包,因此,有必要自行扩展方法进行完善。

其服务端源码如下:

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_服务端简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_2d_02

using socket.core.Common;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace socket.core.Server
{
/// <summary>
/// 推和拉组合体,自带分包处理机制
/// </summary>
public class TcpPackServer
{
/// <summary>
/// 基础类
/// </summary>
private TcpServer tcpServer;
/// <summary>
/// 连接成功事件 item1:connectId
/// </summary>
public event Action<int> OnAccept;
/// <summary>
/// 接收通知事件 item1:connectId,item2:数据
/// </summary>
public event Action<int, byte[]> OnReceive;
/// <summary>
/// 发送通知事件 item1:connectId,item2:长度
/// </summary>
public event Action<int, int> OnSend;
/// <summary>
/// 断开连接通知事件 item1:connectId,
/// </summary>
public event Action<int> OnClose;
/// <summary>
/// 接收到的数据缓存
/// </summary>
private ConcurrentDictionary<int, List<byte>> queue;
/// <summary>
/// 包头标记
/// </summary>
private uint headerFlag;

/// <summary>
/// 设置基本配置
/// </summary>
/// <param name="numConnections">同时处理的最大连接数</param>
/// <param name="receiveBufferSize">用于每个套接字I/O操作的缓冲区大小(接收端)</param>
/// <param name="overtime">超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时</param>
/// <param name="headerFlag">包头标记范围0~1023(0x3FF),当包头标识等于0时,不校验包头</param>
public TcpPackServer(int numConnections, int receiveBufferSize, int overtime, uint headerFlag)
{
if (headerFlag < 0 || headerFlag > 1023)
{
headerFlag = 0;
}
this.headerFlag = headerFlag;
Thread thread = new Thread(new ThreadStart(() =>
{
queue = new ConcurrentDictionary<int, List<byte>>();
tcpServer = new TcpServer(numConnections, receiveBufferSize, overtime);
tcpServer.OnAccept += TcpServer_eventactionAccept;
tcpServer.OnReceive += TcpServer_eventactionReceive;
tcpServer.OnSend += TcpServer_OnSend;
tcpServer.OnClose += TcpServer_eventClose;
}));
thread.IsBackground = true;
thread.Start();
}

/// <summary>
/// 开启监听服务
/// </summary>
/// <param name="port">监听端口</param>
public void Start(int port)
{
while (tcpServer == null)
{
Thread.Sleep(10);
}
tcpServer.Start(port);
}

/// <summary>
/// 连接成功事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
private void TcpServer_eventactionAccept(int connectId)
{
if (OnAccept != null)
OnAccept(connectId);
}

/// <summary>
/// 发送数据
/// </summary>
/// <param name="connectId">连接ID</param>
/// <param name="data">数据</param>
/// <param name="offset">偏移位</param>
/// <param name="length">长度</param>
public void Send(int connectId, byte[] data, int offset, int length)
{
//增加四字节标题后
data = AddHead(data.Skip(offset).Take(length).ToArray());
tcpServer.Send(connectId, data, 0, data.Length);
}

/// <summary>
/// 发送成功事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
/// <param name="length">长度</param>
private void TcpServer_OnSend(int connectId, int length)
{
if (OnSend != null)
{
OnSend(connectId, length);
}
}

/// <summary>
/// 接收通知事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
/// <param name="data">数据</param>
/// <param name="offset">偏移位</param>
/// <param name="length">长度</param>
private void TcpServer_eventactionReceive(int connectId, byte[] data, int offset, int length)
{
if (OnReceive != null)
{
if (!queue.ContainsKey(connectId))
{
queue.TryAdd(connectId, new List<byte>());
}
byte[] r = new byte[length];
Buffer.BlockCopy(data, offset, r, 0, length);
queue[connectId].AddRange(r);
byte[] datas = Read(connectId);
if (datas != null && datas.Length > 0)
{
OnReceive(connectId, datas);
}

}
}

/// <summary>
/// 断开连接
/// </summary>
/// <param name="connectId">连接ID</param>
public void Close(int connectId)
{
tcpServer.Close(connectId);
}

/// <summary>
/// 断开连接通知事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
private void TcpServer_eventClose(int connectId)
{
if (queue.ContainsKey(connectId))
{
List<byte> lst = new List<byte>();
queue.TryRemove(connectId,out lst);
}
if (OnClose != null)
OnClose(connectId);
}

/// <summary>
/// 在数据起始位置增加4字节包头
/// </summary>
/// <param name="data">数据</param>
/// <returns></returns>
private byte[] AddHead(byte[] data)
{
uint len = (uint)data.Length;
uint header = (headerFlag << 22) | len;
byte[] head = System.BitConverter.GetBytes(header);
return head.Concat(data).ToArray();
}

/// <summary>
/// 读取数据--针对沾包的数据进行切包,但该框架不能将缓存中的数据全部处理【需要自己增加方法处理】
/// 如果收到的数据为一个数据包的一半,该框架通过长度验证后,不会执行接收操作,只有下一次传输另一半时,才会处理这个完整的包。
/// </summary>
/// <param name="connectId">连接标记537529691</param>
/// <returns></returns>
private byte[] Read(int connectId)
{
if (!queue.ContainsKey(connectId))
{
return null;
}
List<byte> data = queue[connectId];
uint header = BitConverter.ToUInt32(data.ToArray(), 0);
if (headerFlag != (header >> 22))
{
return null;
}
uint len = header & 0x3fffff;
if (len > data.Count - 4)
{
return null;
}
//切包
byte[] f = data.Skip(4).Take((int)len).ToArray();
queue[connectId].RemoveRange(0, (int)len + 4);
return f;
}

/// <summary>
/// 给连接对象设置附加数据
/// </summary>
/// <param name="connectId">连接标识</param>
/// <param name="data">附加数据</param>
/// <returns>true:设置成功,false:设置失败</returns>
public bool SetAttached(int connectId, object data)
{
return tcpServer.SetAttached(connectId, data);
}

/// <summary>
/// 获取连接对象的附加数据
/// </summary>
/// <param name="connectId">连接标识</param>
/// <returns>返回附加数据</returns>
public T GetAttached<T>(int connectId)
{
return tcpServer.GetAttached<T>(connectId);
}
}
}

View Code

另外,在真实的使用场景中,客户端开发人员及服务端开发人员都会协商好报文结构,报文结构中很可能已经存在报文的标题头,因此,此框架自行增加标题头的方式不是那么友好。

综上所述:无论是切包后,接收端缓存区存在未被处理的报文。还是,框架自行增加报文标题头,都属于框架的坑部分,要避免这些问题,就必须自行扩展/修改源码。

在此,结合个人的实际情况,进行剖析,如下:

假设存在如下报文结构

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_2d_03

 现在我们模拟下沾包的数据报文,假设一个完整的数据报文为:

List<byte> lst_3 = new List<byte> { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x05, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06 };

如果客户端发送了这样一个数据报文,就出现了沾包的情况,如下:

private void BtnSend_Click(object sender, EventArgs e)
{
List<byte> lst = new List<byte>() { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x01, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x01, 0x02 };

List<byte> lst_2 = new List<byte> { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x03, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06 };

List<byte> lst_3 = new List<byte> { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x05, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06 };

List<byte> lst_4 = new List<byte> { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x07, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06 };
var allList = new List<byte>();
allList.AddRange(lst);
allList.AddRange(lst_2);
allList.AddRange(lst_3);
allList.AddRange(lst_4);
var array = allList.ToArray();
pck.Send(array, 0, array.Length);
}

四个完整数据包一起发送。也就是沾包,如果在不修改源码的情况下,会有如下输出

 简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_2d_04


 实际发了四条报文,但服务端只打印了一个报文,其他的三个报文存储在了服务端缓存区。这显然是错的

我们以上报报文结构为例,针对源码修改如下:

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_服务端简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_2d_02

using socket.core.Common;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace socket.core.Server
{
/// <summary>
/// 推和拉组合体,自带分包处理机制
/// </summary>
public class TcpPackServerService
{
/// <summary>
/// 基础类
/// </summary>
private TcpServer tcpServer;
/// <summary>
/// 连接成功事件 item1:connectId
/// </summary>
public event Action<int> OnAccept;
/// <summary>
/// 接收通知事件 item1:connectId,item2:数据
/// </summary>
public event Action<int, byte[]> OnReceive;
/// <summary>
/// 发送通知事件 item1:connectId,item2:长度
/// </summary>
public event Action<int, int> OnSend;
/// <summary>
/// 断开连接通知事件 item1:connectId,
/// </summary>
public event Action<int> OnClose;
/// <summary>
/// 接收到的数据缓存
/// </summary>
private ConcurrentDictionary<int, List<byte>> queue;
/// <summary>
/// 包头标记
/// </summary>
private uint headerFlag;

/// <summary>
/// 设置基本配置
/// </summary>
/// <param name="numConnections">同时处理的最大连接数</param>
/// <param name="receiveBufferSize">用于每个套接字I/O操作的缓冲区大小(接收端)</param>
/// <param name="overtime">超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时</param>
/// <param name="headerFlag">包头标记范围0~1023(0x3FF),当包头标识等于0时,不校验包头</param>
public TcpPackServerService(int numConnections, int receiveBufferSize, int overtime, uint headerFlag)
{
this.headerFlag = headerFlag;
Thread thread = new Thread(new ThreadStart(() =>
{
queue = new ConcurrentDictionary<int, List<byte>>();
tcpServer = new TcpServer(numConnections, receiveBufferSize, overtime);
tcpServer.OnAccept += TcpServer_eventactionAccept;
tcpServer.OnReceive += TcpServer_eventactionReceive;
tcpServer.OnSend += TcpServer_OnSend;
tcpServer.OnClose += TcpServer_eventClose;
}));
thread.IsBackground = true;
thread.Start();
}

/// <summary>
/// 开启监听服务
/// </summary>
/// <param name="port">监听端口</param>
public void Start(int port)
{
while (tcpServer == null)
{
Thread.Sleep(10);
}
tcpServer.Start(port);
}

/// <summary>
/// 连接成功事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
private void TcpServer_eventactionAccept(int connectId)
{
if (OnAccept != null)
OnAccept(connectId);
}

/// <summary>
/// 发送数据
/// </summary>
/// <param name="connectId">连接ID</param>
/// <param name="data">数据</param>
/// <param name="offset">偏移位</param>
/// <param name="length">长度</param>
public void Send(int connectId, byte[] data, int offset, int length)
{
// data = AddHead(data.Skip(offset).Take(length).ToArray());
tcpServer.Send(connectId, data, 0, data.Length);
}

/// <summary>
/// 发送成功事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
/// <param name="length">长度</param>
private void TcpServer_OnSend(int connectId, int length)
{
if (OnSend != null)
{
OnSend(connectId, length);
}
}

/// <summary>
/// 接收通知事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
/// <param name="data">数据</param>
/// <param name="offset">偏移位</param>
/// <param name="length">长度</param>
private void TcpServer_eventactionReceive(int connectId, byte[] data, int offset, int length)
{
if (OnReceive != null)
{
if (!queue.ContainsKey(connectId))
{
queue.TryAdd(connectId, new List<byte>());
}
byte[] r = new byte[length];
Buffer.BlockCopy(data, offset, r, 0, length);
queue[connectId].AddRange(r);
byte[] datas = Read(connectId);

if (datas != null && datas.Length > 0)
{
OnReceive(connectId, datas);
//处理沾包报文【缓存区不为空的问题】
TcpNaglePack(connectId, queue[connectId]);
}

}
}

/// <summary>
/// 自定义处理沾包【此框架沾包会引起服务端缓存区无限增大】-清除缓存区数据
/// </summary>
/// <param name="connectId"></param>
/// <param name="datas"></param>
private void TcpNaglePack(int connectId,List<byte> datas)
{
if (datas.Count > 0)
{
byte[] redDatas = Read(connectId);
if (redDatas != null && redDatas.Length > 0)
{
OnReceive(connectId, redDatas);
if (queue[connectId] != null && queue[connectId].Count > 0)
{
TcpNaglePack(connectId, queue[connectId]);
Thread.Sleep(100);
}
}
}

}

/// <summary>
/// 断开连接
/// </summary>
/// <param name="connectId">连接ID</param>
public void Close(int connectId)
{
tcpServer.Close(connectId);
}

/// <summary>
/// 断开连接通知事件方法
/// </summary>
/// <param name="connectId">连接标记</param>
private void TcpServer_eventClose(int connectId)
{
if (queue.ContainsKey(connectId))
{
List<byte> lst = new List<byte>();
queue.TryRemove(connectId, out lst);
}
if (OnClose != null)
OnClose(connectId);
}

/// <summary>
/// 在数据起始位置增加4字节包头
/// </summary>
/// <param name="data">数据</param>
/// <returns></returns>
//private byte[] AddHead(byte[] data)
//{
// uint len = (uint)data.Length;
// uint header = (headerFlag << 22) | len;
// byte[] head = System.BitConverter.GetBytes(header);
// return head.Concat(data).ToArray();
//}
private int GetValue(List<byte> Ary)
{
double result = 0;
var cm = 0;
cm = Ary.Count;
foreach (var item in Ary)
{
if (cm - 1 == 0)
{
result += item;
}
else
{
result += item * Math.Pow(16, (cm * 2 - 2));
cm--;
}

}
return Convert.ToInt32(result);
}


/// <summary>
/// 读取数据--切包处理
/// </summary>
/// <param name="connectId">连接标记537529691</param>
/// <returns></returns>
private byte[] Read(int connectId)
{
if (!queue.ContainsKey(connectId))
{
return null;
}
List<byte> data = queue[connectId];
uint header = BitConverter.ToUInt16(data.ToArray(), 0);
if (headerFlag != header)
{
return null;
}
var lensource = data.Skip(4).Take(2).ToArray();
string len1 = BitConverter.ToString(lensource).Replace("-","");
int len = Convert.ToInt32(len1,16);
if (len > data.Count - 8)//数据长度校验
{
return null;
}
byte[] f = data.Take((int)len + 8).ToArray();
queue[connectId].RemoveRange(0, (int)len + 8);
return f;
}

/// <summary>
/// 给连接对象设置附加数据
/// </summary>
/// <param name="connectId">连接标识</param>
/// <param name="data">附加数据</param>
/// <returns>true:设置成功,false:设置失败</returns>
public bool SetAttached(int connectId, object data)
{
return tcpServer.SetAttached(connectId, data);
}

/// <summary>
/// 获取连接对象的附加数据
/// </summary>
/// <param name="connectId">连接标识</param>
/// <returns>返回附加数据</returns>
public T GetAttached<T>(int connectId)
{
return tcpServer.GetAttached<T>(connectId);
}
}
}

View Code

服务器端初始如下:

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_服务端简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_2d_02

class Program
{
static int port = int.Parse(ConfigurationManager.AppSettings["port"]);
static int numConnections = int.Parse(ConfigurationManager.AppSettings["numConnections"]);
static int receiveBufferSize = int.Parse(ConfigurationManager.AppSettings["receiveBufferSize"]);
static int overtime = int.Parse(ConfigurationManager.AppSettings["overtime"]);
static void Main(string[] args)
{
try
{
Pack pack = new Pack(numConnections, receiveBufferSize, overtime, port,31097);
Console.WriteLine("TcpIP.服务启动");
}
catch (Exception ex)
{
Console.WriteLine("TcpIP.服务异常" + ex.ToString());

}

Console.ReadKey();
}
}

View Code

经过对源码的修改,服务端在处理报文时,不再增加四字节标题头,不再进行四字节标题头的验证,仅针对自己报文的标题头【0x79,0x79】进行验证,如下:

private byte[] Read(int connectId)
{
if (!queue.ContainsKey(connectId))
{
return null;
}
List<byte> data = queue[connectId];
//取两字节标题头
uint header = BitConverter.ToUInt16(data.ToArray(), 0);
if (headerFlag != header)
{
return null;
}
var lensource = data.Skip(4).Take(2).ToArray();
string len1 = BitConverter.ToString(lensource).Replace("-","");
int len = Convert.ToInt32(len1,16);
//数据长度校验,根据报文结构,数据长度等于【从功能码开始到数据部分结束的长度,不包含校验码】,因此,除掉包头(2字节),功能码(2字节),帧序号(2字节),校验码(2字节),报文中的长度等于数据报文真实长度减去8
if (len > data.Count - 8)//数据长度校验
{
return null;
}
byte[] f = data.Take((int)len + 8).ToArray();
queue[connectId].RemoveRange(0, (int)len + 8);
return f;
}

最后,针对沾包处理,做如下递归切包

/// <summary>
/// 自定义处理沾包【此框架沾包会引起服务端缓存区无限增大】-清除缓存区数据
/// </summary>
/// <param name="connectId"></param>
/// <param name="datas"></param>
private void TcpNaglePack(int connectId,List<byte> datas)
{
if (datas.Count > 0)
{
byte[] redDatas = Read(connectId);
if (redDatas != null && redDatas.Length > 0)
{
OnReceive(connectId, redDatas);
if (queue[connectId] != null && queue[connectId].Count > 0)
{
TcpNaglePack(connectId, queue[connectId]);
Thread.Sleep(100);
}
}
}

}

这样,我们一次性发送四个报文时,数据是可以正常显示的,如下:

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_2d_09


 另外,我们还可以这样测试【4组报文合起来】,如下:

private void BtnSend_Click(object sender, EventArgs e)
{
//4
List<byte> lst = new List<byte>() { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x01, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x01, 0x02, 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x03, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06, 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x05, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06 , 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x07, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x2d, 0x05, 0x06 };


var allList = new List<byte>();
allList.AddRange(lst);

var array = allList.ToArray();
pck.Send(array, 0, array.Length);
}

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_数据_10


 输出的结果也是正确的。

另外,我们把一个包分成两段,分两次发送,如下:

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_数据_11

 如下:

private void BtnSend_Click(object sender, EventArgs e)--第一次发送
{
//4
List<byte> lst = new List<byte>() { 0x79, 0x79, 0x01, 0x01, 0x00, 0x0d, 0x00, 0x01, 0x88, 0x77, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff };


var allList = new List<byte>();
allList.AddRange(lst);

var array = allList.ToArray();
pck.Send(array, 0, array.Length);
}

private void button1_Click(object sender, EventArgs e)--第二次发送
{
List<byte> lst = new List<byte>() { 0x00, 0x01, 0x2d, 0x01, 0x02 };
pck.Send(lst.ToArray(), 0, lst.Count);
}

 当第一次发送时,由于数据包长度验证未通过,因此服务端不会打印报文信息,这时,第一次发送的报文会存储在服务端数据缓存中,。

当第二次发送时,由于缓存区数据增加了第二次发送的内容,经长度及标题头验证后,发现是一个完整的包,因此,会将完整数据包打印在服务端,如下:

简单谈谈tcp/ip/udp开源框架Scoket.Core的Pack模式遇到的坑及解决方案 C#_数据_12


 以上便是本文的内容,有点生涩,不喜勿喷

@天才卧龙的博客





上一篇:基于Net底层的邮件发送
下一篇:没有了
网友评论