用c#實作多執行緒之間的非同步式交換架構 Part 3

Use C# to achieve asynchronous exchange in mutithread. Part 3

我們將在本篇實作一個範例

應用之前介紹的框架

 

這裡使用筆者之前寫的一個股票分析系統

會自動的由網頁上下載股票的資訊

然後進行分析

 

首先敘述一下這個程式的概念

1. 程式將會連上股票資訊的網頁,並且將整個網頁內容下載下來

2. 將網頁的內容進行分析,取出需要的資訊

3. 將資料儲存為XML檔案

由以上的敘述

我們可以推測我們至少需要三條執行緒來執行工作

分別是下載,分析與儲存

另外我還加了一條專門處理記錄例外訊息的執行緒

所以一共是四條

 

首先我們建立一個基本的Thread類別

方便以後統一作管理

	namespace StocksAnalytics
{
    public class AEBaseThread : AEThread
    {
        public AEBaseThread(string szID, int iWorkCycle, int iSleepCycle)
            : base(szID, iWorkCycle, iSleepCycle)
        { 
        
        }


        public virtual int UncompleteQueue
        {
            get
            {
                return base.TotalUncompleteElement;
            }        
        }


        protected void SendLog(object log)
        {
            AEQueueController.Instance.SendData(
                        Globe.asID[(int)Globe.eID.LOG],
                        this.iAEThreadID,
                        new Everyone_2_Log(log.ToString()));
        }
    }
}

 

 

然後再分別建立各個工作的執行緒

首先是下載網頁的執行緒

	namespace StocksAnalytics
{
    public class DW_HTML : AEBaseThread
    {
        private const string REPLACE_WORD = "REPLACE_WORD";

        #region URL
        /// <summary>
        /// 今日行情
        /// </summary>
        private const string URL_Today = @"http://tw.stock.yahoo.com/q/q?s=" + REPLACE_WORD;

        /// <summary>
        /// 公司基本資料
        /// </summary>
        private const string URL_Base = @"http://tw.stock.yahoo.com/d/s/company_" + REPLACE_WORD + ".html";

        /// <summary>
        /// 融資融券
        /// </summary>
        private const string URL_Chip = @"http://tw.stock.yahoo.com/d/s/credit_" + REPLACE_WORD + ".html";
        #endregion

        private Queue<string> _qCompanyCode;

        public DW_HTML(string szID)
            : base(szID, 100, 10 * 1000)
        {
            _qCompanyCode = new Queue<string>(4000);
        
        }


        public void Enqueue(string szCode)
        {
            lock (_qCompanyCode)
            {
                if (_qCompanyCode.Contains(szCode))
                    return;

                _qCompanyCode.Enqueue(szCode);
            }
        }


        protected override bool Run()
        {
            if (_qCompanyCode.Count == 0)
                return false;

            string szCode_ = _qCompanyCode.Dequeue();

            try
            {
                string szURL_Today_ = URL_Today.Replace(REPLACE_WORD, szCode_);
                string szContent_Today_ = DownloadWeb(szURL_Today_);

                string szURL_Base_ = URL_Base.Replace(REPLACE_WORD, szCode_);
                string szContent_Base_ = DownloadWeb(szURL_Base_);

                string szURL_Chip_ = URL_Chip.Replace(REPLACE_WORD, szCode_);
                string szContent_Chip_ = DownloadWeb(szURL_Chip_);


                // 內容都下載完成之後就往 Parser 的執行緒送
                AEQueueController.Instance.SendData(
                    Globe.asID[(int)Globe.eID.PARSER_HTML_1],
                    this.iAEThreadID,
                    new DW_2_Parser(szContent_Today_, szContent_Base_, szContent_Chip_));
            }
            catch (Exception err)
            { 
                // 如果發生錯誤 就記下log
                base.SendLog(err);
            }

            //Program.DebugWrite(szContent_Today_ + "\n\n" + szContent_Base_ + "\n\n" + szContent_Chip_, 2);
            
            return true;
        }


