Use C# to achieve asynchronous exchange in mutithread. Part 3
我們將在本篇實作一個範例
應用之前介紹的框架
這裡使用筆者之前寫的一個股票分析系統
會自動的由網頁上下載股票的資訊
然後進行分析
首先敘述一下這個程式的概念
1. 程式將會連上股票資訊的網頁,並且將整個網頁內容下載下來
2. 將網頁的內容進行分析,取出需要的資訊
3. 將資料儲存為XML檔案
由以上的敘述
我們可以推測我們至少需要三條執行緒來執行工作
分別是下載,分析與儲存
另外我還加了一條專門處理記錄例外訊息的執行緒
所以一共是四條
首先我們建立一個基本的Thread類別
方便以後統一作管理
namespace StocksAnalytics { public class AEBaseThread : AEThread { public AEBaseThread(string szID, int iWorkCycle, int iSleepCycle) : base(szID, iWorkCycle, iSleepCycle) { } public virtual int UncompleteQueue { get { return base.TotalUncompleteElement; } } protected void SendLog(object log) { AEQueueController.Instance.SendData( Globe.asID[(int)Globe.eID.LOG], this.iAEThreadID, new Everyone_2_Log(log.ToString())); } } }
然後再分別建立各個工作的執行緒
首先是下載網頁的執行緒
namespace StocksAnalytics { public class DW_HTML : AEBaseThread { private const string REPLACE_WORD = "REPLACE_WORD"; #region URL /// <summary> /// 今日行情 /// </summary> private const string URL_Today = @"http://tw.stock.yahoo.com/q/q?s=" + REPLACE_WORD; /// <summary> /// 公司基本資料 /// </summary> private const string URL_Base = @"http://tw.stock.yahoo.com/d/s/company_" + REPLACE_WORD + ".html"; /// <summary> /// 融資融券 /// </summary> private const string URL_Chip = @"http://tw.stock.yahoo.com/d/s/credit_" + REPLACE_WORD + ".html"; #endregion private Queue<string> _qCompanyCode; public DW_HTML(string szID) : base(szID, 100, 10 * 1000) { _qCompanyCode = new Queue<string>(4000); } public void Enqueue(string szCode) { lock (_qCompanyCode) { if (_qCompanyCode.Contains(szCode)) return; _qCompanyCode.Enqueue(szCode); } } protected override bool Run() { if (_qCompanyCode.Count == 0) return false; string szCode_ = _qCompanyCode.Dequeue(); try { string szURL_Today_ = URL_Today.Replace(REPLACE_WORD, szCode_); string szContent_Today_ = DownloadWeb(szURL_Today_); string szURL_Base_ = URL_Base.Replace(REPLACE_WORD, szCode_); string szContent_Base_ = DownloadWeb(szURL_Base_); string szURL_Chip_ = URL_Chip.Replace(REPLACE_WORD, szCode_); string szContent_Chip_ = DownloadWeb(szURL_Chip_); // 內容都下載完成之後就往 Parser 的執行緒送 AEQueueController.Instance.SendData( Globe.asID[(int)Globe.eID.PARSER_HTML_1], this.iAEThreadID, new DW_2_Parser(szContent_Today_, szContent_Base_, szContent_Chip_)); } catch (Exception err) { // 如果發生錯誤 就記下log base.SendLog(err); } //Program.DebugWrite(szContent_Today_ + "\n\n" + szContent_Base_ + "\n\n" + szContent_Chip_, 2); return true; } private string DownloadWeb(string szURL) { string szResult_ = ""; HttpWebRequest reqHttp_ = (HttpWebRequest)WebRequest.Create(szURL); reqHttp_.Timeout = 60000; HttpWebResponse respHttp_ = (HttpWebResponse)reqHttp_.GetResponse(); StreamReader readerHtml = new StreamReader(respHttp_.GetResponseStream(), Encoding.GetEncoding("big5")); szResult_ = readerHtml.ReadToEnd(); readerHtml.Close(); return szResult_; } public override int UncompleteQueue { get { return _qCompanyCode.Count; }// get }// method }// class }// namespace
然後是分析網頁的執行緒
至於分析網頁的方式
這邊就不詳述
請參考前面的文章 利用mshtml進行網頁分析
http://www.dotblogs.com.tw/ireullin/archive/2010/05/30/15512.aspx
namespace StocksAnalytics { class Parser_HTML : AEBaseThread { /// <summary> /// 記錄股票資訊的結構 /// </summary> private StockData _mStockData; public Parser_HTML(string szID) : base(szID, 100, 1000) { } /// <summary> /// DW_HTML送出來的資料會在這邊接收 /// 然後進行分析 /// 分析完之後在往StorageXML的執行緒丟 /// </summary> /// <param name="Params"></param> public override void Complete(object[] Params) { _mStockData = new StockData(); try { _mStockData.tagStockToday = ParserToday(Params[0].ToString()); _mStockData.tagStockBase = ParserBase(Params[1].ToString()); _mStockData.tagStockChip = ParserChip(Params[2].ToString()); AEQueueController.Instance.SendData( Globe.asID[(int)Globe.eID.STORAGE_XML_1], this.iAEThreadID, new Parser_2_Storage(_mStockData)); } catch (Exception err) { // 如果發生錯誤 就記下log base.SendLog(err); } } /// <summary> /// 分析籌碼面的網頁 /// </summary> /// <param name="szHtmlContent"></param> /// <returns></returns> private StockChip ParserChip(string szHtmlContent) { // 省略 } /// <summary> /// 分析今日價格的網頁 /// </summary> /// <param name="szHtmlContent"></param> /// <returns></returns> private StockToday ParserToday(string szHtmlContent) { // 省略 } /// <summary> /// 分析基本面的網頁 /// </summary> /// <param name="szHtmlContent"></param> /// <returns></returns> private StockToday ParserBase(string szHtmlContent) { // 省略 } } }
然後在這邊接收
並且將資料結構序列化
儲存到xml檔案中
而儲存log的執行緒與此執行緒相類似
這邊就不再贅訴
namespace StocksAnalytics { public class StorageXML : AEBaseThread { public StorageXML(string szID) : base(szID, 1 * 1000, 5 * 1000) { } protected override bool Run() { return base.Run(); } public override void Complete(object[] Params) { FileStream mFileStream_ = new FileStream(".\\test.xml", FileMode.Create); XmlSerializer mXmlSerializer_ = new XmlSerializer(typeof(StockData)); mXmlSerializer_.Serialize(mFileStream_, Params[0]); mFileStream_.Close(); } } }
接下來這邊就是各個執行緒之間
用來傳遞資料的元素了
首先是下載網頁的執行緒送到分析的執行緒的
namespace StocksAnalytics { class DW_2_Parser : AEElement { private string[] _asContent; /// <summary> /// Sequence: /// Today,Base,Chip /// </summary> /// <param name="asContent"></param> public DW_2_Parser(params string[] asContent) { _asContent = asContent; } /// <summary> /// Sequence: /// Today,Base,Chip /// </summary> /// <param name="Params"></param> void AEElement.Execute(ref object[] Params) { Params = _asContent; } } }
然後是分析送到儲存的
namespace StocksAnalytics { public class Parser_2_Storage : AEElement { StockData _mStockData; public Parser_2_Storage(StockData mStockData) { _mStockData = mStockData; } void AEElement.Execute(ref object[] Params) { Params = new object[] { _mStockData }; } } }
最後還有一個用來記錄Log的
namespace StocksAnalytics { public class Everyone_2_Log : AEElement { public string _szErr; public Everyone_2_Log(string szErr) { _szErr = szErr; } void AEElement.Execute(ref object[] Params) { Params = new object[] { _szErr }; } } }
由以上的程式碼
我們可以注意到
元素的物件都相當簡單
在建構式的時候將傳入的參數存儲起來
然後在Execute的函式中
把變數傳到目的地執行緒的Complete中
這樣一些基本的類別就已經都準備好了
接下來就是把這些東西組合起來
首先我準備了一個類別
將裡頭的欄位都設為public static以方便取用
這裡跟使用Singleton pattern的意思是一樣的
namespace StocksAnalytics { public abstract class Globe { /// <summary> /// 這裡是各執行緒的ID對應列舉 /// </summary> public enum eID : int { DW_HTML_1=0, PARSER_HTML_1, STORAGE_XML_1, LOG } /// <summary> /// 將各執行緒的指標存為陣列方便管理 /// </summary> public static AEBaseThread[] m_arrThred; /// <summary> /// 各執行緒的ID名稱 /// </summary> public static string[] asID { get { if (_asID == null) _asID = Enum.GetNames(typeof(eID)); return _asID; } } private static string[] _asID; } }
以下的程式碼
便是在一個表單視窗中
把上面的物件組合起來
namespace StocksAnalytics { public partial class Form1 : Form { private ListViewItem[] _m_arrListViewItemCurr; public Form1() { InitializeComponent(); } /// <summary> /// 程式開起來以後的執行順序 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void Form1_Load(object sender, EventArgs e) { this.ThredInit(); this.ThreadRegister(); this.ListViewInit(); this.ListViewRefresh(); this.TreeInit(); this.timer1.Start(); this.StartAnalytics(); } /// <summary> /// 初始化畫面左邊的樹狀圖 /// </summary> private void TreeInit() { string[] asQueue_ = AEQueueController.Instance.GetRegisteredQueueID(); TreeNode nodeQueue_ = new TreeNode("RegisteredQueue"); foreach (string szTmp in asQueue_) nodeQueue_.Nodes.Add(szTmp.Replace("|", " to ")); TreeNode nodeThread_ = new TreeNode("WorkingThread"); foreach(AEBaseThread m_AEBaseThread_ in Globe.m_arrThred) nodeThread_.Nodes.Add(m_AEBaseThread_.szIdentifyName); TreeNode nodeSetting_ = new TreeNode("Setting"); treeView1.Nodes.Add("root"); treeView1.Nodes[0].Nodes.Add(nodeThread_); treeView1.Nodes[0].Nodes.Add(nodeQueue_); treeView1.Nodes[0].Nodes.Add(nodeSetting_); treeView1.Nodes[0].Expand(); } /// <summary> /// 所有東西都建構完成 /// 開始執行程式 /// </summary> private void StartAnalytics() { // 由這裡傳入要下載的股票資訊的編號 for (int i = 1200; i < 1220; i++) { ((DW_HTML)Globe.m_arrThred[(int)Globe.eID.DW_HTML_1]).Enqueue(i.ToString()); } } /// <summary> /// 向AEQueueController註冊執行緒 /// 以建立Queue /// </summary> private void ThreadRegister() { // 註冊從下載網頁的執行緒到分析的執行緒的Queue AEQueueController.Instance.Register( Globe.m_arrThred[(int)Globe.eID.DW_HTML_1], Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1]); // 註冊從下載網頁的執行緒到Log的執行緒的Queue AEQueueController.Instance.Register( Globe.m_arrThred[(int)Globe.eID.DW_HTML_1], Globe.m_arrThred[(int)Globe.eID.LOG]); // 註冊從分析的執行緒到儲存的執行緒的Queue AEQueueController.Instance.Register( Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1], Globe.m_arrThred[(int)Globe.eID.STORAGE_XML_1]); // 註冊從分析的執行緒到Log的執行緒的Queue AEQueueController.Instance.Register( Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1], Globe.m_arrThred[(int)Globe.eID.LOG]); } /// <summary> /// 建立各個Thread實體 /// 並且存成陣列 /// </summary> private void ThredInit() { ArrayList alThread_ = new ArrayList(Globe.asID.Length); alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_1])); alThread_.Add(new Parser_HTML(Globe.asID[(int)Globe.eID.PARSER_HTML_1])); alThread_.Add(new StorageXML(Globe.asID[(int)Globe.eID.STORAGE_XML_1])); alThread_.Add(new WriteLog(Globe.asID[(int)Globe.eID.LOG])); Globe.m_arrThred = (AEBaseThread[])alThread_.ToArray(typeof(AEBaseThread)); foreach(AEBaseThread mAEBaseThread_ in Globe.m_arrThred) { mAEBaseThread_.Start(); } } /// <summary> /// 定時更新各執行緒的工作狀態 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void timer1_Tick(object sender, EventArgs e) { this.ListViewRefresh(); } /// <summary> /// 取得各執行緒的工作狀態 /// </summary> private void ListViewRefresh() { int i = 0; foreach(AEBaseThread m_AEBaseThread_ in Globe.m_arrThred) { int j = 0; _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.szIdentifyName; _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.iAEThreadID.ToString(); _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.isWork.ToString(); _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.UncompleteQueue.ToString(); i++; } listView1.Items.Clear(); listView1.Items.AddRange(_m_arrListViewItemCurr); } /// <summary> /// 初始化執行緒工作狀態的顯示視窗 /// </summary> private void ListViewInit() { string[] asColumn_ = {"Name","ID","Workin","Uncomplete" }; foreach (string szColName_ in asColumn_) listView1.Columns.Add(szColName_, szColName_.Length*20); _m_arrListViewItemCurr = new ListViewItem[Globe.asID.Length]; for (int i = 0; i < Globe.asID.Length; i++) { _m_arrListViewItemCurr[i] = new ListViewItem(asColumn_); } } /// <summary> /// 當程式結束時 /// 停止各個執行緒 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void Form1_FormClosing(object sender, FormClosingEventArgs e) { this.timer1.Stop(); AEBaseThread.StopAll(); } } }
左邊的樹狀圖顯示出目前程式中的執行緒
以及Queue的數量
右邊的View可以看到執行緒的工作狀況
我們可以注意到
幾乎所有的工作量都集中在下載網頁的執行緒上
所以我們決定修改一下程式
多開幾條執行緒
來分攤下載網頁的工作
首先在Globe中
多加入兩個ID
/// <summary> /// 這裡是各執行緒的ID對應列舉 /// </summary> public enum eID : int { DW_HTML_1=0, DW_HTML_2, DW_HTML_3, PARSER_HTML_1, STORAGE_XML_1, LOG }
將新增的兩條執行緒實做出來
/// <summary> /// 建立各個Thread實體 /// 並且存成陣列 /// </summary> private void ThredInit() { ArrayList alThread_ = new ArrayList(Globe.asID.Length); alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_1])); alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_2])); alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_3])); alThread_.Add(new Parser_HTML(Globe.asID[(int)Globe.eID.PARSER_HTML_1])); alThread_.Add(new StorageXML(Globe.asID[(int)Globe.eID.STORAGE_XML_1])); alThread_.Add(new WriteLog(Globe.asID[(int)Globe.eID.LOG])); Globe.m_arrThred = (AEBaseThread[])alThread_.ToArray(typeof(AEBaseThread)); foreach(AEBaseThread mAEBaseThread_ in Globe.m_arrThred) { mAEBaseThread_.Start(); } }
還有新增Queue的註冊
我這裡改用一個比較懶的方式
用迴圈來完成他
/// <summary> /// 向AEQueueController註冊執行緒 /// 以建立Queue /// </summary> private void ThreadRegister() { //DownloadHTML to other string szKeyWord_ = "DW_HTML_"; for (int i = 0; i < Globe.asID.Length; i++) { if (Globe.asID[i].Length < szKeyWord_.Length) continue; if (Globe.asID[i].Substring(0, szKeyWord_.Length) == szKeyWord_) { // To Parser AEQueueController.Instance.Register( Globe.m_arrThred[i], Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1]); // To Log AEQueueController.Instance.Register( Globe.m_arrThred[i], Globe.m_arrThred[(int)Globe.eID.LOG]); } } // 註冊從分析的執行緒到儲存的執行緒的Queue AEQueueController.Instance.Register( Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1], Globe.m_arrThred[(int)Globe.eID.STORAGE_XML_1]); // 註冊從分析的執行緒到Log的執行緒的Queue AEQueueController.Instance.Register( Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1], Globe.m_arrThred[(int)Globe.eID.LOG]); }
然後修改一下下載網頁的分配方式
/// <summary> /// 所有東西都建構完成 /// 開始執行程式 /// </summary> private void StartAnalytics() { for (int i = 1200; i < 1220; i++) { ((DW_HTML)Globe.m_arrThred[i % 3]).Enqueue(i.ToString()); } }
這樣就好了
總修改應該不超過十行
請各位自行跟上面的程式碼比對一下
程式執行後
我們可以看到現在有三條執行緒同時在下載網頁
左邊也能看到相對應的Queue
不過愛爾蘭這邊的網路實在太慢XD
主要的工作還是卡在下載上