The Parallel Programming Of .NET Framework 4.0(4) - Inside Out Of Task Library

 
 
 
/黃忠成
 
Task Factory
 
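     By now, readers should have a thorough understanding of how to use the Task Library. Let us step back and look at the architecture of the Task Library from a higher level.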
 
Figure 18: Task Library architecture
 
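Of the three roles in Figure 18, the Task Factory is responsible for creating Task objects, and the Task Scheduler handles the scheduling of Tasks. Readers may find this strange: so far we have always created Tasks directly with new, and the Task Factory has been nowhere in sight. Right! That is because the Task class's (static) constructor sets up a system-created Task Factory object by default, so programmers do not need to pass one in or use one explicitly to create a Task. Below is simulated code for the relevant part of the Task class.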
 
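// Simulated code: the relevant part of the Task class
private static TaskFactory s_factory = null;

static Task()
{
   // The static constructor creates the default factory once.
   s_factory = new TaskFactory();
}

// Exposed to programmers as Task.Factory
public static TaskFactory Factory
{
 get
 {
    return s_factory;
 }
}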
 
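Saying that the Task Factory creates Task objects is actually not quite accurate, because Task objects are in fact created independently. The only relationship between them is that the Task class holds a static Task Factory variable and exposes it as a property for programmers to access. So readers should not overthink the relationship between Task Factory and Task: just treat the Task Factory as a Factory Pattern for Task, through which programmers can create Task objects. Using it is no different from creating a Task directly with new, at least until the Task Library lets programmers assign the Task.Factory property themselves. Below is an example of creating a Task with the Task Factory.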
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task.Factory.StartNew(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Console.WriteLine(total);
                });
            Console.ReadLine();
        }
    }
}
 
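This example makes the relationship between Task and Task Factory visible. In fact, all the Task Factory does when creating a Task is combine creating and starting the Task into a single method call; the result is exactly the same as the following example.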
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = new Task(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Console.WriteLine(total);
                });
            t.Start();
            Console.ReadLine();
        }
    }
}
 
The same approach also works for Tasks that return a value:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t = Task.Factory.StartNew<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    return total;
                });
            Console.WriteLine(t.Result);
            Console.ReadLine();
        }
    }
}
 
It also works when you need to pass TaskCreationOptions:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task.Factory.StartNew(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Console.WriteLine(total);
                },TaskCreationOptions.PreferFairness);
            Console.ReadLine();
        }
    }
}
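When many Tasks need the same TaskCreationOptions, passing them to every StartNew call gets repetitive. The sketch below uses the standard TaskFactory constructor and properties; the class and method names are hypothetical. It checks two things: that Task.Factory is one shared instance with no preset options, and that a separately constructed TaskFactory applies its preset TaskCreationOptions to every Task it starts.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class; only the TaskFactory/Task members it calls are real APIs.
public static class FactoryDefaultsDemo
{
    // Task.Factory is a single shared TaskFactory instance with no preset options.
    public static bool SharedFactoryHasNoPresets()
    {
        return ReferenceEquals(Task.Factory, Task.Factory)
            && Task.Factory.CreationOptions == TaskCreationOptions.None
            && Task.Factory.ContinuationOptions == TaskContinuationOptions.None;
    }

    // Starts a Task through the given factory and reports the options it was created with.
    public static TaskCreationOptions OptionsOfTaskFrom(TaskFactory factory)
    {
        Task t = factory.StartNew(() => Console.WriteLine("running"));
        t.Wait();
        return t.CreationOptions;
    }
}
```

For example, OptionsOfTaskFrom(new TaskFactory(TaskCreationOptions.PreferFairness, TaskContinuationOptions.None)) reports PreferFairness even though the StartNew call itself passes no options.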
 
You can also pass in a CancellationToken:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            CancellationTokenSource cts = new CancellationTokenSource();
            Task.Factory.StartNew(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Console.WriteLine(total);
                },cts.Token);
            Console.ReadLine();
        }
    }
}
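The example above passes cts.Token but never cancels it, so the token has no visible effect. Here is a minimal sketch of cancellation actually taking effect, assuming the delegate checks the token cooperatively (the class and method names are hypothetical). A running loop that calls ThrowIfCancellationRequested ends up Canceled, and a token that is already canceled keeps the delegate from running at all.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class showing a CancellationToken that is actually used.
public static class CancellationDemo
{
    // Cancels a running loop that checks the token; returns the Task's final status.
    public static TaskStatus RunAndCancel()
    {
        var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        Task t = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000; i++)
            {
                // Cooperative check: throws an OperationCanceledException carrying `token`.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10);
            }
        }, token);

        Thread.Sleep(100);
        cts.Cancel();
        try
        {
            t.Wait();
        }
        catch (AggregateException)
        {
            // Expected: Wait surfaces the cancellation wrapped in an AggregateException.
        }
        return t.Status;
    }

    // With a token that is already canceled, the delegate never runs.
    public static bool DelegateRanWithCanceledToken()
    {
        var cts = new CancellationTokenSource();
        cts.Cancel();
        bool ran = false;
        Task t = Task.Factory.StartNew(() => { ran = true; }, cts.Token);
        try { t.Wait(); } catch (AggregateException) { }
        return ran;
    }
}
```

Because the token passed to StartNew is the same one the delegate throws with, the Task ends in the Canceled state rather than Faulted.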
 
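The ContinueWith mechanism mentioned earlier also has more advanced counterparts in the Task Factory. ContinueWith has one limitation: it can only attach a completion handler to a single Task. The ContinueWhenAll and ContinueWhenAny methods provided by the Task Factory let programmers attach one completion handler to multiple Tasks. Below is an example that uses ContinueWhenAll.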
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t1 = Task.Factory.StartNew<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Thread.Sleep(10000);
                    return total;
                });
 
            Task<int> t2 = Task.Factory.StartNew<int>(() =>
            {
                int total = 0;
                for (int i = 10; i < 20; i++)
                    total += i;
                Thread.Sleep(10000);
                return total;
            });
 
            Task.Factory.ContinueWhenAll(new Task<int>[] { t1, t2 }, (Task<int>[] tasks) =>
                {
                    // Read the results only if both antecedent Tasks completed successfully.
                    if (tasks[0].Status == TaskStatus.RanToCompletion &&
                       tasks[1].Status == TaskStatus.RanToCompletion)
                    {
                        Console.WriteLine(tasks[0].Result);
                        Console.WriteLine(tasks[1].Result);
                    }
                });
 
            Console.ReadLine();
        }
    }
}
 
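The difference from ContinueWith is that with ContinueWith you can only specify a completion handler for each individual Task, whereas with ContinueWhenAll you can specify a single completion handler for multiple Tasks, and that handler runs only after all of the given Tasks have finished.
Note in particular that although ContinueWhenAll accepts TaskContinuationOptions, it only accepts the values in the table below; values such as OnlyOnXXX and NotOnXXX are not allowed.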
 
 
AttachedToParent
ExecuteSynchronously
LongRunning
PreferFairness
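One way to see this restriction is to pass a disallowed value and catch the exception. ContinueWhenAll rejects such values by throwing ArgumentOutOfRangeException. The helper below is a hypothetical sketch that reports whether a given option is rejected.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo: which TaskContinuationOptions does ContinueWhenAll accept?
public static class MultiTaskOptionsDemo
{
    // Returns true if ContinueWhenAll rejects the options with ArgumentOutOfRangeException.
    public static bool IsRejected(TaskContinuationOptions options)
    {
        Task a = Task.Factory.StartNew(() => { });
        Task b = Task.Factory.StartNew(() => { });
        try
        {
            Task cont = Task.Factory.ContinueWhenAll(new[] { a, b }, done => { }, options);
            cont.Wait();
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            // The options are validated before the continuation is registered.
            return true;
        }
    }
}
```

IsRejected(TaskContinuationOptions.OnlyOnRanToCompletion) returns true, while IsRejected(TaskContinuationOptions.ExecuteSynchronously) returns false.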
 
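More interesting is the ExecuteSynchronously value. It originally asks the Task Library to run the continuation Task on the thread of the Task the continuation is attached to. But now there are several Tasks, and therefore several threads, so what happens? The answer is that the continuation Task runs on the thread of the Task that finishes last.
The counterpart of ContinueWhenAll is ContinueWhenAny, which runs as soon as any one of the specified Tasks finishes.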
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t1 = Task.Factory.StartNew<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Thread.Sleep(12000);
                    return total;
                });
 
            Task<int> t2 = Task.Factory.StartNew<int>(() =>
            {
                int total = 0;
                for (int i = 10; i < 20; i++)
                    total += i;
                Thread.Sleep(10000);
                return total;
            });
            Task.Factory.ContinueWhenAny(new Task<int>[] { t1, t2 }, (Task<int> task) =>
                {
                    if (task.Status == TaskStatus.RanToCompletion)
                    {
                        Console.WriteLine(task.Result);
                    }
                });
 
            Console.ReadLine();
        }
    }
}
 
Note that the function specified for ContinueWhenAny runs when any one of the given Tasks finishes and, like ContinueWhenAll, it runs only once.
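The following small sketch confirms the run-once behaviour by counting how many times the ContinueWhenAny callback runs, even after both antecedents have finished. The names and sleep durations are arbitrary. Which Task wins is not guaranteed, so only the count is meaningful.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo: counts how often a ContinueWhenAny callback runs.
public static class WhenAnyOnceDemo
{
    // Returns { number of callback runs, value seen by the callback }.
    public static int[] Run()
    {
        int calls = 0;
        Task<int> fast = Task.Factory.StartNew(() => { Thread.Sleep(50); return 1; });
        Task<int> slow = Task.Factory.StartNew(() => { Thread.Sleep(300); return 2; });

        Task<int> any = Task.Factory.ContinueWhenAny(new[] { fast, slow }, (Task<int> first) =>
        {
            Interlocked.Increment(ref calls);
            return first.Result;
        });

        // Wait for both antecedents and the continuation; the callback still ran once.
        Task.WaitAll(fast, slow, any);
        return new[] { calls, any.Result };
    }
}
```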
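When the Tasks the continuation is attached to are of type Task<TResult>, the TResult type argument can be given explicitly when calling ContinueWhenAll or ContinueWhenAny, as below (the compiler can also infer it from a Task<int>[] argument, as in the previous example).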
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t1 = Task.Factory.StartNew<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Thread.Sleep(12000);
                    return total;
                });
 
            Task<int> t2 = Task.Factory.StartNew<int>(() =>
            {
                int total = 0;
                for (int i = 10; i < 20; i++)
                    total += i;
                Thread.Sleep(10000);
                return total;
            });
           
            Task tfinal = Task.Factory.ContinueWhenAny<int>(
                         new Task<int>[] { t1, t2 }, (Task<int> task) =>
                {
                    if (task.Status == TaskStatus.RanToCompletion)
                    {
                        Console.WriteLine(task.Result);
                    }
                });
            Console.ReadLine();
        }
    }
}
 
If you need ContinueWhenAll or ContinueWhenAny to return a Task<TResult> that produces a value, write it as follows:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t1 = Task.Factory.StartNew(() =>
            {
                int total = 0;
                for (int i = 0; i < 10; i++)
                    total += i;
                Thread.Sleep(12000);
            });
 
            Task t2 = Task.Factory.StartNew(() =>
            {
                int total = 0;
                for (int i = 10; i < 20; i++)
                    total += i;
                Thread.Sleep(10000);
            });
           
            Task<int> tfinal = Task.Factory.ContinueWhenAll<int>(new Task[] { t1, t2 },
                (Task[] tasks) =>
                {
                    //do something.
                    return 15;
                });
            Console.ReadLine();
        }
    }
}
 
When the Tasks the continuation is attached to are of type Task<TResult>, and you also need ContinueWhenAll or ContinueWhenAny to return a Task<TResult>, the syntax changes slightly:
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication12
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<int> t1 = Task.Factory.StartNew<int>(() =>
                {
                    int total = 0;
                    for (int i = 0; i < 10; i++)
                        total += i;
                    Thread.Sleep(12000);
                    return total;
                });
 
            Task<int> t2 = Task.Factory.StartNew<int>(() =>
            {
                int total = 0;
                for (int i = 10; i < 20; i++)
                   total += i;
                Thread.Sleep(10000);
                return total;
            });
           
            Task<int> tfinal = Task.Factory.ContinueWhenAll<int,int>(new Task<int>[] { t1, t2 },
                (Task<int>[] tasks) =>
                {
                    //do something.
                    return 15;
                });
            Console.ReadLine();
        }
    }
}
 
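The first type argument is the result type of the Tasks the continuation is attached to, and the second is the result type of the Task returned by ContinueWhenAll or ContinueWhenAny.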
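The examples above return the constant 15. A sketch that actually uses the antecedent results may make the two type arguments easier to see. SumRange, SumAll and FirstAsText are hypothetical helpers. The loop bounds mirror the article's t1 and t2, so the two partial sums are 45 and 145.

```csharp
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo of the <TAntecedentResult, TResult> overloads.
public static class TypedContinuationDemo
{
    // Sums start..end-1 on a Task, like t1 and t2 in the article.
    static Task<int> SumRange(int start, int end)
    {
        return Task.Factory.StartNew(() =>
        {
            int total = 0;
            for (int i = start; i < end; i++) total += i;
            return total;
        });
    }

    public static int SumAll()
    {
        Task<int>[] parts = { SumRange(0, 10), SumRange(10, 20) };
        // <int, int>: antecedents are Task<int>, the continuation returns int.
        Task<int> combined = Task.Factory.ContinueWhenAll<int, int>(parts,
            (Task<int>[] done) => done.Sum(t => t.Result));
        return combined.Result;
    }

    public static string FirstAsText()
    {
        Task<int>[] parts = { SumRange(0, 10), SumRange(10, 20) };
        // <int, string>: antecedents are Task<int>, the continuation returns string.
        Task<string> first = Task.Factory.ContinueWhenAny<int, string>(parts,
            (Task<int> winner) => "first result: " + winner.Result);
        return first.Result;
    }
}
```

SumAll returns 190 (45 + 145). FirstAsText reports whichever partial sum finished first.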
 
 
 
 
Wrapping APM Mode With Task Factory
 
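     Earlier versions of the .NET Framework already supported APM (the Asynchronous Programming Model). Common I/O classes such as FileStream and NetworkStream provide asynchronous read/write methods like BeginRead/EndRead and BeginWrite/EndWrite, so that programmers can easily keep doing other computation while waiting for I/O to complete. Put simply, APM moves the I/O work off the calling thread; in .NET Framework 2.0, the thread involved was supplied by the Thread Pool.
To help programmers convert existing APM code to the Task Library model, the Task Factory provides the FromAsync method, as in the following example: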
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication13
{
    class Program
    {
        static void Main(string[] args)
        {
            FileStream fs = new FileStream(@"C:\Windows\Starter.xml", FileMode.Open, FileAccess.Read);
            FileInfo fi = new FileInfo(@"C:\Windows\Starter.xml");
            byte[] buff = new byte[fi.Length];
            Task<int> t = Task<int>.Factory.FromAsync(fs.BeginRead, fs.EndRead, buff, 0,
                             buff.Length, fs);
            t.ContinueWith((Task<int> task) =>
            {
                Console.WriteLine(task.Result);
                ((FileStream)task.AsyncState).Close();
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
            Console.ReadLine();
        }
    }
}
 
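FromAsync returns a Task that wraps the APM operation. With this Task in hand, programmers can use methods such as ContinueWith, Wait, ContinueWhenAll, and ContinueWhenAny, gaining every advantage of the Task Library except the cancellation mechanism.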
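Below is a self-contained variant of the FromAsync example. It uses a MemoryStream as a stand-in for the FileStream, so it runs without C:\Windows\Starter.xml. The state argument becomes the resulting Task's AsyncState, which is how the continuation gets back to the stream in order to dispose it. The class and method names are hypothetical.

```csharp
using System.IO;
using System.Threading.Tasks;

// Hypothetical demo: wrapping the Stream.BeginRead/EndRead APM pair in a Task<int>.
public static class FromAsyncDemo
{
    public static int ReadAll(byte[] source)
    {
        var stream = new MemoryStream(source);   // stand-in for the article's FileStream
        byte[] buffer = new byte[source.Length];

        // The last argument (state) becomes read.AsyncState.
        Task<int> read = Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, stream);

        // The wrapped operation now composes like any other Task.
        Task<int> report = read.ContinueWith(t =>
        {
            ((Stream)t.AsyncState).Dispose();
            return t.Result;   // number of bytes read
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return report.Result;
    }
}
```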
 
 
 
Task Scheduler
 
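     The Task Scheduler plays a very important role in the Task Library: every Task object is ultimately handed to the Task Scheduler for scheduling. How does the Task Scheduler differ from the old Thread Pool? Simply put, the Task Scheduler is an enhanced version of the old Thread Pool, providing the Task Library's most important mechanisms, the Local Queue and Work Stealing. But we never saw this object when creating Tasks, did we? Right: the Task Library has a built-in default Task Scheduler, and a Task object created without an explicitly specified Task Scheduler uses that default.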
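The following tiny check shows the default-scheduler behaviour just described. It assumes the calling code is not itself running inside a Task that uses a custom scheduler. Under that assumption, a Task started through Task.Factory with no scheduler argument sees TaskScheduler.Default as its current scheduler. The class name is hypothetical.

```csharp
using System.Threading.Tasks;

// Hypothetical demo: which scheduler does a Task get when none is specified?
public static class DefaultSchedulerDemo
{
    public static bool RunsOnDefaultScheduler()
    {
        // No scheduler argument: Task.Factory falls back to the current scheduler,
        // which outside any Task is TaskScheduler.Default.
        Task<bool> t = Task.Factory.StartNew(
            () => TaskScheduler.Current == TaskScheduler.Default);
        return t.Result;
    }
}
```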
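Unlike the closed design of the Task Factory, the Task Library allows programmers to write custom Task Schedulers. For example, we can design a Task Scheduler that schedules according to the number of CPUs/cores: on a single-core machine it lets only one Task run at a time while the others wait, and on a dual-core machine it lets two Tasks run at the same time. The following example, excerpted from MSDN, is a custom Task Scheduler that schedules according to the number of CPUs/cores.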
 
CoreLimitScheduler.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication13
{
    public class CoreLimitSchedular : TaskScheduler
    {
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
         // protected by lock(_tasks)
        private readonly int _maxDegreeOfParallelism;
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
 
        private static CoreLimitSchedular _scheduler = null;
        private static object _lock = new object();
 
 
        public static TaskScheduler GlobalScheduler
        {
            get
            {
                return _scheduler;
            }
        }
 
        static CoreLimitSchedular()
        {
            _scheduler = new CoreLimitSchedular();
        }
 
        public CoreLimitSchedular()
        {
            _maxDegreeOfParallelism = Environment.ProcessorCount;
        }
 
        protected sealed override void QueueTask(Task task)
        {
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }
 
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }
 
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }
 
                        base.TryExecuteTask(item);
                    }
                }
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }
 
        protected sealed override bool TryExecuteTaskInline(Task task,
                                 bool taskWasPreviouslyQueued)
        {
            if (!_currentThreadIsProcessingItems) return false;
 
            if (taskWasPreviouslyQueued) TryDequeue(task);
 
            return base.TryExecuteTask(task);
        }
 
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks) return _tasks.Remove(task);
        }
 
        public sealed override int MaximumConcurrencyLevel {
           get { return _maxDegreeOfParallelism; } }
 
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken) return _tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }
    }
}
Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication13
{
    class Program
    {
        static void Main(string[] args)
        {
            for (int i = 0; i < 10; i++)
            {
                Task t1 = new Task(() =>
                {
                    Thread.Sleep(100000);
                });
                t1.Start(CoreLimitSchedular.GlobalScheduler);
            }
            Thread.Sleep(2000);
            Thread.Sleep(2000);
            Thread.Sleep(2000);
        }
    }
}
 
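To have a Task use a custom Scheduler, pass the TaskScheduler object when starting the Task, as t1.Start(CoreLimitSchedular.GlobalScheduler) does above. The same applies when using Factory.StartNew: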
 
Task.Factory.StartNew(() =>
    {
        Thread.Sleep(100000);
    }, CancellationToken.None, TaskCreationOptions.None, CoreLimitSchedular.GlobalScheduler);
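Passing four arguments to every StartNew call is verbose. Instead, a TaskFactory can be bound to a scheduler once through its TaskFactory(TaskScheduler) constructor. The sketch below uses a hypothetical CountingScheduler (not from the article or MSDN) that merely counts how many Tasks are queued to it and forwards each one to the Thread Pool. CoreLimitSchedular.GlobalScheduler could be passed to the same factory constructor instead.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler for illustration: counts queued Tasks, then lets the Thread Pool run them.
public sealed class CountingScheduler : TaskScheduler
{
    private int _queued;

    public int QueuedCount { get { return Interlocked.CompareExchange(ref _queued, 0, 0); } }

    protected override void QueueTask(Task task)
    {
        Interlocked.Increment(ref _queued);
        ThreadPool.QueueUserWorkItem(_ => TryExecuteTask(task));
    }

    // Never run a Task inline, so every Task goes through QueueTask.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return new Task[0];
    }
}

public static class SchedulerFactoryDemo
{
    // Starts `count` Tasks through a factory bound to `scheduler`; returns the sum of i*i.
    public static int Run(TaskScheduler scheduler, int count)
    {
        // Bind the scheduler once instead of passing four arguments to every StartNew.
        var factory = new TaskFactory(scheduler);
        var tasks = new Task<int>[count];
        for (int i = 0; i < count; i++)
        {
            int n = i; // copy the loop variable for the closure
            tasks[i] = factory.StartNew(() => n * n);
        }
        Task.WaitAll(tasks);

        int sum = 0;
        foreach (Task<int> t in tasks) sum += t.Result;
        return sum;
    }
}
```

After Run(scheduler, 4), QueuedCount is 4, which shows that every Task started through the bound factory went to the custom scheduler.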
 
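Observing through the Parallel Tasks window, you will find that only as many Tasks as the machine has cores are running at any one time. On a quad-core machine, for example, the window looks like this: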
 
Figure 19: Parallel Tasks window on a quad-core machine
 
After pressing F5 to run to the last line:
 
Figure 20: Parallel Tasks window at the last line
 
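Does this deliver high performance? It depends on what you use it for. If the Tasks are CPU-intensive, running as many Tasks at once as there are cores maximizes performance, and each additional concurrent Task beyond that degrades it progressively. But if part of each Task's work is spent waiting for I/O or network responses, this no longer necessarily holds.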
  
 
 
Wrapping QUWI With Task Library(TaskCompletionSource)
 
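     APM can interoperate with Task through FromAsync; can the older QUWI (ThreadPool.QueueUserWorkItem) do the same? Of course. The Task Library also provides a mechanism for wrapping QUWI in a Task, TaskCompletionSource, as in the following example: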
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
 
namespace ConsoleApplication13
{
    class Program
    {
        static void Main(string[] args)
        {
            TaskCompletionSource<byte[]> tcs = new TaskCompletionSource<byte[]>();
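            ThreadPool.QueueUserWorkItem((state) =>
            {
                try
                {
                    using (FileStream fs = new FileStream(@"C:\Windows\Starter.xml", FileMode.Open,
                           FileAccess.Read))
                    {
                        byte[] buff = new byte[fs.Length];
                        fs.Read(buff, 0, (int)fs.Length);
                        tcs.SetResult(buff);
                    }
                }
                catch (Exception ex)
                {
                    // Without this, a failure would leave tcs.Task pending forever;
                    // TrySetException marks it Faulted instead.
                    tcs.TrySetException(ex);
                }
            });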
 
            tcs.Task.ContinueWith((Task<byte[]> task) =>
            {
                Console.WriteLine(task.Result.Length);
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
            Console.ReadLine();
        }
    }
}
 
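There is only one key point: create a TaskCompletionSource object and use it inside the QUWI to set the return value, the completion state, and so on. Calling SetResult means that the Task bound to this TaskCompletionSource has completed normally, so any normal-completion continuations attached with ContinueWith will run. Calling SetException or SetCanceled means the bound Task did not complete normally, and the continuations attached with ContinueWith using OnlyOnFaulted or OnlyOnCanceled will run instead.
Can you use Wait, or read the value through Task.Result? Yes. Calling Wait or reading Task.Result waits until the QUWI calls SetResult, SetException, or SetCanceled; if the QUWI called SetException or SetCanceled, an exception is thrown (wrapped in an AggregateException). Everything works just as with an ordinary Task.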
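The following sketch covers the failure paths just described; the names are hypothetical. SetException makes Task.Result throw an AggregateException that wraps the original exception, and SetCanceled triggers a continuation registered with OnlyOnCanceled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo of completing a TaskCompletionSource abnormally from a QUWI callback.
public static class TcsFailureDemo
{
    // The QUWI reports an error; reading Result rethrows it inside an AggregateException.
    public static string FaultedMessage()
    {
        var tcs = new TaskCompletionSource<int>();
        ThreadPool.QueueUserWorkItem(_ =>
            tcs.SetException(new InvalidOperationException("read failed")));
        try
        {
            int unused = tcs.Task.Result;   // blocks until the QUWI completes the Task
            return "no exception";
        }
        catch (AggregateException ae)
        {
            return ae.InnerException.Message;
        }
    }

    // The QUWI cancels; only the OnlyOnCanceled continuation runs.
    public static bool OnlyOnCanceledRan()
    {
        var tcs = new TaskCompletionSource<int>();
        bool ran = false;
        Task cont = tcs.Task.ContinueWith(t => { ran = true; },
            TaskContinuationOptions.OnlyOnCanceled);
        ThreadPool.QueueUserWorkItem(_ => tcs.SetCanceled());
        cont.Wait();
        return ran && tcs.Task.IsCanceled;
    }
}
```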