- SuperSocket 2.0从入门到懵逼
- 1 使用SuperSocket 2.0在AspNetCore项目中搭建一个Socket服务器
- 1.1 引入SuperSocket 2.0
- 1.2 在AspNetCore中搭建一个Socket服务器
- 2 基本的协议概念
- 2.1 基本协议种类
- 2.1.1 固定头格式协议
- 2.1.2 固定头尾标识协议
- 2.1.3 固定包大小协议
- 2.1.4 命令行协议
- 2.1.5 一些其它协议
- PS: 关于协议的一些硬件厂商的私有协议比较奇葩, 他们的协议五花八门的...不过我们这里不做阐述, 有时间我会再讲
- 2.1 基本协议种类
- 3 SuperSocket中的几个基本概念
- 3.1 Package Type
- 3.2 PipelineFilter Type
- 3.3 使用PackageType和PipelineFilter Type创建SuperSocket
- 4 SuperSocket中的PipelineFilter, 实现自己的PipelineFilter
- 4.1 内置的PipelineFilter模板
- 4.2 基于内置模板实现PipelineFilter
- 4.2.1 FixedHeaderPipelineFilter-头部格式固定并且包含内容长度的协议
- 4.2.3 另一种挂载解析器的方式
- 7 扩展AppSession和SuperSocketService
- 7.1 扩展AppSession
- 7.2 如何自己实现SuperSocketService?
- 8 扩展SuperSocket的功能
- 8.1 多协议切换
- 9 搭建WebSocket服务器
- 9.1 番外: WebSocket传参
- 10 多服务器以及不同服务间的协同
- More 1 协议的编解码器开发预览
- More 2 DotNetty
- 1 使用SuperSocket 2.0在AspNetCore项目中搭建一个Socket服务器
1 使用SuperSocket 2.0在AspNetCore项目中搭建一个Socket服务器 1.1 引入SuperSocket 2.0附带实现, 讲解, 与部分源代码解读
SuperSocket 2.0目前仍然处于Beta阶段, 但是功能基本可靠, 可以用于生产环境.
可以从Github源地址自行fork代码进行其它修改, 如需直接引用, 在默认的Nuget服务器是没有的, 需要添加作者自己的Nuget服务器https://www.myget.org/F/supersocket/api/v3/index.json
, 获取预览版本.
使用SuperSocket 2.0快速搭建一个Socket服务器非常简单, 在框架中, 作者实现了一个SuperSocketHostBuilder
进行构建, 我们要做的只是简单的配置一些参数和业务逻辑.
此外, SuperSocketHostBuilder
是可以直接嵌入到AspNetCore项目的CreateHostBuilder
上的, 但是并不推荐这么做...
- 配置数据包和过滤器/选择器
- 数据包, 即我们进行Socket通信时的数据包, 可以是包对象, 也可以是
byte[]
, 如果是包对象, 则需要在过滤器中配置解码器. - 过滤器, 作用是对数据包进行筛选然后截取一包数据, 可以将解码器挂载进来, 直接将二进制数据映射为对象.
- 选择器, 我们可以使用选择器, 来实现一个端口服务于多种协议的情况, 此时, 一个选择器则会搭配多个过滤器进行使用.
- 数据包, 即我们进行Socket通信时的数据包, 可以是包对象, 也可以是
- 配置IP和端口
- 配置IP和端口可以选择两种方式, 一种方式是从程序写入, 另一种是从配置文件中写入.
- 在本系列的介绍中, 我们大多数采用程序写入的方式, 从配置文件写入的方式, 后续会采用其它方式实现, 来扩展业务.
- 配置Session
-
在程序中作者内置了
IAppSession
和AppSession
供我们使用, 如果我们需要自定义Session, 则需要继承AppSession
并且在程序中进行引用, 即可切换为我们定义的Session. -
自定义Session时, 由于在程序中大多数提供的参数都为
IAppSession
, 所以, 需要实现SuperSocket的更多其它接口进行重写, 来维持程序运转. -
自定义Session大多数时候是为了添加更多自定义属性, 作者在设计中提供了另外的方式提供我们选择:
// AppSession源码 public class AppSession : IAppSession, ILogger, ILoggerAccessor { //...... private Dictionary<object, object> _items; public object this[object name] { get { var items = _items; if (items == null) return null; object value; if (items.TryGetValue(name, out value)) return value; return null; } set { lock (this) { var items = _items; if (items == null) items = _items = new Dictionary<object, object>(); items[name] = value; } } } //...... }
可以看到, 其中内置了一个字典, 我们可以将属性的
Key-Value
值直接存在字典中, 即session["prop"] = value;
的形式.
-
- 配置SessionHandler
- SessionHandler会在建立和断开Socket连接时触发, 用于处理连接和断开时的业务, 可以在
SuperSocketHostBuilder
中直接配置, 也可以通过重写相应的接口实现. - 连接时, 会将创建好的Session传入该方法.
- 断开时, 会将断开的Session以及触发断开的事件传入该方法.
- SessionHandler会在建立和断开Socket连接时触发, 用于处理连接和断开时的业务, 可以在
- 配置PackageHandler
- PackageHandler会在接收到数据包时触发.
- 该方法自动触发时我们将获取两个参数, 一个是Session, 一个是Package. 但是要注意的是, 这里的Session是
IAppSession
类型, 并不是我们自定义的Session.- Session即为当前Socket连接, 里面附带了各种连接信息以及状态等.
- Package是通过过滤器后得到的数据包, 不过要注意的是, 例如:如果数据包为头尾标识符的数据包, 如果采用的是
byte[]
的形式, 得到的可能不是原包, 而是去除了包头尾标识后的数据体. 即:7E 01 02 03 7E
, 去掉头尾的7E
标识得到的是01 02 03
.
- Build & Run
- 构建这里同时提供了几种方式, 推荐采用
BuildAsServer()
, 然后通过StartAsync()
进行启用.
- 构建这里同时提供了几种方式, 推荐采用
此时, 一个Socket服务器就搭建完成了. 具体实现:
// SampleSession
public class SampleSession: AppSession
{
}
// Socket Server代码
var tcpHost = SuperSocketHostBuilder.Create<byte[], PipelineFilter>()
.ConfigureSuperSocket(options =>
{
options.AddListener(new ListenOptions
{
Ip = "Any",
Port = 4040
})
.AddListener(new ListenOptions()
{
Ip = "Any",
Port = 8888
});
})
.UseSession<JT808TcpSession>()
.UseClearIdleSession()
.UseSessionHandler(s =>
{
})
.UsePackageHandler(async (s, p) =>
{
//解包/应答/转发
})
.ConfigureErrorHandler((s, v) =>
{
})
.UseMiddleware<InProcSessionContainerMiddleware>()
.UseInProcSessionContainer()
.BuildAsServer();
await tcpHost.RunAsync();
.UseClearIdleSession()
请务必调用, 在使用各类Socket框架时, 不可避免的我们的应用程序都会维持大量的僵尸连接, SuperSocket中提供了UseClearIdleSession()
来自动复用已经闲置或者失去连接的资源.
顾名思义, 这类协议的Header是固定的, 并且一般Header的长度是固定的, 但是也有例外情况, 此外, Header中也会包含数据体的长度等信息. 然后可以根据长度来截取一包数据.
2.1.2 固定头尾标识协议这类协议的数据包, 前几字节和后几字节是固定的, 这样就可以通过头尾的标识来截取一包数据.
通常, 这类协议的数据包, 为了避免数据内容和协议头、尾的标识冲突, 通常会设置转义, 即将数据包中出现头尾标识的地方, 转义为其它数据, 避免识别时出现错误.
2.1.3 固定包大小协议这类协议每一包数据的大小都是固定的, 所以可以直接根据长度进行读取.
2.1.4 命令行协议这类协议通常以\r\n
结尾, 采用字符串转为二进制流进行传输.
Package Type
即包类型, 这里描述的是数据包的结构, 例如SuperSocket中就提供了一些基础的包类型TextPackageInfo
等.
public class TextPackageInfo
{
public string Text{get; set;}
}
这里的TextPackageInfo
标识了这类类型的数据包中, 仅包含了一个字符串, 当然, 我们通常会有更复杂的网络数据包结构.例如, 我将在下列展示一个包含首尾标识的通信包, 它包含了首尾标识, 消息号, 终端Id, 以及消息体:
public class SamplePackage
{
public byte Begin{get; set;}
public MessageId MessageId{get; set;}
public string TerminalId{get; set;}
public SampleBody Body{get; set;}
public byte End{get; set;}
}
当然, 在SuperSocket中也提供了一些接口供我们实现一些类似格式的包, 不过个人不太喜欢这种方式, 官方文档也举了一些例子, 例如,有的包会有一个特殊的字段来代表此包内容的类型. 我们将此字段命名为 "Key". 此字段也告诉我们用何种逻辑处理此类型的包. 这是在网络应用程序中非常常见的一种设计. 例如,你的 Key 字段是整数类型,你的包类型需要实现接口IKeyedPackageInfo
:
public class MyPackage : IKeyedPackageInfo<int>
{
public int Key { get; set; }
public short Sequence { get; set; }
public string Body { get; set; }
}
3.2 PipelineFilter Type
这种类型在网络协议解析中作用重要. 它定义了如何将 IO 数据流解码成可以被应用程序理解的数据包. 换句话说, 就是把你的二进制流数据, 能够一包一包的识别出来, 同时可以解析成你构建的Package对象. 当然, 你也可以选择不构建, 然后将源数据直接返回.
这些是 PipelineFilter 的基本接口. 你的系统中至少需要一个实现这个接口的 PipelineFilter 类型.
public interface IPipelineFilter
{
void Reset();
object Context { get; set; }
}
public interface IPipelineFilter<TPackageInfo> : IPipelineFilter
where TPackageInfo : class
{
IPackageDecoder<TPackageInfo> Decoder { get; set; }
TPackageInfo Filter(ref SequenceReader<byte> reader);
IPipelineFilter<TPackageInfo> NextFilter { get; }
}
事实上,由于 SuperSocket 已经提供了一些内置的 PipelineFilter 模版,这些几乎可以覆盖 90% 的场景的模版极大的简化了你的开发工作. 所以你不需要完全从头开始实现 PipelineFilter. 即使这些内置的模版无法满足你的需求,完全自己实现PipelineFilter也不是难事.
3.3 使用PackageType和PipelineFilter Type创建SuperSocket你定义好 Package 类型和 PipelineFilter 类型之后,你就可以使用 SuperSocketHostBuilder 创建 SuperSocket 宿主了。
var host = SuperSocketHostBuilder.Create<StringPackageInfo, CommandLinePipelineFilter>();
在某些情况下,你可能需要实现接口 IPipelineFilterFactory 来完全控制 PipelineFilter 的创建。
public class MyFilterFactory : PipelineFilterFactoryBase<TextPackageInfo>
{
protected override IPipelineFilter<TPackageInfo> CreateCore(object client)
{
return new FixedSizePipelineFilter(10);
}
}
然后在 SuperSocket 宿主被创建出来之后启用这个 PipelineFilterFactory:
var host = SuperSocketHostBuilder.Create<StringPackageInfo>();
host.UsePipelineFilterFactory<MyFilterFactory>();
4 SuperSocket中的PipelineFilter, 实现自己的PipelineFilter
4.1 内置的PipelineFilter模板
SuperSocket中内置了一些PipelineFilter模板, 这些模板几乎可以覆盖到90%的应用场景, 极大简化了开发工作, 所以不需要完全从头开始实现PipelineFilter. 即使这些内置的模板无法满足你的需求, 完全自己实现PipelineFilter.
SuperSocket提供了这些PipelineFilter模板:
- TerminatorPipelineFilter (SuperSocket.ProtoBase.TerminatorPipelineFilter, SuperSocket.ProtoBase)
- TerminatorTextPipelineFilter (SuperSocket.ProtoBase.TerminatorTextPipelineFilter, SuperSocket.ProtoBase)
- LinePipelineFilter (SuperSocket.ProtoBase.LinePipelineFilter, SuperSocket.ProtoBase)
- CommandLinePipelineFilter (SuperSocket.ProtoBase.CommandLinePipelineFilter, SuperSocket.ProtoBase)
- BeginEndMarkPipelineFilter (SuperSocket.ProtoBase.BeginEndMarkPipelineFilter, SuperSocket.ProtoBase)
- FixedSizePipelineFilter (SuperSocket.ProtoBase.FixedSizePipelineFilter, SuperSocket.ProtoBase)
- FixedHeaderPipelineFilter (SuperSocket.ProtoBase.FixedHeaderPipelineFilter, SuperSocket.ProtoBase)
这种协议讲请求定义为两大部分, 第一部分定义了包含第二部分长度等等基础信息, 我们通常称第一部分为头部.
例如, 我们有一个这样的协议: 头部包含 3 个字节, 第 1 个字节用于存储请求的类型, 后两个字节用于代表请求体的长度:
/// +-------+---+-------------------------------+
/// |request| l | |
/// | type | e | request body |
/// | (1) | n | |
/// | |(2)| |
/// +-------+---+-------------------------------+
根据此协议的规范, 我们可以使用如下代码定义包的类型:
public class MyPackage
{
public byte Key { get; set; }
public string Body { get; set; }
}
下一个是设计PipelineFilter:
public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
public MyPipelineFilter()
: base(3) // 包头的大小是3字节,所以将3传如基类的构造方法中去
{
}
// 从数据包的头部返回包体的大小
protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
{
var reader = new SequenceReader<byte>(buffer);
reader.Advance(1); // skip the first byte
reader.TryReadBigEndian(out short len);
return len;
}
// 将数据包解析成 MyPackage 的实例
protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
{
var package = new MyPackage();
var reader = new SequenceReader<byte>(buffer);
reader.TryRead(out byte packageKey);
package.Key = packageKey;
reader.Advance(2); // skip the length
package.Body = reader.ReadString();
return package;
}
}
最后,你可通过数据包的类型和 PipelineFilter 的类型来创建宿主:
var host = SuperSocketHostBuilder.Create<MyPackage, MyPipelineFilter>()
.UsePackageHandler(async (s, p) =>
{
// handle your package over here
}).Build();
你也可以通过将解析包的代码从 PipelineFilter 移到 你的包解码器中来获得更大的灵活性:
public class MyPackageDecoder : IPackageDecoder<MyPackage>
{
public MyPackage Decode(ref ReadOnlySequence<byte> buffer, object context)
{
var package = new MyPackage();
var reader = new SequenceReader<byte>(buffer);
reader.TryRead(out byte packageKey);
package.Key = packageKey;
reader.Advance(2); // skip the length
package.Body = reader.ReadString();
return package;
}
}
通过 host builder 的 UsePackageDecoder 方法来在SuperSocket中启用它:
builder.UsePackageDecoder<MyPackageDecoder>();
4.2.3 另一种挂载解析器的方式
在Asp.Net Core Application我们可以new(), 直接注入或者采用工厂模式等方式, 向Host中注入协议解析器, 然后在过滤波器中进行使用.
public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
public readonly PacketConvert _packageConvert;
public MyPipelineFilter()
: base(3) // 包头的大小是3字节,所以将3传如基类的构造方法中去
{
_packageConvert = new PackageConvert();
}
// 从数据包的头部返回包体的大小
protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
{
var reader = new SequenceReader<byte>(buffer);
reader.Advance(1); // skip the first byte
reader.TryReadBigEndian(out short len);
return len;
}
// 将数据包解析成 MyPackage 的实例
protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
{
var package = _packageConvert.Deserialize<Package>(buffer);
return package;
}
}
PS: 过滤器中DecodePackage
返回的buffer可能不是完整的包, 例如固定头尾结构的包中, 返回的buffer可能是去掉头尾的格式
7 扩展AppSession和SuperSocketService 7.1 扩展AppSession例如固定头尾的包
0x7E 0x7E xxxxxxx 0x7E 0x7E
, 返回的buffer中头尾的0x7E 0x7E
会被去除, 只留下中间xxxxxxx
的部分,所以在实现解码器部分的时候需要注意.
在SuperSocket关于Socket的管理提供了SessionContainer
供大家获取程序中的Session实例, 只需在构建中调用.UseMiddleware<InProcSessionContainerMiddleware>()
和UseInProcSessionContainer()
即可通过AppSession.Server.SessionContainer()
获取.
但是为了方便管理, 个人角色还是实现一个另外的SessionManager
比较好, 这样可以更方便的集成到我们的Asp.Net Core Application中. 使用ConcurrentDictionary
原子字典来存储, 可以避免一些读写上的死锁问题.
public class SessionManager<TSession> where TSession : IAppSession
{
/// <summary>
/// 存储的Session
/// </summary>
public ConcurrentDictionary<string, TSession> Sessions { get; private set; } = new();
/// <summary>
/// Session的数量
/// </summary>
public int Count => Sessions.Count;
/// <summary>
/// </summary>
public SessionManager()
{
}
public ConcurrentDictionary<string, TSession> GetAllSessions()
{
return Sessions;
}
/// <summary>
/// 获取一个Session
/// </summary>
/// <param name="key"> </param>
/// <returns> </returns>
public virtual async Task<TSession> TryGet(string key)
{
return await Task.Run(() =>
{
Sessions.TryGetValue(key, out var session);
return session;
});
}
/// <summary>
/// 添加或者更新一个Session
/// </summary>
/// <param name="key"> </param>
/// <param name="session"> </param>
/// <returns> </returns>
public virtual async Task TryAddOrUpdate(string key, TSession session)
{
await Task.Run(() =>
{
if (Sessions.TryGetValue(key, out var oldSession))
{
Sessions.TryUpdate(key, session, oldSession);
}
else
{
Sessions.TryAdd(key, session);
}
});
}
/// <summary>
/// 移除一个Session
/// </summary>
/// <param name="key"> </param>
/// <returns> </returns>
public virtual async Task TryRemove(string key)
{
await Task.Run(() =>
{
if (Sessions.TryRemove(key, out var session))
{
}
else
{
}
});
}
/// <summary>
/// 通过Session移除Session
/// </summary>
/// <param name="sessionId"> </param>
/// <returns> </returns>
public virtual async Task TryRemoveBySessionId(string sessionId)
{
await Task.Run(() =>
{
foreach (var session in Sessions)
{
if (session.Value.SessionID == sessionId)
{
Sessions.TryRemove(session);
return;
}
}
});
}
/// <summary>
/// 删除僵尸链接
/// </summary>
/// <returns> </returns>
[Obsolete("该方法丢弃", true)]
public virtual async Task TryRemoveZombieSessions()
{
await Task.Run(() =>
{
});
}
/// <summary>
/// 移除所有Session
/// </summary>
/// <returns> </returns>
public virtual async Task TryRemoveAll()
{
await Task.Run(() =>
{
Sessions.Clear();
});
}
/// <summary>
/// </summary>
/// <param name="session"> </param>
/// <param name="buffer"> </param>
/// <returns> </returns>
public virtual async Task SendAsync(TSession session, ReadOnlyMemory<byte> buffer)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
await session.SendAsync(buffer);
}
/// <summary>
/// </summary>
/// <param name="session"> </param>
/// <param name="message"> </param>
/// <returns> </returns>
public virtual async Task SendAsync(ClientSession session, string message)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
// ReSharper disable once PossibleNullReferenceException
await session.SendAsync(message);
}
/// <summary>
/// </summary>
/// <param name="session"> </param>
/// <returns> </returns>
public virtual async Task<Guid> FindIdBySession(TSession session)
{
return await Task.Run(() =>
{
return Guid.Parse(Sessions.First(x => x.Value.SessionID.Equals(session.SessionID)).Key);
});
}
}
7.2 如何自己实现SuperSocketService?
我们在使用SuperSocket时需要在Program.cs
中来构建, 这样会导致一个问题, 这样我们的SuperSocket服务就会变得难以控制, 那么有没有一种写法来将这部分代码抽离出来呢?
答案是有的, 我们可以采用.Net Core中的BackgroundService
或者IHostedService
来实现后台服务, 甚至将这些服务管理起来, 根据需要随时创建, 随时启动, 随时停止. 这样做的好处还有, 我们可以随时获取依赖注入的服务来做一些更多的操作, 例如读取配置, 管理Session, 配置编解码器, 日志, 应答器, MQ等等.
public class TcpSocketServerHostedService : IHostedService
{
private readonly IOptions<ServerOption> _serverOptions;
private readonly IOptions<KafkaOption> _kafkaOptions;
private readonly ClientSessionManagers _clientSessionManager;
private readonly TerminalSessionManager _gpsTrackerSessionManager;
private readonly ILogger<TcpSocketServerHostedService> _logger;
private readonly IGeneralRepository _generalRepository;
private readonly NbazhGpsSerializer _nbazhGpsSerializer = new NbazhGpsSerializer();
private static EV26MsgIdProducer _provider = null;
/// <summary>
/// Tcp Server服务
/// </summary>
/// <param name="serverOptions"> </param>
/// <param name="kafkaOptions"> </param>
/// <param name="clientSessionManager"> </param>
/// <param name="gpsTrackerSessionManager"> </param>
/// <param name="logger"> </param>
/// <param name="factory"> </param>
public TcpSocketServerHostedService(
IOptions<ServerOption> serverOptions,
IOptions<KafkaOption> kafkaOptions,
ClientSessionManagers clientSessionManager,
TerminalSessionManager gpsTrackerSessionManager,
ILogger<TcpSocketServerHostedService> logger,
IServiceScopeFactory factory)
{
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_kafkaOptions = kafkaOptions;
_clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
_gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
}
/// <summary>
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
var host = SuperSocketHostBuilder.Create<NbazhGpsPackage, ProtocolPipelineSwitcher>()
.ConfigureSuperSocket(opts =>
{
foreach (var listener in _serverOptions.Value.TcpListeners)
{
opts.AddListener(new ListenOptions()
{
Ip = listener.Ip,
Port = listener.Port
});
}
})
.UseSession<GpsTrackerSession>()
.UseClearIdleSession()
.UseSessionHandler(onClosed: async (s, v) =>
{
try
{
// Session管理
await _gpsTrackerSessionManager.TryRemoveBySessionId(s.SessionID);
}
catch
{
// ignored
}
})
.UsePackageHandler(async (s, packet) =>
{
// 处理包
})
.UseInProcSessionContainer()
.BuildAsServer();
await host.StartAsync();
await Task.CompletedTask;
}
/// <summary>
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await _gpsTrackerSessionManager.TryRemoveAll();
}
catch
{
// ignored
}
await Task.CompletedTask;
}
}
实现服务后, 我们可以写扩展方法将服务注入.
/// <summary>
/// 服务器扩展
/// </summary>
public static class ServerBuilderExtensions
{
/// <summary>
/// 添加Tcp服务器
/// </summary>
/// <param name="services"> </param>
/// <returns> </returns>
public static IServiceCollection AddTcpServer(
this IServiceCollection services)
{
services.AddSingleton<TerminalSessionManager>();
services.AddHostedService<TcpSocketServerHostedService>();
return services;
}
/// <summary>
/// 添加Ws服务器
/// </summary>
/// <param name="services"> </param>
/// <returns> </returns>
public static IServiceCollection AddWsServer(
this IServiceCollection services)
{
services.AddSingleton<ClientSessionManagers>();
services.AddHostedService<WebSocketServerHostedService>();
return services;
}
}
8 扩展SuperSocket的功能
8.1 多协议切换
我们有时候会面对一种需求, 就是同一个接口, 需要接收不同的终端的协议包, 这样我们通常会根据协议的不同点来区分协议. PS: 协议最好是同类型协议, 并且有明显不同的特征!
具体的实现方式就是实现一个特殊的PipelineFilter
, 在下列代码中, 我们将读取该包数据的第一个字节来分辨该协议为0x78 0x78
类型开头的协议还是0x79 0x79
开头的协议, 然后将标记移回改包开头, 然后将这一包数据交给对应的过滤器来进行解析:
// NbazhGpsPackage: 包编解码器
public class ProtocolPipelineSwitcher : PipelineFilterBase<NbazhGpsPackage>
{
private IPipelineFilter<NbazhGpsPackage> _filter7878;
private byte _beginMarkA = 0x78;
private IPipelineFilter<NbazhGpsPackage> _filter7979;
private byte _beginMarkB = 0x79;
public ProtocolPipelineSwitcher()
{
_filter7878 = new EV26PipelineFilter7878(this);
_filter7979 = new EV26PipelineFilter7979(this);
}
public override NbazhGpsPackage Filter(ref SequenceReader<byte> reader)
{
if (!reader.TryRead(out byte flag))
{
throw new ProtocolException(@"flag byte cannot be read");
}
if (flag == _beginMarkA)
{
NextFilter = _filter7878;
}
else if (flag == _beginMarkB)
{
NextFilter = _filter7979;
}
else
{
return null;
//throw new ProtocolException($"首字节未知 {flag}");
}
// 将标记移回开头
reader.Rewind(1);
return null;
}
}
9 搭建WebSocket服务器
WebSocket Server的实现方式与之前的Socket Server实现方式大致相同, 其中不同的地方主要为: WebSocket Server不需要配置编解码器, 采用String作为消息格式等.
/// <summary>
/// </summary>
public class WebSocketServerHostedService : IHostedService
{
private readonly IOptions<ServerOption> _serverOptions;
private readonly ClientSessionManagers _clientSessionManager;
private readonly TerminalSessionManager _gpsTrackerSessionManager;
private readonly IGeneralRepository _generalRepository;
/// <summary>
/// </summary>
/// <param name="serverOptions"> </param>
/// <param name="clientSessionManager"> </param>
/// <param name="gpsTrackerSessionManager"> </param>
/// <param name="factory"> </param>
public WebSocketServerHostedService(
IOptions<ServerOption> serverOptions,
ClientSessionManagers clientSessionManager,
TerminalSessionManager gpsTrackerSessionManager,
IServiceScopeFactory factory)
{
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(_serverOptions));
_clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
_gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
_generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
}
/// <summary>
/// WebSocketServer
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
var host = WebSocketHostBuilder.Create()
.ConfigureSuperSocket(opts =>
{
foreach (var listener in _serverOptions.Value.WsListeners)
{
opts.AddListener(new ListenOptions()
{
Ip = listener.Ip,
Port = listener.Port
});
}
})
.UseSession<ClientSession>()
.UseClearIdleSession()
.UseSessionHandler(onClosed: async (s, v) =>
{
await _clientSessionManager.TryRemoveBySessionId(s.SessionID);
})
.UseWebSocketMessageHandler(async (s, p) =>
{
var package = p.Message.ToObject<ClientPackage>();
if (package.PackageType == PackageType.Heart)
{
return;
}
if (package.PackageType == PackageType.Login)
{
var client = _generalRepository.FindAsync<User>(x => x.Id.Equals(Guid.Parse(package.ClientId)));
if (client is null)
{
await s.CloseAsync(CloseReason.ProtocolError, "ClientId不存在");
}
var verifyCode = Guid.NewGuid().ToString();
var loginPacket = new ClientPackage()
{
PackageType = PackageType.Login,
ClientId = package.ClientId,
VerifyCode = verifyCode,
};
s["VerifyCode"] = verifyCode;
var msg = loginPacket.ToJson();
await s.SendAsync(msg);
}
// 追踪
if (package.PackageType == PackageType.Trace)
{
return;
}
})
.UseInProcSessionContainer()
.BuildAsServer();
await host.StartAsync();
await Task.CompletedTask;
}
/// <summary>
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
await Task.CompletedTask;
}
}
9.1 番外: WebSocket传参
WebSocket的第一次请求是基于Http进行建立链接的, 所以WebSocket是可以在url中或者请求体中携带token等参数的. 当然后端并不像写Api时那么简单的就可以获取, 需要截取当前请求的Url或者携带的信息, 然后进行读取, 进而进行验证等操作. 这部分的代码之后再补充...
//.Net Core从Url中读取参数
10 多服务器以及不同服务间的协同
得益于我们实现的SessionManager
, 我们将不用Server
的SessionManger
注入DI后, 我们可以在任意Server
的Service
中做到跨Service
进行消息传递, 验证等等操作.
协议编解码器样例:
EV26 Gps通信协议(使用方法在xUnit测试中):
- Github
- Gitee
简单样例:
public class Nbazh0X01Test
{
private readonly ITestOutputHelper _testOutputHelper;
private NbazhGpsSerializer NbazhGpsSerializer = new NbazhGpsSerializer();
public Nbazh0X01Test(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
public void Test1()
{
//78 78 11 01 07 52 53 36 78 90 02 42 70 00 32 01 00 05 12 79 0D 0A
var hex = "7878 11 01 07 52 53 36 78 90 02 42 7000 3201 0005 1279 0D0A".ToHexBytes();
// ----协议解析部分----//
var packet = NbazhGpsSerializer.Deserialize(hex);
Nbazh0X01 body = (Nbazh0X01)packet.Bodies;
// ----协议解析部分----//
Assert.Equal(0x11, packet.Header.Length);
Assert.Equal(0x01, packet.Header.MsgId);
Assert.Equal("7 52 53 36 78 90 02 42".Replace(" ", ""), body.TerminalId);
Assert.Equal(0x7000, body.TerminalType);
//Assert.Equal(0x3201, body.TimeZoneLanguage.Serialize());
Assert.Equal(0x0005, packet.Header.MsgNum);
Assert.Equal(0x1279, packet.Header.Crc);
// 时区 0011 001000000001
}
}
More 2 DotNetty
以后我们会探究DotNetty与SuperSocket的异同.