微軟官方部落格在去年發佈了一篇 System.IO.Pipelines: High performance IO in .NET,這個東西高不高效我是比較不出來,但是它著實解決了以往在接收 NetworkStream 時算位置的困擾,用它寫出來的程式碼簡潔清晰許多。
以往我們接收 NetworkStream 時,由於 Buffer 的大小通常是固定的,而串流資料的長度是不固定的,所以我們在解析的時候,每一種協定就有一套計算讀取位置的邏輯,這種邏輯通常腦袋是記不住的,當協定久久異動一次的時候,我們就要開始擔心原本的解析邏輯是不是又要改了,相當痛苦。
下面是一個簡單的範例,假定 Server 與 Client 之間約定了一組協定,資料的處理邏輯以「包」為基本的處理單位,Server 會將資料一包一包地傳給 Client,以換行符號 \n
作為分隔符號,遇到換行符號時就代表一包資料結束,一包資料的長度則不固定,過去我們處理這類需求的程式碼大概就像下面這樣,需要去考慮到一個 Buffer 會有 0 ~ n 個換行符號。
// 取得 NetworkStream
var stream = client.GetStream();
// 宣告資料緩衝區
var line = new List<byte[]>();
while (true)
{
// 設定 Buffer 長度
var buffer = new byte[10];
// 讀取一個 Buffer 長度的資料
var numBytesRead = stream.Read(buffer, 0, buffer.Length);
int newlinePosition;
int numBytesConsumed = 0;
do
{
// 搜尋換行符號的位置
newlinePosition = Array.IndexOf(buffer, (byte)'\n', numBytesConsumed);
if (newlinePosition >= 0)
{
// 將換行符號之間的資料放進緩衝區
line.Add(buffer.Skip(numBytesConsumed).Take(newlinePosition - numBytesConsumed).ToArray());
// 標記已經處理的資料長度
numBytesConsumed = newlinePosition + 1;
// 緩衝區內的資料已成一包,送給邏輯程序處理。
ProcessData(line.SelectMany(x => x).ToArray());
// 清空緩衝區
line.Clear();
}
else
{
// 將剩餘的資料放進緩衝區
line.Add(buffer.Skip(numBytesConsumed).Take(numBytesRead - numBytesConsumed).ToArray());
}
}
while (newlinePosition >= 0);
}
如果沒有註解,過一陣子之後,這段程式碼就變天書了,需要通靈才能喚回過去記憶,寫類似這種按照協定讀取串流資料的程式是挺痛苦的,但是 System.IO.Pipelines 把這件事變簡單了,我們來看一下要怎麼使用它?
顧名思義,Pipeline 就是管道,我們可以這樣想像,有一根管子,管子的一頭不停地往裡面塞東西,管子的另外一頭,可以依照自己想要的長度,一次地把東西從管子裡面拿出來,因此我們的程式碼會分成兩段,一段是不停地往管子裡面寫資料,一段是依照我們想要的邏輯從管子中讀資料。
使用 System.IO.Pipelines 我們從 new 一個 Pipe
開始,Pipe 裡面會有一個 Writer
屬性及 Reader
屬性,Writer 就是往管子寫資料,Reader 就是從管子讀資料。
var pipe = new Pipe();
var writer = pipe.Writer;
var reader = pipe.Reader;
往管子寫資料
// 取得 NetworkStream
var stream = client.GetStream();
while (true)
{
// 設定 Buffer 長度
var buffer = new byte[10];
// 讀取一個 Buffer 長度的資料
var numBytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
if (numBytesRead == 0) continue;
// 將資料寫進 Pipe
var flushResult = await writer.WriteAsync(new ReadOnlyMemory<byte>(buffer.Take(numBytesRead).ToArray()));
if (flushResult.IsCompleted) break;
}
從 NetworkStream 讀取資料的部分保持不變,然後呼叫 PipeWriter.WriteAsync()
方法將資料寫入管子內。
從管子讀資料
while (true)
{
// 讀取管子內目前的資料狀況
var result = await reader.ReadAsync();
var buffer = result.Buffer;
do
{
// 搜尋換行符號的位置
var position = buffer.PositionOf((byte)'\n');
if (position == null) break;
// 將管子內的換行符號前的資料,送給邏輯程序處理。
ProcessData(buffer.Slice(0, position.Value));
// 從目前搜尋到的換行符號的下一個位置,再搜尋換行符號。
var next = buffer.GetPosition(1, position.Value);
buffer = buffer.Slice(next);
}
while (true);
// 標記管子內有多少資料已經被讀取並處理,主要是釋放管子的空間,讓 Writer 可以重複利用。
// 有呼叫 ReadAsync() 就一定要呼叫 AdvanceTo(),即使沒有處理到任何資料也是一樣。
reader.AdvanceTo(buffer.Start, buffer.End);
}
這邊就可以撰寫我們解析資料的邏輯,可以看到我們不用再去算位置了,當符合規則時,直接把整段資料取出來使用,大大降低了程式碼的複雜性,而且 Pipe 是 Thread-Safe 的,Writer 跟 Reader 可以分開兩個執行緒執行,不用擔心同步的問題。
眉角:ReadResult.Buffer 的解讀方式
如果我們的解析邏輯不是那麼的單純,難免需要對 ReadResult.Buffer
進行裁切,它的型別是 ReadOnlySequence<byte>
,裡面有一個 Slice()
方法可以讓我們來對 Buffer 做裁切,而且 Slice() 方法提供 9 個多載,已經足夠我們使用了,而我常用到的是這兩個方法:
public ReadOnlySequence<T> Slice(SequencePosition start, SequencePosition end);
public ReadOnlySequence<T> Slice(SequencePosition start, int length);
Buffer 有兩個屬性 Start
跟 End
代表 Buffer 的起迄位置,Start 跟 End 的型別是 SequencePosition
,這個東西跟以往我們熟悉的陣列索引不一樣,一開始沒有弄清楚,結果裁切出來的資料不是我要的。
我們熟悉的陣列及其索引是長這樣的:
而 SequencePosition 這個東西要這樣看:
如果上圖是一個 ReadResult.Buffer,那麼它的 Start 就是 0,End 就是 8,也因為這樣的特性,所以有一些方法的參數或回傳值是 SequencePosition 型別的,就要特別注意一下,像是 PositionOf()
擴充方法,以上圖為例,如果我呼叫 PositionOf((byte)'d') 得到的 SequencePosition 值會是 3。
還有像是 PipeReader.AdvanceTo()
方法,它其實是幫我們在 Buffer 中,將標記已處理資料的位置移到參數所指定的位置,以上圖為例,假設 SequencePosition 的值是 3,丟進 AdvanceTo() 方法之後,那麼下一次 PipeReader 讀取的結果 d
就會是第一個元素。
參考資料
< Source Code >