圖18中有三個角色,Task Factory負責建立Task物件,Task Scheduler則負責Task的排程事宜。讀者們會覺得很奇怪,至今為止,我們建立Task的方式都是直接以new方式建立,其中並未見到Task Factory的蹤影呀?是的!這是因為Task類別的建構子預設會使用系統所產生的Task Factory物件,所以不需要設計師特別的傳入Task Factory或是明確的使用Task Factory來建立Task,以下是Task類別的模擬碼。
The Parallel Programming Of .NET Framework 4.0(4)-Inside Out Of Task Library
文/黃忠成
Task Factory
讀到這裡,相信讀者們在Task Library使用上已經完全理解了,現在讓我們用一個較高的高度來看Task Library的架構。
圖18
圖18中有三個角色,Task Factory負責建立Task物件,Task Scheduler則負責Task的排程事宜。讀者們會覺得很奇怪,至今為止,我們建立Task的方式都是直接以new方式建立,其中並未見到Task Factory的蹤影呀?是的!這是因為Task類別的建構子預設會使用系統所產生的Task Factory物件,所以不需要設計師特別的傳入Task Factory或是明確的使用Task Factory來建立Task,以下是Task類別的模擬碼。
private static TaskFactory s_factory = null; static Task() { S_factory = new TaskFactory(); } public static TaskFactory { get { return s_factory; } } |
說Task Factory負責建立Task物件其實並不精確,因為Task物件事實上是獨立建立的,其與Task Factory的關係也只有其內含一個靜態的Task Factory變數,及披露此屬性給設計師存取而已,所以讀者們別把Task Factory及Task的關係想的太過複雜,只要將Task Factory想成是一種Factory Pattern for Task,可以讓設計師透過他來建立Task物件即可,而其與直接new Task物件並無不同,至少在Task Library未開放設計師直接指定Task.Factory屬性值之前是如此,下面是一個使用Task Factory建立Task物件的例子。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task.Factory.StartNew(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Console.WriteLine(total); }); Console.ReadLine(); } } } |
很明顯的,此例將Task與Task Factory的關係展露出來,其實Task Factory於Task物件的建立,只是簡化了Task物件的建立及執行動作成單一函式而已,與下例的結果完全一樣。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Console.WriteLine(total); }); t.Start(); Console.ReadLine(); } } } |
相同的寫法,也適用於有傳回值的Task,
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; return total; }); Console.WriteLine(t.Result); Console.ReadLine(); } } } |
或是需要傳入TaskCreationOptions的情況。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task.Factory.StartNew(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Console.WriteLine(total); },TaskCreationOptions.PreferFairness); Console.ReadLine(); } } } |
亦或是傳入CancellationToken。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { CancellationTokenSource cts = new CancellationTokenSource(); Task.Factory.StartNew(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Console.WriteLine(total); },cts.Token); Console.ReadLine(); } } } |
前面所提及的ContinueWith機制,在Task Factory中也有進階的函式可用,原本的ContinueWith有一個限制,那就是其只能針對單一Task掛載結束時的處理函式,在Task Factory所提供的ContinueWhenAll、ContinueWhenAny函式中,允許設計師對多個Task掛載結束時的處理函式,下面是一個使用ContinueWhenAll的例子。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t1 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(10000); return total; }); Task<int> t2 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 10; i < 20; i++) total += i; Thread.Sleep(10000); return total; }); Task.Factory.ContinueWhenAll(new Task<int>[] { t1, t2 }, (Task<int>[] tasks) => { if (t1.Status == TaskStatus.RanToCompletion && t1.Status == TaskStatus.RanToCompletion) { Console.WriteLine(tasks[0].Result); Console.WriteLine(tasks[1].Result); } }); Console.ReadLine(); } } } |
與原本的ContinueWith機制的不同之處在於,當使用ContinueWith時,你只能針對個別的Task來指定結束處理函式,但當使用ContinueWhenAll時,你可以針對多個Task指定單一結束處理函式,而此處理函式會在所傳入的Tasks都結束時才執行。
特別注意一點,雖然ContinueWhenAll允許傳入TaskContinuationOptions,但其只接受下表的值,OnlyRanOnXXX、NotOnXXX等值都不允許。
值 |
AttachedToParent |
ExecuteSynchronously |
LongRunning |
PreferFairness |
比較有趣的是當指定ExecuteSynchronously這個參數值時,原本的意思是要求Task Library將指定的結束處理之Task執行於掛載此結束處理函式的Task所在執行緒中,但現在有多個執行緒,其結果為何?答案是該結束處理之Task將執行於最後結束的Task所在的執行緒中。
與ContinueWhenAll相對的函式是ContinueWhenAny,其會在指定的Tasks其中之一結束時執行。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t1 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(12000); return total; }); Task<int> t2 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 10; i < 20; i++) total += i; Thread.Sleep(10000); return total; }); Task.Factory.ContinueWhenAny(new Task<int>[] { t1, t2 }, (Task<int> task) => { if (task.Status == TaskStatus.RanToCompletion) { Console.WriteLine(task.Result); } }); Console.ReadLine(); } } } |
注意,ContinueWhenAny所指定的函式會在傳入的Tasks中有任何一個結束時執行,且與ContinueWhenAll相同,只會執行一次。
當掛載結束函式的Task是Task<TResult>型別時,必須於呼叫ContinueWhenAll、ContinueWhenAny時指定TResult型別。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t1 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(12000); return total; }); Task<int> t2 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 10; i < 20; i++) total += i; Thread.Sleep(10000); return total; }); Task tfinal = Task.Factory.ContinueWhenAny<int>( new Task<int>[] { t1, t2 }, (Task<int> task) => { if (task.Status == TaskStatus.RanToCompletion) { Console.WriteLine(task.Result); } }); Console.ReadLine(); } } } |
如果需要ContinueWhenAll、ContinueWhenAny回傳有傳回值的Task<TResult>,寫法如下:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t1 = Task.Factory.StartNew(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(12000); }); Task t2 = Task.Factory.StartNew(() => { int total = 0; for (int i = 10; i < 20; i++) total += i; Thread.Sleep(10000); }); Task<int> tfinal = Task.Factory.ContinueWhenAll<int>(new Task[] { t1, t2 }, (Task[] tasks) => { //do something. return 15; }); Console.ReadLine(); } } } |
當掛載結束的Task是Task<TResult>型態,且需要ContinueWhenAll、ContinueWhenAny回傳Task<TResult>型態的Task時,寫法稍有變化:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t1 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(12000); return total; }); Task<int> t2 = Task.Factory.StartNew<int>(() => { int total = 0; for (int i = 10; i < 20; i++) total += i; Thread.Sleep(10000); return total; }); Task<int> tfinal = Task.Factory.ContinueWhenAll<int,int>(new Task<int>[] { t1, t2 }, (Task<int>[] tasks) => { //do something. return 15; }); Console.ReadLine(); } } } |
第一個型別參數指的是掛載結束處理函式的Task回傳值型別,第二個型別參數則是ContinueWhenAll、ContinueWhenAny回傳的Task回傳值型別。
Wrapping APM Mode With Task Factory
.NET Framework 2.0 開始支援APM(Asynchronously Programming Model),在常用的IO物件如FileStream、NetworkStream加入了BeginRead/EndRead及BeginWrite/EndWrite 等非同步的讀寫函式,目的是為了讓設計師可以輕易的在等待IO讀寫動作時還能進行其它運算的程式,其實說穿了,APM就是將IO的讀寫動作放到執行緒中,在.NET Framework 2.0時,這個執行緒就是由Thread Pool所提供的。
而Task Library 為了協助設計師將原本的APM程式轉換成Task Library模式,在Task Factory裡提供了FromAsync函式,如下例:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { static void Main(string[] args) { FileStream fs = new FileStream(@"C:\Windows\Starter.xml", FileMode.Open, FileAccess.Read); FileInfo fi = new FileInfo(@"C:\Windows\Starter.xml"); byte[] buff = new byte[fi.Length]; Task<int> t = Task<int>.Factory.FromAsync(fs.BeginRead, fs.EndRead, buff, 0, buff.Length, fs); t.ContinueWith((Task<int> task) => { Console.WriteLine(task.Result); ((FileStream)task.AsyncState).Close(); }, TaskContinuationOptions.OnlyOnRanToCompletion); Console.ReadLine(); } } } |
FromAsync會回傳一個Task,該Task便是封裝APM的物件,有了這個Task後,設計師就能使用如ContinueWith、Wait、ContinueWhenAll、ContinueWhenAny等函式,得到除了取消機制外的所有Task Library優點。
Task Scheduler
Task Scheduler 是Task Library中一個很重要的角色,所有的Task物件最後都會交給Task Scheduler來排程,那Task Scheduler與原來的Thread Pool物件有何不同?很簡單,Task Scheduler是舊有Thread Pool物件的加強版,其提供了Task Library中最重要的Local Queue、Work Stealing機制,但我們建立Task時也沒有見到此物件的縱影不是嗎?是的,這是因為Task Library內建一個預設的Task Scheduler供設計師使用,如果在建立Task物件時未明確指定Task Scheduler,其便會使用預設的Task Scheduler物件。
與Task Factory的封閉式設計不同,Task Library允許設計師自訂Task Scheduler,例如我們可以設計一個依據 CPU/核心數來排程的Task Scheduler,在單核情況下,該Task Scheduler同時只允許一個Task執行,其它的Task皆處於等待執行狀態,在雙核心情況下,則同時允許兩個Task執行,下例是由MSDN中節錄出來的例子,依據CPU/核心數來排程的自訂Task Scheduler例子。
CoreLimitScheduler.cs |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { public class CoreLimitSchedular : TaskScheduler { [ThreadStatic] private static bool _currentThreadIsProcessingItems; private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) private readonly int _maxDegreeOfParallelism; private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) private static CoreLimitSchedular _scheduler = null; private static object _lock = new object(); public static TaskScheduler GlobalScheduler { get { return _scheduler; } } static CoreLimitSchedular() { _scheduler = new CoreLimitSchedular(); } public CoreLimitSchedular() { _maxDegreeOfParallelism = Environment.ProcessorCount; } protected sealed override void QueueTask(Task task) { lock (_tasks) { _tasks.AddLast(task); if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) { ++_delegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork(); } } } private void NotifyThreadPoolOfPendingWork() { ThreadPool.UnsafeQueueUserWorkItem(_ => { _currentThreadIsProcessingItems = true; try { // Process all available items in the queue. while (true) { Task item; lock (_tasks) { if (_tasks.Count == 0) { --_delegatesQueuedOrRunning; break; } item = _tasks.First.Value; _tasks.RemoveFirst(); } base.TryExecuteTask(item); } } finally { _currentThreadIsProcessingItems = false; } }, null); } protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { if (!_currentThreadIsProcessingItems) return false; if (taskWasPreviouslyQueued) TryDequeue(task); return base.TryExecuteTask(task); } protected sealed override bool TryDequeue(Task task) { lock (_tasks) return _tasks.Remove(task); } public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } protected sealed override IEnumerable<Task> GetScheduledTasks() { bool lockTaken = false; try { Monitor.TryEnter(_tasks, ref lockTaken); if (lockTaken) return _tasks.ToArray(); else throw new NotSupportedException(); } finally { if (lockTaken) Monitor.Exit(_tasks); } } } } |
Program.cs |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { static void Main(string[] args) { for (int i = 0; i < 10; i++) { Task t1 = new Task(() => { Thread.Sleep(100000); }); t1.Start(CoreLimitSchedular.GlobalScheduler); } Thread.Sleep(2000); Thread.Sleep(2000); Thread.Sleep(2000); } } } |
當需要Task使用自訂的Scheduler時,可於建立Task時傳入TaskScheduler物件,當使用Factory.StartNew時也是一樣的。
Task.Factory.StartNew(() => { Thread.Sleep(100000); }, CancellationToken.None,TaskCreationOptions.None,CoreLimitSchedular.GlobalScheduler); |
透過Parallel Tasks視窗觀察,你會發現同時只會有與該電腦相同核心數的Tasks被執行,例如在四核電腦上畫面如下圖:
圖19
按下F5後到最後一行:
圖20
那麼這樣做就會有高效能嗎?答案視你的用途而定,如果Task是大量使用CPU運算資源的話,同時間執行與其相同核心數的Tasks,效能會達到最大化,每超過一個則會持續遞減。但如果Task有一部份是等待IO/Network動作回應的話,那麼就不一定了。
Wrapping QUWI With Task Library(TaskCompleteSource)
APM可以透過FromAsync來達成與Task互通的目的,那麼舊有的QUWI也可以嗎?當然,Task Library也提供了將QUWI封裝成Task的機制:TaskCompletionSource,如下例:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { static void Main(string[] args) { TaskCompletionSource<byte[]> tcs = new TaskCompletionSource<byte[]>(); ThreadPool.QueueUserWorkItem((state) => { FileStream fs = new FileStream(@"C:\Windows\Starter.xml", FileMode.Open, FileAccess.Read); byte[] buff = new byte[fs.Length]; fs.Read(buff, 0, (int)fs.Length); tcs.SetResult(buff); fs.Close(); }); tcs.Task.ContinueWith((Task<byte[]> task) => { Console.WriteLine(task.Result.Length); }, TaskContinuationOptions.OnlyOnRanToCompletion); Console.ReadLine(); } } } |
重點只有一個,就是建立TaskCompletionSource物件,並於QUWI中使用它來設定如回傳值、結束狀態等等,當呼叫SetResult時,就意味著此TaskCompletionSource所繫結的Task已經正常結束,其使用ContinueWith掛載的正常結束函式便會執行。當呼叫了SetException、SetCancel時, 則意味著TaskCompletionSource所繫結的Task未正常結束,其使用ContinueWith掛載的RunOnCancel、RunOnFaulted函式會被執行。
那麼可以使用Wait、或是透過Task.Result來取值嗎?可以的,當呼叫Wait或是透過Task.Result取值時,其會等待到QUWI呼叫了SetResult、SetException、SetCancel為止,當QUWI呼叫的是SetException、SetCancel時,也會拋出例外,一切與使用一般Task無異。