        private string DownloadWeb(string szURL)
        {
            string szResult_ = "";

            HttpWebRequest reqHttp_ = (HttpWebRequest)WebRequest.Create(szURL);
            reqHttp_.Timeout = 60000;

            HttpWebResponse respHttp_ = (HttpWebResponse)reqHttp_.GetResponse();

            StreamReader readerHtml = new StreamReader(respHttp_.GetResponseStream(), Encoding.GetEncoding("big5"));

            szResult_ = readerHtml.ReadToEnd();

            readerHtml.Close();            

            return szResult_;
        }

        public override int UncompleteQueue
        {
            get
            {
                return _qCompanyCode.Count;
            }// get
        }// method
    }// class
}// namespace

 
 

然後是分析網頁的執行緒

至於分析網頁的方式

這邊就不詳述

請參考前面的文章 利用mshtml進行網頁分析

http://www.dotblogs.com.tw/ireullin/archive/2010/05/30/15512.aspx

	namespace StocksAnalytics
{
    class Parser_HTML : AEBaseThread
    {

        /// <summary>
        /// 記錄股票資訊的結構
        /// </summary>
        private StockData _mStockData;

        
        public Parser_HTML(string szID)
            : base(szID, 100, 1000)
        {
        }


        /// <summary>
        /// DW_HTML送出來的資料會在這邊接收
        /// 然後進行分析
        /// 分析完之後在往StorageXML的執行緒丟
        /// </summary>
        /// <param name="Params"></param>
        public override void Complete(object[] Params)
        {
            _mStockData = new StockData();

            try
            {
                _mStockData.tagStockToday = ParserToday(Params[0].ToString());

                _mStockData.tagStockBase = ParserBase(Params[1].ToString());
                
                _mStockData.tagStockChip = ParserChip(Params[2].ToString());                
                

                AEQueueController.Instance.SendData(
                    Globe.asID[(int)Globe.eID.STORAGE_XML_1],
                    this.iAEThreadID,
                    new Parser_2_Storage(_mStockData));            

            }
            catch (Exception err)
            {
                // 如果發生錯誤 就記下log
                base.SendLog(err);
            }

        }


        /// <summary>
        /// 分析籌碼面的網頁
        /// </summary>
        /// <param name="szHtmlContent"></param>
        /// <returns></returns>
        private StockChip ParserChip(string szHtmlContent)
        {
            // 省略
        }


        /// <summary>
        /// 分析今日價格的網頁
        /// </summary>
        /// <param name="szHtmlContent"></param>
        /// <returns></returns>
        private StockToday ParserToday(string szHtmlContent)
        {
            // 省略
        }

        
        /// <summary>
        /// 分析基本面的網頁
        /// </summary>
        /// <param name="szHtmlContent"></param>
        /// <returns></returns>
        private StockToday ParserBase(string szHtmlContent)
        {
            // 省略
        }
    }
}

 

 

然後在這邊接收

並且將資料結構序列化

儲存到xml檔案中

而儲存log的執行緒與此執行緒相類似

這邊就不再贅訴

	namespace StocksAnalytics
{
    public class StorageXML : AEBaseThread
    {
        public StorageXML(string szID)
            : base(szID, 1 * 1000, 5 * 1000)
        { 
        }


        protected override bool Run()
        {
            return base.Run();
        }


        public override void Complete(object[] Params)
        {
            FileStream mFileStream_ = new FileStream(".\\test.xml", FileMode.Create);

            XmlSerializer mXmlSerializer_ = new XmlSerializer(typeof(StockData));

            mXmlSerializer_.Serialize(mFileStream_, Params[0]);            

            mFileStream_.Close(); 
        }
    }
}

 

 

 

接下來這邊就是各個執行緒之間

用來傳遞資料的元素了

 

首先是下載網頁的執行緒送到分析的執行緒的

	namespace StocksAnalytics
{
    class DW_2_Parser : AEElement
    {
        private string[] _asContent;
        
