我有一台服务器通过TCP LAN与50个或更多设备通信.每个套接字读取消息循环都有一个Task.Run. 我将每个消息覆盖缓冲到一个阻塞队列中,其中每个阻塞队列都有一个使用BlockingCollection.Take(
我将每个消息覆盖缓冲到一个阻塞队列中,其中每个阻塞队列都有一个使用BlockingCollection.Take()的Task.Run.
所以像(半伪代码):
套接字阅读任务
Task.Run(() => { while (notCancelled) { element = ReadXml(); switch (element) { case messageheader: MessageBlockingQueue.Add(deserialze<messageType>()); ... } } });
消息缓冲区任务
Task.Run(() => { while (notCancelled) { Process(MessageQueue.Take()); } });
这样就可以在自己的缓冲区上完成50个读取任务和50个任务.
我这样做是为了避免阻塞读取循环并允许程序更公平地分配处理时间,或者我相信.
这是一种处理它的低效方法吗?什么是更好的方式?
您可能对“渠道”工作感兴趣,特别是: System.Threading.Channels.这样做的目的是提供异步生产者/消费者队列,涵盖单个和多个生产者和消费者场景,上限等.通过使用异步API,你不是在等待一些事情要做很多线程.您的读取循环将变为:
while (notCancelled) { var next = await queue.Reader.ReadAsync(optionalCancellationToken); Process(next); }
和制片人:
switch (element) { case messageheader: queue.Writer.TryWrite(deserialze<messageType>()); ... }
所以:微小的变化
或者 – 或者组合 – 您可以查看诸如“管道”之类的内容(https://www.nuget.org/packages/System.IO.Pipelines/) – 因为您正在处理TCP数据,这将是一个理想的选择,并且我已经看过自定义Web套接字服务器这里是Stack Overflow(处理大量连接).由于API始终是异步的,因此它可以很好地平衡工作 – 并且管道API在设计时考虑了典型的TCP场景,例如在检测帧边界时部分消耗传入的数据流.我已经写了很多这个用法,代码示例主要是here.请注意,“管道”不包括直接TCP层,但“ruby”服务器包括一个,或第三方库https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/(披露:我写的).