.Net BlockingCollection VS Channel 消費者與生產者模式 效能

在工廠中會有產線的流程,而當中就會有生者者,與消費者的角色問題
然而產線可以有一條多條,一個生產者多個生產者,一個消費者多個消費者,而本次關心的是,這個產線的效能

選手一 BlockingCollection

  1. .NET Framework 4.0 開始引入的物件
  2. 在MSDN上的說明為: 提供安全執行緒集合適用的封鎖和界限容量

選手二Channel

  1. .Net Core 2.1 開始引入的物件
  2. 在MSDN上的說明為: 提供一組同步資料結構,用來在生產者和消費者之間以非同步方式傳遞資料

實驗步驟

設計一個生產者,一個消費者,觀察兩者之間的產生效能速度

程式碼部分為

var channel = Channel.CreateUnbounded<string>();

var producer1 = new Producer(channel.Writer, 1, 0);
var consumer1 = new Consumer(channel.Reader, 1, 0);

Task consumerTask1 = consumer1.ConsumeData(); 
Task producerTask1 = producer1.BeginProducing(); 

await producerTask1.ContinueWith(_ => channel.Writer.Complete());

await consumerTask1;

var blockQueue = new BlockingCollection<string>();

var producer1 = new Producer2(blockQueue, 1, 0);
var consumer1 = new Consumer2(blockQueue, 1, 0);

Task.Run(consumer1.ConsumeData);

Task.Run(producer1.BeginProducing);

編寫的物件

public class Producer
{
    private readonly ChannelWriter<string> _writer;
    private readonly int _identifier;
    private readonly int _delay;

    public Producer(ChannelWriter<string> writer, int identifier, int delay)
    {
        _writer = writer;
        _identifier = identifier;
        _delay = delay;
    }

    public async Task BeginProducing()
    {
        Console.WriteLine($"PRODUCER ({_identifier}): Starting");

        for (var i = 0; i < 10; i++)
        {
            await Task.Delay(_delay); 

            var msg = $"P{_identifier} - {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")}";

            Console.WriteLine($"PRODUCER ({_identifier}): Creating {msg}");

            await _writer.WriteAsync(msg);
        }

        Console.WriteLine($"PRODUCER ({_identifier}): Completed");
    }
}

public class Consumer
{
    private readonly ChannelReader<string> _reader;
    private readonly int _identifier;
    private readonly int _delay;

    public Consumer(ChannelReader<string> reader, int identifier, int delay)
    {
        _reader = reader;
        _identifier = identifier;
        _delay = delay;
    }

    public async Task ConsumeData()
    {
        Console.WriteLine($"CONSUMER ({_identifier}): Starting");

        while (await _reader.WaitToReadAsync())
        {
            if (_reader.TryRead(out var timeString))
            {
                await Task.Delay(_delay); 

                Console.WriteLine($"CONSUMER ({_identifier}): Consuming {timeString}- {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")}");
            }
        }

        Console.WriteLine($"CONSUMER ({_identifier}): Completed");
    }
}

public class Producer2
{
    private readonly BlockingCollection<string> _writer;
    private readonly int _identifier;
    private readonly int _delay;

    public Producer2(BlockingCollection<string> writer, int identifier, int delay)
    {
        _writer = writer;
        _identifier = identifier;
        _delay = delay;
    }

    public void BeginProducing()
    {
        Console.WriteLine($"PRODUCER ({_identifier}): Starting");

        for (var i = 0; i < 10; i++)
        {
            Thread.Sleep(_delay); 

            var msg = $"P{_identifier} - {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")}";

            Console.WriteLine($"PRODUCER ({_identifier}): Creating {msg}");

            _writer.Add(msg);
        }

        Console.WriteLine($"PRODUCER ({_identifier}): Completed");
    }
}

public class Consumer2
{
    private readonly BlockingCollection<string> _reader;
    private readonly int _identifier;
    private readonly int _delay;

    public Consumer2(BlockingCollection<string> reader, int identifier, int delay)
    {
        _reader = reader;
        _identifier = identifier;
        _delay = delay;
    }

    public void ConsumeData()
    {
        Console.WriteLine($"CONSUMER ({_identifier}): Starting");

        foreach (var timeString in _reader.GetConsumingEnumerable())
        {
            Thread.Sleep(_delay);
            Console.WriteLine($"CONSUMER ({_identifier}): Consuming {timeString}- {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")}");
        }

        Console.WriteLine($"CONSUMER ({_identifier}): Completed");
    }
}

結論

  1. Channel較優,實際上差異極小,以 async/await 寫法來看,Thread之間的時間差,造成效能上的差異最明顯
  2. BlockingCollection如果沒資料則會Block直到有資料才可拿取,Channel如果沒資料則不會Block,而進行結束,得再自行通知 
  3. Channel在生產者第一次時需要0.01秒左右(第一次跟第二次的間隔),後續的每次生產都只需要0.0001~0.0002秒左右
  4. Channel在消費者第一次時需要0.001秒左右(第一次跟第二次的間隔),後續的每次消費都只需要0.0001~0.0004秒左右
  5. BlockingCollection在生者者第一次時需要0.001秒左右(第一次跟第二次的間隔)後續的每次生產都只需要0.0003~0.027秒左右
  6. BlockingCollection在消費者第一次時需要0.0018秒左右(第一次跟第二次的間隔),後續的每次消費都只需要0.0009~0.0018秒左右

資料參考來源

  1. BlockingCollection
  2. Channel
  3. 用 .NET 寫生產者消費者模式的好物 - BlockingCollection
  4. 老王Plus的全栈