        /// <summary>
        /// Sequence:
        /// Today,Base,Chip
        /// </summary>
        /// <param name="asContent"></param>
        public DW_2_Parser(params string[] asContent)
        {
            _asContent = asContent;
        }

             
        /// <summary>
        /// Sequence:
        /// Today,Base,Chip
        /// </summary>
        /// <param name="Params"></param>
        void AEElement.Execute(ref object[] Params)
        {
            Params = _asContent;
        }
    }    
}

 

 

然後是分析送到儲存的

	namespace StocksAnalytics
{
    public class Parser_2_Storage : AEElement
    {
        StockData _mStockData;

        public Parser_2_Storage(StockData mStockData)
        {
            _mStockData = mStockData;
        }


        void AEElement.Execute(ref object[] Params)
        {
            Params = new object[] { _mStockData };
        }
    }
}

 

 

最後還有一個用來記錄Log的

	namespace StocksAnalytics
{
    public class Everyone_2_Log : AEElement
    {
        public string _szErr;
        
        public Everyone_2_Log(string szErr)
        {
            _szErr = szErr;
        }
        
        void AEElement.Execute(ref object[] Params)
        {
            Params = new object[] { _szErr };
        }
    }
}

 

由以上的程式碼

我們可以注意到

元素的物件都相當簡單

在建構式的時候將傳入的參數存儲起來

然後在Execute的函式中

把變數傳到目的地執行緒的Complete中

 

 

這樣一些基本的類別就已經都準備好了

接下來就是把這些東西組合起來

首先我準備了一個類別

將裡頭的欄位都設為public static以方便取用

這裡跟使用Singleton pattern的意思是一樣的

	namespace StocksAnalytics
{
    public abstract class Globe
    {
        /// <summary>
        /// 這裡是各執行緒的ID對應列舉
        /// </summary>
        public enum eID : int
        { 
            DW_HTML_1=0,
            PARSER_HTML_1,
            STORAGE_XML_1,
            LOG
        }


        /// <summary>
        /// 將各執行緒的指標存為陣列方便管理
        /// </summary>
        public static AEBaseThread[] m_arrThred;

        
        /// <summary>
        /// 各執行緒的ID名稱
        /// </summary>
        public static string[] asID
        {
            get
            {
                if (_asID == null)
                    _asID = Enum.GetNames(typeof(eID));

                return _asID;
            }
        }

        private static string[] _asID;
    }
}

 

 

以下的程式碼

便是在一個表單視窗中

把上面的物件組合起來

	namespace StocksAnalytics
{
    public partial class Form1 : Form
    {
        
        private ListViewItem[] _m_arrListViewItemCurr;
        
        
        public Form1()
        {
            InitializeComponent();
        }


        /// <summary>
        /// 程式開起來以後的執行順序
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Form1_Load(object sender, EventArgs e)
        {            
            this.ThredInit();

            this.ThreadRegister();

            this.ListViewInit();

            this.ListViewRefresh();

            this.TreeInit();

            this.timer1.Start();

            this.StartAnalytics();
        }


        /// <summary>
        /// 初始化畫面左邊的樹狀圖
        /// </summary>
        private void TreeInit()
        {
            string[] asQueue_ = AEQueueController.Instance.GetRegisteredQueueID();

            TreeNode nodeQueue_ = new TreeNode("RegisteredQueue");
            foreach (string szTmp in asQueue_)
                nodeQueue_.Nodes.Add(szTmp.Replace("|", " to "));



            TreeNode nodeThread_ = new TreeNode("WorkingThread");
            foreach(AEBaseThread m_AEBaseThread_ in Globe.m_arrThred)
                nodeThread_.Nodes.Add(m_AEBaseThread_.szIdentifyName);



            TreeNode nodeSetting_ = new TreeNode("Setting");

            treeView1.Nodes.Add("root");
            treeView1.Nodes[0].Nodes.Add(nodeThread_);
            treeView1.Nodes[0].Nodes.Add(nodeQueue_);
            treeView1.Nodes[0].Nodes.Add(nodeSetting_);
            treeView1.Nodes[0].Expand();
        }


