Task Library除了支援Planed/Un plan Exit時的例外處理,及Local Queue、Working Stealing機制外,還有一項很有趣的機制,那就是Continue With機制,這個機制允許設計師在一個執行緒結束後,緊接著安排另一個執行緒來執行指定的delegate,以較簡單、白話的說,就是執行緒的流程控管機制。
The Parallel Programming Of .NET Framework 4.0(3) - Deep Into Task Library
文/黃忠成
ContinueWith
Task Library除了支援Planed/Un plan Exit時的例外處理,及Local Queue、Working Stealing機制外,還有一項很有趣的機制,那就是Continue With機制,這個機制允許設計師在一個執行緒結束後,緊接著安排另一個執行緒來執行指定的delegate,以較簡單、白話的說,就是執行緒的流程控管機制。
圖16
Continue With機制,允許設計師於執行緒結束的時候,依據結束的狀態,分別指定特定的delegate,這些delegate會在特定狀態時被排入排程,以另一個執行緒來執行。
就圖16來說,我們分別排了三個delegate,在Task於Faulted(有例外發生,而未被正常補捉)、Complete(執行緒正常結束)、Cancel(執行緒被取消)時執行,請注意!只有在狀態符合時,指定的delegate才會被執行。下面是一個使用ContinueWith的小例子。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { for (int i = 0; i < 5; i++) Thread.Sleep(500); }); t.ContinueWith((task) => { Console.WriteLine("Finish"); }); t.Start(); Console.ReadLine(); } } } |
呼叫ContinueWith而未帶任何參數時,預設為None模式,也就是不管Task發生例外或是取消、還是正常結束,都會執行指定的delegate,設計師如使用此法呼叫ContinueWith,通常必須自己判斷Task的結束狀態(Task.Status),這也是為何ContinueWith接受的delegate必須傳入Task物件的原因。
如果需要於不同狀態來執行不同的delegate,可於呼叫ContinueWith時傳入TaskContinuationOptions參數。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { for (int i = 0; i < 5; i++) Thread.Sleep(500); }); t.ContinueWith((task) => { Console.WriteLine("Finish"); },TaskContinuationOptions.OnlyOnRanToCompletion); t.ContinueWith((task) => { Console.WriteLine("Canceled"); }, TaskContinuationOptions.OnlyOnCanceled); t.ContinueWith((task) => { Console.WriteLine("Faulted"); }, TaskContinuationOptions.OnlyOnFaulted); t.Start(); Console.ReadLine(); } } } |
因為ContinueWith是於特定狀態時,將delegate排入執行緒排程來執行,所以其會回傳一個Task物件,如果需要的話,也可以使用串接的方式來串接Task。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { for (int i = 0; i < 5; i++) Thread.Sleep(500); }); t.ContinueWith((task) => { Console.WriteLine("Finish"); },TaskContinuationOptions.OnlyOnRanToCompletion).ContinueWith((task)=> { Console.WriteLine("chain continue with."); },TaskContinuationOptions.OnlyOnRanToCompletion); t.ContinueWith((task) => { Console.WriteLine("Canceled"); }, TaskContinuationOptions.OnlyOnCanceled); t.ContinueWith((task) => { Console.WriteLine("Faulted"); }, TaskContinuationOptions.OnlyOnFaulted); t.Start(); Console.ReadLine(); } } } |
執行結果如下:
Finish chain continue with. |
下表為TaskContinuationOptions可能的值,分成狀態及執行ContinueWith指定之delegate時Task的CreationOptions兩類,這兩類值可以以OR方式合併使用的。
值 | 說明 |
Task CreationOptions | |
AttachedToParent | 此ContinueWith所指定的delegate,會附加到外層的Task上,請注意!不是附加到呼叫ContinueWith的Task,而是外層。 |
ExecuteSynchronously | 此ContinueWith所指定的delegate,會執行於該Task相同的執行緒中,也就是說: t.ContinueWith( (task)=> { //此處將執行於t這個Task所產生的 //執行緒中 } , TaskContinuationOptions.ExecuteSynchronously ); PS:此值不可與LongRunning合用。 |
LongRunning | 此ContinueWith所指定的delegate,不會排入Pool排程,而是直接建立一個Thread執行。 |
PreferFairness | 此ContinueWith所指定的delegate,將被排入Global Queue。 |
Task Exit Status | |
None | 預設值,不論執行緒結束時狀態,一律執行指定的delegate,並且未指定上述的四種Task Creation Options,也就是說此delegate將以一般型態被排入Local Queue執行。 |
NotOnCanceled | 如果執行緒不是以Canceled結束,執行指定的delegate,情況包含:正常結束、Faulted結束。 |
NotOnFaulted | 如果執行緒不是以Faulted結束,執行指定的delegate,情況包含:正常結束、Cancel結束。 |
NotOnRanToCompletion | 如果執行緒不是正常結束,執行指定的delegate,情況包含:Faulted、Cancel結束。 |
OnlyOnCanceled | 當執行緒以Cancel狀態結束時,執行指定的delegate。 |
OnlyOnFaulted | 當執行緒以Faulted狀態結束時,執行指定的delegate。 |
OnlyOnRanToCompletion | 當執行緒以正常狀態結束時,執行指定的delegate。 |
以OnlyXXX為參數的模式很容易理解,就是在特定狀態下執行delegate,NotXXX的模式就是非在指定狀態下執行delegate,見下面的例子。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { for (int i = 0; i < 5; i++) Thread.Sleep(500); }); t.ContinueWith((task) => { Console.WriteLine("Finish"); }, TaskContinuationOptions.OnlyOnRanToCompletion); t.ContinueWith((task) => { Console.WriteLine("Error"); }, TaskContinuationOptions.NotOnRanToCompletion); t.Start(); Console.ReadLine(); } } } |
這個例子於正常結束時印出Finish,非正常結束如Cancel、Faulted時印出Error。你可以將NotXXX以OR方式連結起來,例如NotOnFaulted|NotOnCanceled就會變成OnlyRanToCompletion,NotOnFaulted|NotOnRanToCompletion就會變成OnlyOnCanceled,但是請注意一點,你不能將OnlyXXX以OR方式連起來,OnlyOnCanceled|OnlyOnFaulted不等於NotOnRanToCompletion。
必要時,你也可以透過以OR串接Task Creation Options來指定ContinueWith所產生的Task排程模式,下例會將ContinueWith所指定的Task排入Global Queue。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { for (int i = 0; i < 5; i++) Thread.Sleep(500); throw new Exception("TEST"); }); t.ContinueWith((task) => { Console.WriteLine("Finish"); }, TaskContinuationOptions.OnlyOnRanToCompletion); t.ContinueWith((task) => { Console.WriteLine("Error"); },TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.PreferFairness ); t.Start(); Console.ReadLine(); } } } |
因為ContinueWith內部也是使用Task,所以你也能指定CancellationToke給它,處理ContinueWith所指定之delegate的Cancel狀態。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task t = new Task(() => { for (int i = 0; i < 5; i++) Thread.Sleep(500); }); CancellationTokenSource cts = new CancellationTokenSource(); t.ContinueWith((task) => { Thread.Sleep(3000); Console.WriteLine("Finish"); cts.Token.ThrowIfCancellationRequested(); },cts.Token).ContinueWith((task)=> { Console.WriteLine("Cancel on Finish"); },TaskContinuationOptions.OnlyOnCanceled); ; t.Start(); Thread.Sleep(3000); cts.Cancel(); Console.ReadLine(); } } } |
如果是使用有回傳值的Task,那麼就必須指定delegate的參數。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t = new Task<int>(() => { int v = 0; for (int i = 0; i < 5; i++) { Thread.Sleep(500); v += i; } return v; }); t.ContinueWith((Task<int> task) => { Thread.Sleep(3000); Console.WriteLine(task.Result); Console.WriteLine("Finish"); }); t.Start(); Console.ReadLine(); } } } |
ContinueWith內部是Task,所以我們自然也能指定其產生有傳回值的Task<TResult>物件。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { Task<int> t = new Task<int>(() => { int v = 0; for (int i = 0; i < 5; i++) { Thread.Sleep(500); v += i; } return v; }); Task<int> resultTask = t.ContinueWith<int>((Task<int> task) => { Thread.Sleep(3000); Console.WriteLine(task.Result); Console.WriteLine("Finish"); return task.Result + 1000; }); t.Start(); Console.WriteLine(resultTask.Result); Console.ReadLine(); } } } |
ExecuteSynchronously
TaskContinuationOptions有個很有趣的值:ExecuteSynchronously,當你於ContinueWith時使用此值時,該ContinueWith所產生的Task將會執行在其所附加的Task所使用的執行緒中,例如下例:
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.Threading; using System.Threading.Tasks; namespace WindowsFormsApplication3 { public partial class Form1 : Form { public Form1() { InitializeComponent(); } private void button1_Click(object sender, EventArgs e) { Task<int> t = new Task<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; MessageBox.Show(Thread.CurrentThread.ManagedThreadId.ToString()); return total; }); t.ContinueWith((Task<int> task) => { MessageBox.Show(Thread.CurrentThread.ManagedThreadId.ToString()); },TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); t.Start(); } } } |
執行此例時,你會發現兩個Task所列出的ManagedThreadId是相同的數字,拿掉ExecuteSynchronously後重新執行,這兩個Task的ManagedThreadId將可能完全不同,為何用【可能】這個字眼呢?因為依據Task Library排程的情況不同,運氣好的話這兩個Task會在同一個執行緒(因為Thread Pool重用已結束但未回收執行緒的行為),但運氣不好的話,就不會在同一個執行緒中,當然!寫程式需要的是精確度,不是比誰運氣好,所以當你需要ContinueWith所產生的Task執行在其附加的Task所在執行緒時,請加上ExecuteSynchronously。
那何時會用到這個機制呢?最常用到的是Thread Static變數,見下例:
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.Threading; using System.Threading.Tasks; namespace WindowsFormsApplication3 { public partial class Form1 : Form { public Form1() { InitializeComponent(); } private void button1_Click(object sender, EventArgs e) { Task<int> t = new Task<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; ThreadLocalVariable.Data = total; return total; }); t.ContinueWith((Task<int> task) => { MessageBox.Show(ThreadLocalVariable.Data.ToString()); }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously ); t.Start(); } } public class ThreadLocalVariable { [ThreadStatic] public static int Data = 0; } } |
此例中將ThreadLocalVariable.Data設為Thread Static形態,要求CLR將此變數視為是每個執行緒都有一份,也就是說當有10個執行緒設定此變數值,那麼每個執行緒都會擁有一份自己的值,A執行緒對ThreadLocalVariable.Data 的變更,不會反應在B執行緒上。此例於t這個Task中設定了Data的值,接著於ContinueWith的Task中列出其值,假如ContinueWith的Task未執行在t這個Task所執行的執行緒中,Data的值將會維持在零。讀者們可觀察ExecuteSynchronously拿掉前後的差異,就能了解此值存在的意義,
Why ContinueWith??
乍看之下,ContinueWith似乎很好用,但如果細想一下,你會開始懷疑這個機制存在的必要性,因為就算不使用ContinueWith,我們似乎也能得到同樣的結果。
static void Main(string[] args) { Task<int> t = new Task<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(5000); Console.WriteLine("complete"); return total; }); t.Start(); Console.WriteLine(t.Result); Console.ReadLine(); } |
這個例子的執行結果與下面這個使用ContinueWith的結果幾近相同,差別只在於【complete】列出的順序而已。
static void Main(string[] args) { Task<int> t = new Task<int>(() => { int total = 0; for (int i = 0; i < 10; i++) total += i; Thread.Sleep(5000); return total; }); t.ContinueWith((Task<int> task) => { Console.WriteLine("complete"); }, TaskContinuationOptions.OnlyOnRanToCompletion); t.Start(); Console.WriteLine(t.Result); Console.ReadLine(); } |
這兩個例子披露了一個事實,ContinueWith存在的意義似乎不大,其所能達到的效果,我們直接在Task中也能夠以try...catch來完成,也能用cts.Token. IsCancellationRequested來處理取消,那為何要設計ContinueWith這種機制呢?答案是動態及靜態的不同,在未使用ContinueWith的例子中,我們是將結束時處理的程式碼寫在Task的delegate中,此為靜態,這在執行時期是不可變的,但是當使用ContinueWith後,結束時處理的程式碼是【附加】到Task上的,其在執行時期是可變的,以一個模擬的例子來說,假設我們有一個Class Library,其將訂單的處理機制寫在Task中,由外部來呼叫執行,如下所示:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Data.SqlClient; namespace ClassLibrary1 { public class Class1 { public static Task UpdateOrders(SqlConnection conn) { Task t = new Task( ()=> { //updating order.. Thread.Sleep(10000); }); return t; } } } |
呼叫者以下列程式碼執行:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Data.SqlClient; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { SqlConnection conn = new SqlConnection(); Task t = ClassLibrary1.Class1.UpdateOrders(conn); t.Start(); t.Wait(); Console.ReadLine(); } } } |
OK,看來很正常沒有任何問題,倘若今天需要在更新訂單後記錄此次更新於另一資料表,按原本的寫法,我們可以在t.Wait後寫下記錄的程式碼,但這麼做就失去了非同步寫法的優勢:
Task t = ClassLibrary1.Class1.UpdateOrders(conn); t.Start(); t.Wait(); //log order |
要維持非同步的優勢,我們有兩種選擇,一是修改UpdateOrders的原始碼,加入記錄的程式碼,二是將UpdateOrders包在另一個Task中,如下所示:
Task t_wrapper = new Task( ()=> { Task t = ClassLibrary1.Class1.UpdateOrders(conn); t.Start(); t.Wait(); //log order }); |
修改UpdateOrders原始碼是比較直接的寫法,但重點是你得有該Class Library的原始碼,Wrapper的寫法也不錯,但缺點是無法保證這兩個Task是跑在同一個執行緒中,此時ContinueWith的優勢就出現了。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Data.SqlClient; namespace ConsoleApplication12 { class Program { static void Main(string[] args) { SqlConnection conn = new SqlConnection(); Task t = ClassLibrary1.Class1.UpdateOrders(conn); t.ContinueWith((task) => { //updating order log. Thread.Sleep(3000); Console.WriteLine("order loged."); }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); t.Start(); t.Wait(); Console.ReadLine(); } } } |
重點就是,當使用ContinueWith時,你附加的動作是完全動態的,這比起原先靜態的寫法靈活許多。