[C#][.NET]Concurrent collections

Concurrent collections

collection的操作通常都是非執行緒安全的,例如List<T>。  意思是說當會一個集合進行多執行緒操作的時候,會造成不可預期的情況,例如資料遺漏、索引重複等等...

如以下範例:


public void Run() 
{
    var products = GetProducts();
    List<NewProduct> newProducts = new List<NewProduct>();

    Console.WriteLine("來源資料筆數:{0}", products.Count());
    Console.WriteLine("執行前結果資料筆數:{0}", newProducts.Count());

    Parallel.ForEach(products, x =>
    {
        int id = x.Id;
        string name = x.Name;
        int stock = 100 + x.Id;

        var newProduct = new NewProduct()
        {
            Id = id,
            Name = name,
            Stock = stock
        };

        newProducts.Add(newProduct);
    });

    Console.WriteLine("執行後結果資料筆數:{0}", newProducts.Count());            
}

private IEnumerable<Product> GetProducts() 
{            
    for (int i = 0; i < 100000; i++)
    {
        yield return new Product() { Id = i, Name = "Miles" + i };
    }
}

結果:

很明顯,計算後的資料遺失了21筆,這不是我們要的結果。

 

Concurrent collections

.NET Framework提供了一些執行緒安全的collection,可使用在多執行緒的環境底下。

  • BlockingCollection<T>
    • 可設定Collection最高上限
  • ConcurrentBag<T>
    • 無順序性的集合
  • ConcurrentDicitionary<TKey,T>
    • Key,Value的集合
  • ConcurrentQueue<T>
    • Queue集合的概念 first in, first out
  • ConcurrentStack<T>
    • Stack集合的概念, last in, first out

 

.NET提供了以上五種執行緒安全的collection,用法大同小異,只是呼叫的api不同而已。

用法跟一般List也差不多,這邊以ConcurrentBag作範例:


public void Run() 
{
    var products = GetProducts();
    ConcurrentBag<NewProduct> newProducts = new ConcurrentBag<NewProduct>();

    Console.WriteLine("來源資料筆數:{0}", products.Count());
    Console.WriteLine("執行前結果資料筆數:{0}", newProducts.Count());

    Parallel.ForEach(products, x =>
    {
        int id = x.Id;
        string name = x.Name;
        int stock = 100 + x.Id;

        var newProduct = new NewProduct()
        {
            Id = id,
            Name = name,
            Stock = stock
        };
        newProducts.Add(newProduct);
    });

    Console.WriteLine("執行後結果資料筆數:{0}", newProducts.Count());
}

private IEnumerable<Product> GetProducts() 
{            
    for (int i = 0; i < 100000; i++)
    {
        yield return new Product() { Id = i, Name = "Miles" + i };
    }
}

其實只是把List<T>改成ConcurrentBag<T>而已,就達成了執行緒安全的操作。

結果:

執行結果如預期,沒有遺漏。

 

因為Concurrent collections實作IEnumerable的關係,所以支援Entity Framework,可將結果的collection直接用AddRange的方式Insert,這個對資料有大量運算,且又要跟資料庫互動方面,真的是還蠻方便的。

範例如下:

public void Run()
{
    LinqDemoEntities db = new LinqDemoEntities();
    var products = GetProducts();
    ConcurrentBag<Order> cb = new ConcurrentBag<Order>();

    Console.WriteLine("來源資料筆數:{0}", products.Count());
    Console.WriteLine("執行前結果資料筆數:{0}", db.Order.Count());

    Parallel.ForEach(products, x =>
    {
        int id = x.Id;
        string name = x.Name;
        int stock = 100 + x.Id;

        var order = new Order()
        {
            Id = id,
            OrderName = name
        };

        cb.Add(order);
    });

    db.Order.AddRange(cb);

    db.SaveChanges();

    Console.WriteLine("執行後結果資料筆數:{0}", db.Order.Count());
}

private IEnumerable<Product> GetProducts() 
{            
    for (int i = 0; i < 1000; i++)
    {
        yield return new Product() { Id = i, Name = "Miles" + i };
    }
}

結果:

小節:

使用多執行緒運算一定要使用concurrent collections,這樣才不會對資料上產生非預期的影響。

 

 

 

一天一分享,身體好健康。

該追究的不是過去的原因,而是現在的目的。