        /// <summary>
        /// 所有東西都建構完成
        /// 開始執行程式
        /// </summary>
        private void StartAnalytics()
        {
            // 由這裡傳入要下載的股票資訊的編號
            for (int i = 1200; i < 1220; i++)
            {
                ((DW_HTML)Globe.m_arrThred[(int)Globe.eID.DW_HTML_1]).Enqueue(i.ToString());
            }        
        }


        /// <summary>
        /// 向AEQueueController註冊執行緒
        /// 以建立Queue
        /// </summary>
        private void ThreadRegister()
        { 
            // 註冊從下載網頁的執行緒到分析的執行緒的Queue
            AEQueueController.Instance.Register(
                Globe.m_arrThred[(int)Globe.eID.DW_HTML_1],
                Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1]);


            // 註冊從下載網頁的執行緒到Log的執行緒的Queue
            AEQueueController.Instance.Register(
                Globe.m_arrThred[(int)Globe.eID.DW_HTML_1],
                Globe.m_arrThred[(int)Globe.eID.LOG]);


            // 註冊從分析的執行緒到儲存的執行緒的Queue
            AEQueueController.Instance.Register(
                Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1],
                Globe.m_arrThred[(int)Globe.eID.STORAGE_XML_1]);


            // 註冊從分析的執行緒到Log的執行緒的Queue
            AEQueueController.Instance.Register(
                Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1],
                Globe.m_arrThred[(int)Globe.eID.LOG]);

        }


        /// <summary>
        /// 建立各個Thread實體 
        /// 並且存成陣列
        /// </summary>
        private void ThredInit()
        {
            ArrayList alThread_ = new ArrayList(Globe.asID.Length);

            
            alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_1]));
            alThread_.Add(new Parser_HTML(Globe.asID[(int)Globe.eID.PARSER_HTML_1]));
            alThread_.Add(new StorageXML(Globe.asID[(int)Globe.eID.STORAGE_XML_1]));
            alThread_.Add(new WriteLog(Globe.asID[(int)Globe.eID.LOG]));


            Globe.m_arrThred = (AEBaseThread[])alThread_.ToArray(typeof(AEBaseThread));


            foreach(AEBaseThread mAEBaseThread_ in Globe.m_arrThred)
            {
                mAEBaseThread_.Start();
            }
        }


        /// <summary>
        /// 定時更新各執行緒的工作狀態
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void timer1_Tick(object sender, EventArgs e)
        {
            this.ListViewRefresh();
        }


        /// <summary>
        /// 取得各執行緒的工作狀態
        /// </summary>
        private void ListViewRefresh()
        {
            int i = 0;
            foreach(AEBaseThread m_AEBaseThread_ in Globe.m_arrThred)
            {
                int j = 0;
                _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.szIdentifyName;
                _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.iAEThreadID.ToString();
                _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.isWork.ToString();
                _m_arrListViewItemCurr[i].SubItems[j++].Text = m_AEBaseThread_.UncompleteQueue.ToString();

                i++;
            }

            listView1.Items.Clear();

            listView1.Items.AddRange(_m_arrListViewItemCurr);        
        }


        /// <summary>
        /// 初始化執行緒工作狀態的顯示視窗
        /// </summary>
        private void ListViewInit()
        {
            string[] asColumn_ = {"Name","ID","Workin","Uncomplete" };

            foreach (string szColName_ in asColumn_)
                listView1.Columns.Add(szColName_, szColName_.Length*20);

            
            _m_arrListViewItemCurr = new ListViewItem[Globe.asID.Length];
            for (int i = 0; i < Globe.asID.Length; i++)
            {
                _m_arrListViewItemCurr[i] = new ListViewItem(asColumn_);
    
            }
        }


        /// <summary>
        /// 當程式結束時
        /// 停止各個執行緒
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            this.timer1.Stop();

            AEBaseThread.StopAll();
        }

    }
}

 
 
