当前位置 : 主页 > 网络安全 > 测试自动化 >

[IO]System.IO.Pipelines

来源:互联网 收集:自由互联 发布时间:2021-06-22
System.IO.Pipelines是一个新库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。 System.IO.Pipelines 解决什么问题 System.IO.Pipelines 已构建为: 具有高

  System.IO.Pipelines 是一个新库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。

System.IO.Pipelines 解决什么问题

  System.IO.Pipelines 已构建为:

  • 具有高性能的流数据分析功能。
  • 减少代码复杂性。

下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 ‘\n‘ 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);
    
    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代码有几个问题:

  • 单次调用 ReadAsync 可能无法接收整条消息(行尾)。
  • 忽略了 stream.ReadAsync 的结果。 stream.ReadAsync 返回读取的数据量。
  • 它不能处理在单个 ReadAsync 调用中读取多行的情况。
  • 它为每次读取分配一个 byte 数组。

要解决上述问题,需要进行以下更改:

  • 缓冲传入的数据,直到找到新行。

  • 分析缓冲区中返回的所有行。

  • 该行可能大于 1KB(1024 字节)。 找到需要调整输入缓冲区大小的代码(一行完整的代码)。

    • 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
    • 压缩用于读取行的缓冲区,以减少空余。
  • 请考虑使用缓冲池来避免重复分配内存。

 

下面的代码解决了其中一些问题:

 

 1 async Task ProcessLinesAsync(NetworkStream stream)
 2 {
 3     byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
 4     var bytesBuffered = 0;
 5     var bytesConsumed = 0;
 6 
 7     while (true)
 8     {
 9         // Calculate the amount of bytes remaining in the buffer.
10         var bytesRemaining = buffer.Length - bytesBuffered;
11 
12         if (bytesRemaining == 0)
13         {
14             // Double the buffer size and copy the previously buffered data into the new buffer.
15             var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
16             Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
17             // Return the old buffer to the pool.
18             ArrayPool<byte>.Shared.Return(buffer);
19             buffer = newBuffer;
20             bytesRemaining = buffer.Length - bytesBuffered;
21         }
22 
23         var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
24         if (bytesRead == 0)
25         {
26             // EOF
27             break;
28         }
29 
30         // Keep track of the amount of buffered bytes.
31         bytesBuffered += bytesRead;
32         var linePosition = -1;
33 
34         do
35         {
36             // Look for a EOL in the buffered data.
37             linePosition = Array.IndexOf(buffer, (byte)\n, bytesConsumed,
38                                          bytesBuffered - bytesConsumed);
39 
40             if (linePosition >= 0)
41             {
42                 // Calculate the length of the line based on the offset.
43                 var lineLength = linePosition - bytesConsumed;
44 
45                 // Process the line.
46                 ProcessLine(buffer, bytesConsumed, lineLength);
47 
48                 // Move the bytesConsumed to skip past the line consumed (including \n).
49                 bytesConsumed += lineLength + 1;
50             }
51         }
52         while (linePosition >= 0);
53     }
54 }
View Code

Pipe

 Pipe 类可用于创建 PipeWriter/PipeReader 对。 写入 PipeWriter 的所有数据都可用于 PipeReader

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Pipe基本用法

 1 async Task ProcessLinesAsync(Socket socket)
 2 {
 3     var pipe = new Pipe();
 4     Task writing = FillPipeAsync(socket, pipe.Writer);
 5     Task reading = ReadPipeAsync(pipe.Reader);
 6 
 7     await Task.WhenAll(reading, writing);
 8 }
 9 
10 async Task FillPipeAsync(Socket socket, PipeWriter writer)
11 {
12     const int minimumBufferSize = 512;
13 
14     while (true)
15     {
16         // Allocate at least 512 bytes from the PipeWriter.
17         Memory<byte> memory = writer.GetMemory(minimumBufferSize);
18         try
19         {
20             int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
21             if (bytesRead == 0)
22             {
23                 break;
24             }
25             // Tell the PipeWriter how much was read from the Socket.
26             writer.Advance(bytesRead);
27         }
28         catch (Exception ex)
29         {
30             LogError(ex);
31             break;
32         }
33 
34         // Make the data available to the PipeReader.
35         FlushResult result = await writer.FlushAsync();
36 
37         if (result.IsCompleted)
38         {
39             break;
40         }
41     }
42 
43      // By completing PipeWriter, tell the PipeReader that there‘s no more data coming.
44     await writer.CompleteAsync();
45 }
46 
47 async Task ReadPipeAsync(PipeReader reader)
48 {
49     while (true)
50     {
51         ReadResult result = await reader.ReadAsync();
52         ReadOnlySequence<byte> buffer = result.Buffer;
53 
54         while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
55         {
56             // Process the line.
57             ProcessLine(line);
58         }
59 
60         // Tell the PipeReader how much of the buffer has been consumed.
61         reader.AdvanceTo(buffer.Start, buffer.End);
62 
63         // Stop reading if there‘s no more data coming.
64         if (result.IsCompleted)
65         {
66             break;
67         }
68     }
69 
70     // Mark the PipeReader as complete.
71     await reader.CompleteAsync();
72 }
73 
74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
75 {
76     // Look for a EOL in the buffer.
77     SequencePosition? position = buffer.PositionOf((byte)\n);
78 
79     if (position == null)
80     {
81         line = default;
82         return false;
83     }
84 
85     // Skip the line + the \n.
86     line = buffer.Slice(0, position.Value);
87     buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
88     return true;
89 }
View Code

有两个循环:

  • FillPipeAsync 从 Socket 读取并写入 PipeWriter
  • ReadPipeAsync 从 PipeReader 读取并分析传入的行。

没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReader 和 PipeWriter 实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。

在第一个循环中:

  • 调用 PipeWriter.GetMemory(Int32) 从基础编写器获取内存。
  • 调用 PipeWriter.Advance(Int32) 以告知 PipeWriter 有多少数据已写入缓冲区。
  • 调用 PipeWriter.FlushAsync 以使数据可用于 PipeReader

在第二个循环中,PipeReader 使用由 PipeWriter 写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync 的调用:

  • 返回包含两条重要信息的 ReadResult:

    • 以 ReadOnlySequence<byte> 形式读取的数据。
    • 布尔值 IsCompleted,指示是否已到达数据结尾 (EOF)。

找到行尾 (EOL) 分隔符并分析该行后:

  • 该逻辑处理缓冲区以跳过已处理的内容。
  • 调用 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和检查了多少数据。

读取器和编写器循环通过调用 Complete 结束。 Complete 使基础管道释放其分配的内存。

反压和流量控制

理想情况下,读取和分析可协同工作:

  • 写入线程使用来自网络的数据并将其放入缓冲区。
  • 分析线程负责构造适当的数据结构。

通常,分析所花费的时间比仅从网络复制数据块所用时间更长:

  • 读取线程领先于分析线程。
  • 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。

为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。

为解决上述问题,Pipe 提供了两个设置来控制数据流:

  • PauseWriterThreshold:确定在调用 FlushAsync 暂停之前应缓冲多少数据。
  • ResumeWriterThreshold:确定在恢复对 PipeWriter.FlushAsync 的调用之前,读取器必须观察多少数据。

PipeWriter.FlushAsync:

  • 当 Pipe 中的数据量超过 PauseWriterThreshold 时,返回不完整的 ValueTask<FlushResult>
  • 低于 ResumeWriterThreshold 时,返回完整的 ValueTask<FlushResult>

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

网友评论