[料理佳餚] System.IO.Pipelines 解決了以往接收 NetworkStream 算位置的困擾

微軟官方部落格在去年發佈了一篇 System.IO.Pipelines: High performance IO in .NET,這個東西高不高效我是比較不出來,但是它著實解決了以往在接收 NetworkStream 時算位置的困擾,用它寫出來的程式碼簡潔清晰許多。

以往我們接收 NetworkStream 時,由於 Buffer 的大小通常是固定的,而串流資料的長度是不固定的,所以我們在解析的時候,每一種協定就有一套計算讀取位置的邏輯,這種邏輯通常腦袋是記不住的,當協定久久異動一次的時候,我們就要開始擔心原本的解析邏輯是不是又要改了,相當痛苦。

下面是一個簡單的範例,假定 Server 與 Client 之間約定了一組協定,資料的處理邏輯以「包」為基本的處理單位,Server 會將資料一包一包地傳給 Client,以換行符號 \n 作為分隔符號,遇到換行符號時就代表一包資料結束,一包資料的長度則不固定,過去我們處理這類需求的程式碼大概就像下面這樣,需要去考慮到一個 Buffer 會有 0 ~ n 個換行符號。

// 取得 NetworkStream
var stream = client.GetStream();

// 宣告資料緩衝區
var line = new List<byte[]>();

while (true)
{
    // 設定 Buffer 長度
    var buffer = new byte[10];

    // 讀取一個 Buffer 長度的資料
    var numBytesRead = stream.Read(buffer, 0, buffer.Length);

    int newlinePosition;
    int numBytesConsumed = 0;

    do
    {
        // 搜尋換行符號的位置
        newlinePosition = Array.IndexOf(buffer, (byte)'\n', numBytesConsumed);

        if (newlinePosition >= 0)
        {
            // 將換行符號之間的資料放進緩衝區
            line.Add(buffer.Skip(numBytesConsumed).Take(newlinePosition - numBytesConsumed).ToArray());

            // 標記已經處理的資料長度
            numBytesConsumed = newlinePosition + 1;

            // 緩衝區內的資料已成一包,送給邏輯程序處理。
            ProcessData(line.SelectMany(x => x).ToArray());

            // 清空緩衝區
            line.Clear();
        }
        else
        {
            // 將剩餘的資料放進緩衝區
            line.Add(buffer.Skip(numBytesConsumed).Take(numBytesRead - numBytesConsumed).ToArray());
        }
    }
    while (newlinePosition >= 0);
}

如果沒有註解,過一陣子之後,這段程式碼就變天書了,需要通靈才能喚回過去記憶,寫類似這種按照協定讀取串流資料的程式是挺痛苦的,但是 System.IO.Pipelines 把這件事變簡單了,我們來看一下要怎麼使用它?

顧名思義,Pipeline 就是管道,我們可以這樣想像,有一根管子,管子的一頭不停地往裡面塞東西,管子的另外一頭,可以依照自己想要的長度,一次地把東西從管子裡面拿出來,因此我們的程式碼會分成兩段,一段是不停地往管子裡面寫資料,一段是依照我們想要的邏輯從管子中讀資料。

使用 System.IO.Pipelines 我們從 new 一個 Pipe 開始,Pipe 裡面會有一個 Writer 屬性及 Reader 屬性,Writer 就是往管子寫資料,Reader 就是從管子讀資料。

var pipe = new Pipe();
var writer = pipe.Writer;
var reader = pipe.Reader;

往管子寫資料

// 取得 NetworkStream
var stream = client.GetStream();

while (true)
{
    // 設定 Buffer 長度
    var buffer = new byte[10];

    // 讀取一個 Buffer 長度的資料
    var numBytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);

    if (numBytesRead == 0) continue;

    // 將資料寫進 Pipe
    var flushResult = await writer.WriteAsync(new ReadOnlyMemory<byte>(buffer.Take(numBytesRead).ToArray()));

    if (flushResult.IsCompleted) break;
}

從 NetworkStream 讀取資料的部分保持不變,然後呼叫 PipeWriter.WriteAsync() 方法將資料寫入管子內。

從管子讀資料

while (true)
{
    // 讀取管子內目前的資料狀況
    var result = await reader.ReadAsync();

    var buffer = result.Buffer;

    do
    {
        // 搜尋換行符號的位置
        var position = buffer.PositionOf((byte)'\n');

        if (position == null) break;

        // 將管子內的換行符號前的資料,送給邏輯程序處理。
        ProcessData(buffer.Slice(0, position.Value));
        
        // 從目前搜尋到的換行符號的下一個位置,再搜尋換行符號。
        var next = buffer.GetPosition(1, position.Value);

        buffer = buffer.Slice(next);
    }
    while (true);
    
    // 標記管子內有多少資料已經被讀取並處理,主要是釋放管子的空間,讓 Writer 可以重複利用。
    // 有呼叫 ReadAsync() 就一定要呼叫 AdvanceTo(),即使沒有處理到任何資料也是一樣。
    reader.AdvanceTo(buffer.Start, buffer.End);
}

這邊就可以撰寫我們解析資料的邏輯,可以看到我們不用再去算位置了,當符合規則時,直接把整段資料取出來使用,大大降低了程式碼的複雜性,而且 Pipe 是 Thread-Safe 的,Writer 跟 Reader 可以分開兩個執行緒執行,不用擔心同步的問題。

眉角:ReadResult.Buffer 的解讀方式

如果我們的解析邏輯不是那麼的單純,難免需要對 ReadResult.Buffer 進行裁切,它的型別是 ReadOnlySequence<byte>,裡面有一個 Slice() 方法可以讓我們來對 Buffer 做裁切,而且 Slice() 方法提供 9 個多載,已經足夠我們使用了,而我常用到的是這兩個方法:

public ReadOnlySequence<T> Slice(SequencePosition start, SequencePosition end);
public ReadOnlySequence<T> Slice(SequencePosition start, int length);

Buffer 有兩個屬性 StartEnd 代表 Buffer 的起迄位置,Start 跟 End 的型別是 SequencePosition,這個東西跟以往我們熟悉的陣列索引不一樣,一開始沒有弄清楚,結果裁切出來的資料不是我要的。

我們熟悉的陣列及其索引是長這樣的:

而 SequencePosition 這個東西要這樣看:

如果上圖是一個 ReadResult.Buffer,那麼它的 Start 就是 0,End 就是 8,也因為這樣的特性,所以有一些方法的參數或回傳值是 SequencePosition 型別的,就要特別注意一下,像是 PositionOf() 擴充方法,以上圖為例,如果我呼叫 PositionOf((byte)'d') 得到的 SequencePosition 值會是 3。

還有像是 PipeReader.AdvanceTo() 方法,它其實是幫我們在 Buffer 中,將標記已處理資料的位置移到參數所指定的位置,以上圖為例,假設 SequencePosition 的值是 3,丟進 AdvanceTo() 方法之後,那麼下一次 PipeReader 讀取的結果 d 就會是第一個元素。

PositionOf() 擴充方法有用到 C# 7.2 才新增的語言特性,所以要調整專案的建置語言版本至少到 C# 7.2。

參考資料

 < Source Code >