這是程式執行的狀況

左邊的樹狀圖顯示出目前程式中的執行緒

以及Queue的數量

右邊的View可以看到執行緒的工作狀況

image

 

我們可以注意到

幾乎所有的工作量都集中在下載網頁的執行緒上

所以我們決定修改一下程式

多開幾條執行緒

來分攤下載網頁的工作

 

首先在Globe中

多加入兩個ID

	/// <summary>
/// 這裡是各執行緒的ID對應列舉
/// </summary>
public enum eID : int
{ 
    DW_HTML_1=0,
    DW_HTML_2,
    DW_HTML_3,
    PARSER_HTML_1,
    STORAGE_XML_1,
    LOG
}

 

 

將新增的兩條執行緒實做出來

	/// <summary>
/// 建立各個Thread實體 
/// 並且存成陣列
/// </summary>
private void ThredInit()
{
    ArrayList alThread_ = new ArrayList(Globe.asID.Length);

    
    alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_1]));
    alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_2]));
    alThread_.Add(new DW_HTML(Globe.asID[(int)Globe.eID.DW_HTML_3]));
    alThread_.Add(new Parser_HTML(Globe.asID[(int)Globe.eID.PARSER_HTML_1]));
    alThread_.Add(new StorageXML(Globe.asID[(int)Globe.eID.STORAGE_XML_1]));
    alThread_.Add(new WriteLog(Globe.asID[(int)Globe.eID.LOG]));


    Globe.m_arrThred = (AEBaseThread[])alThread_.ToArray(typeof(AEBaseThread));


    foreach(AEBaseThread mAEBaseThread_ in Globe.m_arrThred)
    {
        mAEBaseThread_.Start();
    }
}

 

 

還有新增Queue的註冊

我這裡改用一個比較懶的方式

用迴圈來完成他

	/// <summary>
/// 向AEQueueController註冊執行緒
/// 以建立Queue
/// </summary>
private void ThreadRegister()
{ 
    //DownloadHTML to other
    string szKeyWord_ = "DW_HTML_";
    for (int i = 0; i < Globe.asID.Length; i++)
    {
        if (Globe.asID[i].Length < szKeyWord_.Length)
            continue;

        if (Globe.asID[i].Substring(0, szKeyWord_.Length) == szKeyWord_)
        {
            // To Parser
            AEQueueController.Instance.Register(
                Globe.m_arrThred[i],
                Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1]);

            // To Log
            AEQueueController.Instance.Register(
                Globe.m_arrThred[i],
                Globe.m_arrThred[(int)Globe.eID.LOG]);
        }
    }


    // 註冊從分析的執行緒到儲存的執行緒的Queue
    AEQueueController.Instance.Register(
        Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1],
        Globe.m_arrThred[(int)Globe.eID.STORAGE_XML_1]);


    // 註冊從分析的執行緒到Log的執行緒的Queue
    AEQueueController.Instance.Register(
        Globe.m_arrThred[(int)Globe.eID.PARSER_HTML_1],
        Globe.m_arrThred[(int)Globe.eID.LOG]);

}

 

 

然後修改一下下載網頁的分配方式

	/// <summary>
/// 所有東西都建構完成
/// 開始執行程式
/// </summary>
private void StartAnalytics()
{
    for (int i = 1200; i < 1220; i++)
    {
        ((DW_HTML)Globe.m_arrThred[i % 3]).Enqueue(i.ToString());
    }        
}

 

 

這樣就好了

總修改應該不超過十行

請各位自行跟上面的程式碼比對一下

 

程式執行後

我們可以看到現在有三條執行緒同時在下載網頁

左邊也能看到相對應的Queue

不過愛爾蘭這邊的網路實在太慢XD

主要的工作還是卡在下載上

image