在CPU進入多核心時代後,原本只限應用於高階多CPU電腦的平行運算技術,也因為多核心的平價化而逐漸浮現在家用電腦應用,什麼是平行運算呢?說穿了其實很簡單,就是依據CPU所內含的核心數,建立對應數量的執行緒,此時CPU的效能會發揮到極致,以2核心CPU為例,
The Parallel Programming Of .NET Framework 4.0(5)-Dive to Parallel Programming
文/黃忠成
Dive to Parallel Programming
在CPU進入多核心時代後,原本只限應用於高階多CPU電腦的平行運算技術,也因為多核心的平價化而逐漸浮現在家用電腦應用,什麼是平行運算呢?說穿了其實很簡單,就是依據CPU所內含的核心數,建立對應數量的執行緒,此時CPU的效能會發揮到極致,以2核心CPU為例,當程式只用到單一執行緒時,其最大CPU佔用率只會到達50%,只有在程式用到兩個執行緒以上時,CPU佔用率才有可能達到100%,這是為何有些時候你換了多核心電腦,但程式執行效能卻未有顯著提升的主因,因為大多數程式最初都是以單執行緒為基準設計的,核心數的提升並不會增加其效能,只有時脈提升才能得到明顯的效能提升。
那在現在這種情況下,多核心所帶來的效能提升幾近於零嗎?也不見得,近年來的多工型OS對此皆有最佳化,會將CPU時間平均分給所有程式,因此當A程式佔用了50%的CPU時間時(以2核為例),OS會將其它的50%分給其它程式,因此多核心可以讓你的電腦在同時執行多個程式時獲得更高效能,舉個實例來說,當你需要壓縮兩個目錄而同時執行兩個WinZip時,原本的單核心只能平均將CPU時間分給這兩個WinZip,兩個壓縮動作是以交錯進行的,但在雙核心時,這兩個壓縮動作將可以同時執行,一個壓縮動作需要5分鐘完成的話,兩個壓縮動作在單核心可能會耗費10分鐘已上,而在雙核心就只會耗費5~6分鐘左右,這是多核心帶來的效能提升,但除非壓縮程式依據多核心重新設計,否則不會發生單一壓縮動作因多核心而提升至2-3分鐘完成的現象。
那程式依據多核心設計,其效能會依據核心數成倍數成長嗎?不見得,多核心與多CPU不同,多CPU會有多個運算接收管道,但多核心只會有單一運算接收管道,就像是高速公路有6線道,但匝道只有單線,車輛要高速行駛前得先排隊進入6 線道才行,因此多核心效率的提升不會是倍數,多一個核心大約只帶來70%的效能提升。
空口無憑,讓我們以一個實例來實驗,這個例子會事先產生5百萬的元素,然後在其中尋找單一元素。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { private static Person Searching_1core(List<Person> datas) { Person resultItem = null; for (int i = 0; i < datas.Count; i++) { var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } return resultItem; } static void Main(string[] args) { Console.WriteLine("Generating Data."); var datas = Person.GenerateBigTable(); Console.WriteLine("Starting Searching."); Stopwatch sw = new Stopwatch(); sw.Start(); Person result = Searching_1core(datas); sw.Stop(); Console.WriteLine("elapsed {0} ms", sw.ElapsedMilliseconds); Console.ReadLine(); } } public class Person { public string Name { get; set; } public string Age { get; set; } public string Address { get; set; } public string Company { get; set; } public string Title { get; set; } public static List<Person> GenerateBigTable() { List<Person> ps = new List<Person>(); for (int i = 0; i < 5000000; i++) ps.Add(new Person() { Name = "c64" + i.ToString(), Age = i.ToString(), Address = "Taipei " + i.ToString(), Title = "Title" + i.ToString(), Company = "GIS " + i.ToString() }); return ps; } } } |
此例執行結果如下:
Generating Data. Starting Searching. elapsed 2361 ms |
當在雙核心電腦中,我們可以利用其核心優勢,以兩個執行緒來進行搜尋,原理是將500萬元素分割成兩塊各250萬,交給兩個執行緒搜尋。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { private static Person Searching_2core(List<Person> datas) { Person resultItem = null; Task t1 = new Task(() => { for (int i = 0; i < 2500000; i++) { if (resultItem != null) break; var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } }); Task t2 = new Task(() => { for (int i = 2500000; i < datas.Count; i++) { if (resultItem != null) break; var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } }); t1.Start(); t2.Start(); t1.Wait(); t2.Wait(); return resultItem; } static void Main(string[] args) { Console.WriteLine("Generating Data."); var datas = Person.GenerateBigTable(); Console.WriteLine("Starting Searching."); Stopwatch sw = new Stopwatch(); sw.Start(); Person result = Searching_2core(datas); sw.Stop(); Console.WriteLine("elapsed {0} ms", sw.ElapsedMilliseconds); Console.ReadLine(); } } public class Person { public string Name { get; set; } public string Age { get; set; } public string Address { get; set; } public string Company { get; set; } public string Title { get; set; } public static List<Person> GenerateBigTable() { List<Person> ps = new List<Person>(); for (int i = 0; i < 5000000; i++) ps.Add(new Person() { Name = "c64" + i.ToString(), Age = i.ToString(), Address = "Taipei " + i.ToString(), Title = "Title" + i.ToString(), Company = "GIS " + i.ToString() }); return ps; } } } |
執行結果如下:
Generating Data. Starting Searching. elapsed 1005 ms |
哇嗚,效能提升一倍不是?這證明了平行運算技術於多核心上的確有助於效能提升,再進一步,使用4核心電腦配上4執行緒。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { private static Person Searching_4core(List<Person> datas) { Person resultItem = null; int spin = datas.Count / 4; Task t1 = new Task(() => { for (int i = 0; i < spin; i++) { if (resultItem != null) break; var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } }); Task t2 = new Task(() => { for (int i = spin; i < spin * 2; i++) { if (resultItem != null) break; var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } }); Task t3 = new Task(() => { for (int i = spin * 2; i < spin * 3; i++) { if (resultItem != null) break; var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } }); Task t4 = new Task(() => { for (int i = spin * 3; i < datas.Count; i++) { if (resultItem != null) break; var item = datas[i]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; break; } } }); t1.Start(); t2.Start(); t3.Start(); t4.Start(); t1.Wait(); t2.Wait(); t3.Wait(); t4.Wait(); return resultItem; } static void Main(string[] args) { Console.WriteLine("Generating Data."); var datas = Person.GenerateBigTable(); Console.WriteLine("Starting Searching."); Stopwatch sw = new Stopwatch(); sw.Start(); Person result = Searching_4core(datas); sw.Stop(); Console.WriteLine("elapsed {0} ms", sw.ElapsedMilliseconds); Console.ReadLine(); } } public class Person { public string Name { get; set; } public string Age { get; set; } public string Address { get; set; } public string Company { get; set; } public string Title { get; set; } public static List<Person> GenerateBigTable() { List<Person> ps = new List<Person>(); for (int i = 0; i < 5000000; i++) ps.Add(new Person() { Name = "c64" + i.ToString(), Age = i.ToString(), Address = "Taipei " + i.ToString(), Title = "Title" + i.ToString(), Company = "GIS " + i.ToString() }); return ps; } } } |
執行結果如下:
Generating Data. Starting Searching. elapsed 716 ms |
雖然沒有單核到雙核的戲劇化提升,但也提升了40%左右,這是受到執行緒的建立成本也相對提升的緣故,當資料量越高、運算量提高,其提升比率也會逐漸成長至70%左右。
Parallel.For
平行運算需求因多核心平價而日漸增加,.NET Framework 4 基於Task Library的基礎發展出了 Task Parallel Library,簡稱為TPL的技術,內涵便是將前面所提到的依據核心數建立對應的執行緒技巧,封裝成方便使用的函式,上例可以使用TPL重新撰寫,見下例:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication13 { class Program { private static Person SearchingWithTPL(List<Person> datas) { Person resultItem = null; Parallel.For(0, datas.Count, (index,loolState) => { if (resultItem != null) loolState.Stop(); var item = datas[index]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; loolState.Stop(); } }); return resultItem; } static void Main(string[] args) { Console.WriteLine("Generating Data."); var datas = Person.GenerateBigTable(); Console.WriteLine("Starting Searching."); Stopwatch sw = new Stopwatch(); sw.Start(); Person result = SearchingWithTPL(datas); sw.Stop(); Console.WriteLine("elapsed {0} ms", sw.ElapsedMilliseconds); Console.ReadLine(); } } public class Person { public string Name { get; set; } public string Age { get; set; } public string Address { get; set; } public string Company { get; set; } public string Title { get; set; } public static List<Person> GenerateBigTable() { List<Person> ps = new List<Person>(); for (int i = 0; i < 5000000; i++) ps.Add(new Person() { Name = "c64" + i.ToString(), Age = i.ToString(), Address = "Taipei " + i.ToString(), Title = "Title" + i.ToString(), Company = "GIS " + i.ToString() }); return ps; } } } |
在4核電腦上,執行結果與前面使用4核搭配Task的方式相近。
Generating Data. Starting Searching. elapsed 904 ms |
當然,你會發現到其實與單4核+Task的方式有所差距,這是因為Parallel.For必須耗費一些時間初始化,加上其一開始只用了單一Task跑,依序才增加到4個Task之故(其實依據資料量的不同,當無法被核心數整除時,會出現第五個Task)。那何時會到全速呢?也就是在4核電腦上,何時會由1變成4?這很難估算,TPL內部會先啟動一個Thread,接著該Thread開始前,自己會再啟動另一個Thread,過程會是 1、2、4,三個步驟後會達到4核,此稱為為Self Replication Task。
在8核,過程會是1、2、4、8 |
Parallel.For函式的所有重載原型如下:
public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body); public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body); public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long> body); public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body); public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body); public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body); public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body); public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body); |
分成幾個部份來討論,fromInclusive代表著起始元素索引,toExclusive代表著結束元素索引,當要搜尋一個1~100的陣列時,fromInclusive為0、toExclusive為100,這與for的寫法完全相同,For函式擁有int、long兩個重載函式,可處理int、long型別的fromInclusive、toExclusive。
Action<int>、Action<long>代表著循環執行的delegate,依據使用int、long的不同,該Action必須接受一個int或是long參數,這個參數值便是陣列的索引值,以搜尋動作為例,這個參數值代表著此delegate要處理的陣列元素索引。
Action<int,ParallelLoopState>、Action<long,ParallelLoopState>也代表著循環執行的delegate,但其多接收一個參數:ParallelLoopState,這個參數擁有兩個重要的函式:Stop、Break,當我們於delegate中呼叫ParallelLoopState.Stop時,即是告訴Parallel.For,循環動作到此為止,所有循環的Task都必須結束,注意!這不代表所有Task會在你呼叫Stop時就立即終止,它們還是會完成最後一個循環,只是不會再開始下一循環而已,舉個例來說,當於4核心電腦上,有四個Task循環一陣列,A到了50,B到了260,C到了400,當D到了600呼叫Stop時,A會做完50才結束,B也會做完260才結束。
Break函式與Stop函式有些不同,其除了要求所有Task循環結束外,還會記錄呼叫Break的循環索引值於LowestBreakIteration屬性中,這有什麼用呢 ?這大多用於已排序之循序搜尋的情況下,例如先找到Age < 20的所有資料,此時若單使用Stop,你無法得知最後一筆Age >= 20的元素之索引,除非你自行使用變數來記錄,此時利用Break,其便會將呼叫Break的元素索引放在LowestBreakIteration屬性中,你只要取用即可。
不過ParallelLoopState不是只能在Action<int,ParallelLoopState>這個delegate用嗎?是的,不過其值最終會被封成ParallelLoopResult物件由For函式回傳。
private static Person SearchingWithTPL2(List<Person> datas) { Person resultItem = null; ParallelLoopResult result = Parallel.For(0, datas.Count, (index, loolState) => { if (resultItem != null) loolState.Break(); var item = datas[index]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; loolState.Break(); } }); Console.WriteLine(result.LowestBreakIteration); return resultItem; } |
ParallelLoopResult的原型如下:
public struct ParallelLoopResult { public bool IsCompleted {get;} public long? LowestBreakIteration {get;} } |
LowestBreakIteration中包含了呼叫Break時的索引值,IsCompleted則代表著For是否完成所有循環後結束,如果For期間有呼叫了Stop、或是Break,那麼IsCompleted會是False。
最後一個未提及的型別是ParallelOptions,其原型如下:
public class ParallelOptions { public CancellationToken CancellationToken { get; set;} public int MaxDegreeOfParallelism { get;set;} public TaskScheduler TaskScheduler {get;set;} } |
CancellationToken與Task Library的CancellationToke機制相同,讓設計師可以於For期間進行取消動作,見下例。
private static void UsingCancelToken() { int[] datas = { 1, 2, 3, 0, 5, 6, 7, 8, 9, 15, 3, 45, 56, 77 }; int incVal = 0; ParallelLoopResult callResult; try { using (CancellationTokenSource cts = new CancellationTokenSource()) { ParallelOptions po = new ParallelOptions() { CancellationToken = cts.Token }; callResult = Parallel.For(0, datas.Length, po, (index) => { try { double f = 15 / datas[index]; //throw a exception Interlocked.Increment(ref incVal); } catch (Exception) { cts.Cancel(); } }); } } catch (OperationCanceledException ex) { Console.WriteLine("operation is cancel, processed items:" + incVal.ToString()); } } |
MaxDegreeOfParallelism是一個int型別的屬性,可以讓設計師決定要用幾個執行緒來執行For循環,預設為-1,也就是依據CPU/核心數決定。
TaskScheduler可讓設計師指定Task Scheduler,預設是使用內建的Task Scheduler。
下面的寫法即是透過ParallelOptions要求For僅使用兩個執行緒。
private static Person SearchingWithTPL3(List<Person> datas) { Person resultItem = null; ParallelLoopResult result = Parallel.For(0, datas.Count, new ParallelOptions() { MaxDegreeOfParallelism = 2}, (index, loolState) => { if (resultItem != null) loolState.Stop(); var item = datas[index]; if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; loolState.Stop(); } }); return resultItem; } |
Parallel.ForEach
For只能應用在具索引之陣列化資料型態,倘若資料非具索引之陣列時,可使用另一個函式:ForEach,其用法與For類似,不過你不需傳入起始與結束索引值:
private static Person SearchingWithForEach(IEnumerable<Person> datas) { Person resultItem = null; Parallel.ForEach(datas, (item, loolState) => { if (resultItem != null) loolState.Stop(); if ((item.Name.ToLower() == "c644999999") && (item.Address.ToLower() == "taipei 4999999")) { resultItem = item; loolState.Stop(); } }); return resultItem; } |
倘若資料無法形成IEnumerable,例如Tree Node,那麼可利用C# 2.0的yield技巧:
............ public class Tree { public Tree Left { get; set; } public Tree Right { get; set; } public int Value { get; set; } public static Tree Build(int start, int level) { Tree tree = new Tree { Value = start }; if (level <= 0) return tree; var left = Build(start + 1, level - 1); var right = Build(start + 1, level - 1); tree.Left = left; tree.Right = right; return tree; } public static IEnumerable<int> GetNodes(Tree tree) { if (tree != null) { yield return tree.Value; foreach (var data in GetNodes(tree.Left)) yield return data; foreach (var data in GetNodes(tree.Right)) yield return data; } } } ................ private static void ParallelTreeSum() { Tree result = Tree.Build(0, 20); int total = 0; Parallel.ForEach(Tree.GetNodes(result), (value) => { Interlocked.Add(ref total, value); for (int i = 0; i < 100; i++) Math.Log10(i); }); } |
請注意,ForEach僅適用於未具索引及未能陣列化的資料,如果資料不是這類的,可以使用For時,別偷懶使用ForEach,因為ForEach需要於取出元素時進行鎖定、且無法精確的分割資料,其效能一定比For來得差。
For、ForEach的例外處理
當For、ForEach期間發生例外時,與Task的Parent/Child模式相同,其會拋出AggreateException例外,內部的InnerExceptions中包含所有已產生的例外。
private static void ProcessException() { int[] datas = { 1, 2, 3, 0, 5, 6, 7, 8, 9 }; try { Parallel.For(0, datas.Length, (index) => { double f = 15 / datas[index]; //throw a exception }); } catch (AggregateException ex) { foreach (var exItem in ex.InnerExceptions) Console.WriteLine(exItem.Message); } } |
LoopInit與LoopFinally
For、ForEach的工作模式,均是將資料分割成若干區塊,分別交由個別的執行緒執行,以此來獲取多CPU/核心的優勢,這種模式特別適用於資料搜尋及運算,但目的若是資料過濾時,要獲取多CPU/核心的優勢,得先面對lock機制的考驗,見下例:
private static List<Person> FilterWithTPL(List<Person> datas) { List<Person> results = new List<Person>(); Parallel.For(0,datas.Count, (index) => { var item = datas[index]; if (item.Name.ToLower().Contains("9")) { lock (results) { results.Add(item); } } }); return results; } |
此例的執行時間如下:
Generating Data. Starting Searching. elapsed 2313 ms |
這個例子發揮了TPL的最大效能了嗎?沒有,因為這個例子有一大部份時間都花在lock/unlock上了,透過For、ForEach所提供,可接受LoopInit及LoopFinally delegate的重載函式,我們可以大幅減少lock/unlock的次數:
private static List<Person> FilterWithTPL2(List<Person> datas) { List<Person> results = new List<Person>(); Parallel.For<List<Person>>(0, datas.Count, () => { return new List<Person>(); }, (index, state, bags) => { var item = datas[index]; if (item.Name.ToLower().Contains("9")) bags.Add(item); return bags; }, (bags) => { lock (results) { foreach (var item in bags) results.Add(item); } } ); return results; } |
此例執行時間如下:
Generating Data. Starting Searching. elapsed 1757 ms |
效率提升了,但為何呢?For、ForEach允許傳入localInit的delegate,其會在區段開始時被執行,以4核心電腦來說,此例會被分割成四個區段,每個區段開始時會執行一次localInit的delegate,於此我們建立了一個List<Person>物件,接著將符合過濾條件的元素填入此List<Person>物件中,因為bags是由localInit delegate所建立的,所以完全不需要lock動作,當區段結束後localFinally的delegate會被執行,此時將bags中的資料倒入results中,此時由於存取共用的results物件,所以需要lock,這樣一來,原本需要lock很多次的動作,被減少到只lock四次,效能自然就會提升了。
Parallel Invoking
除了For、ForEach外,TPL還提供了Invoke函式,允許設計師以平行的方式呼叫多個delegate,見下例:
private static void TestInvoke() { int total1 = 0,total2 = 0; Parallel.Invoke(() => { for (int i = 0; i < 10; i++) total1 += i; }, () => { for (int i = 0; i < 10; i++) total2 += i * 2; }); Console.WriteLine("Value 1: {0}, Value2 : {1}", total1, total2); } |
以本例來說, +i及 + i * 2這兩個delegate會被放到個別的Task同時執行,結果如下:
Generating Data. Starting Searching. Value 1: 45, Value2 : 90 elapsed 83 ms |
當核心數少於需執行的delegate數時,Invoke會依序排程執行,例如8 delegate於4核心電腦時,前4個會先被執行、後4個會等待前4個某個結束時再排入執行。
Parallel LINQ
.NET Framework 4也對LINQ進行了延展,使其能直接利用Task Parallel Library來獲取多CPU/核心的優勢,名為:PLINQ,用法異常的簡單,.NET Framework 4為IEnumerable<T>加入一個AsParallel的extend method,當設計師為IEnumerable<T>呼叫此函式時,其會回傳一個適用於Task Parallel Library的IEnumerable<T>物件,此時這個IEnumerable<T>物件是ParallelQuery<TSource>,所有原本的Where、Select等延展函式皆已被重寫成可完全利用TPL。
就使用而言,我們其實不需要知道這麼多,只要知道當需要用到PLINQ時,先對資料來源呼叫AsParallel函式即可。
static void Main(string[] args) { Console.WriteLine("Generating Data."); var datas = Person.GenerateBigTable(); Console.WriteLine("Starting Searching."); Stopwatch sw = new Stopwatch(); sw.Start(); try { var resultItem = (from s1 in datas.AsParallel() where s1.Name == "c644999999" select s1).FirstOrDefault(); } catch (Exception ex) { } sw.Stop(); Console.WriteLine("elapsed {0} ms", sw.ElapsedMilliseconds); Console.ReadLine(); } |
執行結果如下:
Generating Data. Starting Searching. elapsed 568 ms |
當拿掉AsParallel後,結果如下:
Generating Data. Starting Searching. elapsed 742 ms |
PLINQ的出現,的確讓原本就熟悉LINQ技術的設計師如虎添翼,就搜尋、過濾資料而言,PLINQ比起For、ForEach簡單、好用太多了。
數字的迷思
當你針對TPL來測定效能的提升度時,有時會落入一個數字的迷思,最常見的是當效能總數降到1000 ms以下時,不管你如何做,效能的提升都很有限,這是因為For、ForEach本身也是需要建立成本的,當建立成本高於單緒執行成本時,你就應該考慮該工作是否適合使用TPL。
偶而,你會觀測到懸殊的差距,這多半是因為TPL分割資料時發生無法整除於核心數時,其會多加一個Task,也就是說4核的電腦上,是有可能出現For、ForEach產生5個Task執行的情況,此時如果要搜尋的資料元素是在陣列的尾端時,因為第5個Task的出現,所以會很快就找到。