Use C# to achieve asynchronous exchange in multithreading. Part 1
現在的程式為了可以提升效能
多半會採用多執行緒的做法
但是在這樣的狀況下
資料讀寫的順序與正確性變成了一個主要的問題
在這篇文章中
我們實作一個多執行緒的框架
並且在每個執行緒間
使用Queue來進行資料的傳遞
以確保資料順序的正確性
還有減少因為使用太多的Lock造成運算上的延遲
此框架有以下的功能
1. 簡化Queue的管理
當A執行緒要傳遞資料到B執行緒的時候
就必須建立一個Queue
而當B執行緒也要傳遞資料給A執行緒的時候
又必須建立一個Queue
所以當你有兩個執行緒在工作的時候
你最多需要兩個Queue來管理你的資料
而當你有三條執行緒的時候
你最多需要六個Queue
但是當你有四個執行緒的時候
你可能就需要十二個Queue了
我們可以依此推斷出一個公式:n 條執行緒最多需要 n*(n-1) 個Queue
當你的程式執行緒越來越多
代表可以同時處理越多的事情
可是Queue的管理便成了一個大問題
所以我們利用這個框架來簡化Queue的管理
避免資料傳遞時出錯
2. 確保資料在正確的執行緒上被運算
每次執行前會先檢查Thread ID
如果ID錯誤會拋出例外
3. 使用休眠模式節省CPU負擔
最後還有一個有趣的功能
就是當某執行緒已經沒有工作的時候
自動進入休眠的狀態
減少CPU的負擔
以下為程式碼範例
每個成員函式的用途
均已經註解在程式碼中
首先我們建立一個抽象物件 AEThread
繼承之後實作便是一條獨立工作的執行緒
任何需要這個執行緒執行的例行工作都可以放在這個裡頭
並且會定時檢查註冊在這個執行緒下的Queue然後執行它們
namespace AsyncExchange
{
    /// <summary>
    /// Base class for a named worker thread.
    /// Each instance owns one <see cref="Thread"/> that repeatedly calls <see cref="Run"/>
    /// and then drains the queues that <c>AEQueueController</c> reports for this
    /// instance's identify name. Every instance is registered in two static lookup
    /// tables (by identify name and by managed thread ID).
    /// </summary>
    /// <remarks>
    /// Pausing and stopping are cooperative: the worker honours a request at the top of
    /// its next loop pass or while it is idling between passes. It is never frozen or
    /// killed in the middle of <see cref="Run"/> or of an element's execution, so it
    /// cannot be stopped while holding a lock or with half-updated state.
    /// </remarks>
    public abstract partial class AEThread
    {
        /// <summary>
        /// Guards <see cref="_hlThreadObj"/> and <see cref="_hlThreadID"/>; instances may be
        /// created and looked up from different threads.
        /// </summary>
        private static readonly object _registryLock = new object();

        /// <summary>
        /// You can find a AEThread by identify name.
        /// Created eagerly so the static lookups work before any instance exists.
        /// </summary>
        private static Dictionary<string, AEThread> _hlThreadObj = new Dictionary<string, AEThread>(10);

        /// <summary>
        /// You can find a AEThread by thread ID.
        /// Created eagerly so the static lookups work before any instance exists.
        /// </summary>
        private static Dictionary<int, AEThread> _hlThreadID = new Dictionary<int, AEThread>(10);

        /// <summary>
        /// This AEThread's identify name.
        /// </summary>
        private readonly string _szIdentifyName;

        /// <summary>
        /// This AEThread's identify name.
        /// </summary>
        public string szIdentifyName { get { return _szIdentifyName; } }

        /// <summary>
        /// It's just a sequence: the number of instances registered before this one.
        /// </summary>
        private readonly int _iObjectID;

        /// <summary>
        /// Idle time in milliseconds after a pass that did some work.
        /// </summary>
        private readonly int _iWorkCycle;

        /// <summary>
        /// Idle time in milliseconds after a pass that did no work.
        /// </summary>
        private readonly int _iSleepCycle;

        /// <summary>
        /// The thread that actually does the work.
        /// </summary>
        private Thread _threadCurr;

        /// <summary>
        /// Current thread state. Written under <see cref="_signal"/>; volatile so the
        /// public getter can read it without taking the lock.
        /// </summary>
        private volatile eAEThreadState _CurrentState;

        /// <summary>
        /// Current thread state.
        /// </summary>
        public eAEThreadState CurrentState { get { return _CurrentState; } }

        /// <summary>
        /// Is it working? True when the most recent pass did any work.
        /// </summary>
        private volatile bool _IsWork;

        /// <summary>
        /// Is it working? True when the most recent pass did any work.
        /// </summary>
        public bool isWork { get { return _IsWork; } }

        /// <summary>
        /// Managed thread ID of the worker thread.
        /// </summary>
        public int iAEThreadID { get { return _threadCurr.ManagedThreadId; } }

        /// <summary>
        /// AEQueue instance buffer.
        /// </summary>
        private AEQueue[] _m_arrAEQueue;

        /// <summary>
        /// AEElement instance buffer.
        /// </summary>
        private AEElement _mAEElement;

        /// <summary>
        /// buffer of params.
        /// </summary>
        private object[] _objParams;

        /// <summary>
        /// Monitor used to idle the worker and to wake it for pause/resume/stop requests.
        /// Also guards <see cref="_pauseRequested"/>, <see cref="_stopRequested"/>,
        /// <see cref="_started"/> and writes to <see cref="_CurrentState"/>.
        /// </summary>
        private readonly object _signal = new object();

        /// <summary>True while the worker should block at the top of its loop.</summary>
        private bool _pauseRequested;

        /// <summary>True once the worker has been asked to leave its loop.</summary>
        private bool _stopRequested;

        /// <summary>True once <see cref="Start"/> has actually started the worker thread.</summary>
        private bool _started;

        /// <summary>
        /// AEThread's construct. Uses the same interval for the work and sleep cycles.
        /// </summary>
        /// <param name="szidentifyName">Identify name in this AEThread</param>
        /// <param name="iWorkCycle">How long do you want it to query the queue, when it is working</param>
        public AEThread(string szidentifyName, int iWorkCycle)
            : this(szidentifyName, iWorkCycle, iWorkCycle)
        {
        }

        /// <summary>
        /// AEThread's construct. Creates (but does not start) the worker thread and
        /// registers this instance under its identify name and managed thread ID.
        /// </summary>
        /// <param name="szidentifyName">Identify name in this AEThread</param>
        /// <param name="iWorkCycle">How long do you want it to query the queue, when it is working</param>
        /// <param name="iSleepCycle">How long do you want it to query the queue, when it is sleeping</param>
        /// <exception cref="ArgumentNullException"><paramref name="szidentifyName"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">A cycle is less than -1 (<see cref="Timeout.Infinite"/>).</exception>
        /// <exception cref="ArgumentException">The identify name is already registered.</exception>
        /// <exception cref="InvalidOperationException">The managed thread ID is already registered.</exception>
        public AEThread(string szidentifyName, int iWorkCycle, int iSleepCycle)
        {
            if (szidentifyName == null)
                throw new ArgumentNullException("szidentifyName");
            // Fail here rather than on the worker thread, where an invalid timeout
            // would surface as an unhandled exception.
            if (iWorkCycle < Timeout.Infinite)
                throw new ArgumentOutOfRangeException("iWorkCycle");
            if (iSleepCycle < Timeout.Infinite)
                throw new ArgumentOutOfRangeException("iSleepCycle");

            _CurrentState = eAEThreadState.NONE;
            _szIdentifyName = szidentifyName;
            _iSleepCycle = iSleepCycle;
            _iWorkCycle = iWorkCycle;
            _threadCurr = new Thread(new ThreadStart(MainProcess));

            lock (_registryLock)
            {
                // Validate both keys before touching either table so a failure
                // never leaves a half-registered instance behind.
                if (_hlThreadObj.ContainsKey(_szIdentifyName))
                    throw new ArgumentException("IdentifyName repeated");
                if (_hlThreadID.ContainsKey(_threadCurr.ManagedThreadId))
                    throw new InvalidOperationException("ThreadID repeated");

                _iObjectID = _hlThreadObj.Count;
                _hlThreadObj.Add(_szIdentifyName, this);
                _hlThreadID.Add(_threadCurr.ManagedThreadId, this);
            }

            _CurrentState = eAEThreadState.STANDBY;
        }

        /// <summary>
        /// Main process: the worker thread's loop. Runs until a stop is requested.
        /// </summary>
        private void MainProcess()
        {
            while (WaitUntilRunnable())
            {
                // Evaluate both unconditionally: combining them with a short-circuit
                // operator would skip queue polling whenever Run() reports work,
                // starving the queues of a busy thread.
                bool bRan_ = Run();
                bool bDequeued_ = ExecuteQueue();

                bool bDidWork_ = bRan_ || bDequeued_;
                _IsWork = bDidWork_;
                Idle(bDidWork_ ? _iWorkCycle : _iSleepCycle);
            }

            _IsWork = false;
            lock (_signal)
            {
                _CurrentState = eAEThreadState.CLOSED;
            }
        }

        /// <summary>
        /// Blocks while a pause is in effect.
        /// </summary>
        /// <returns>true when the loop should run another pass; false when a stop was requested.</returns>
        private bool WaitUntilRunnable()
        {
            lock (_signal)
            {
                while (_pauseRequested && !_stopRequested)
                    Monitor.Wait(_signal);
                return !_stopRequested;
            }
        }

        /// <summary>
        /// Idles for up to the given number of milliseconds. Returns early when the
        /// monitor is pulsed (resume or stop) and immediately when a pause or stop is
        /// already pending.
        /// </summary>
        /// <param name="iMilliseconds">Maximum idle time; -1 waits until pulsed.</param>
        private void Idle(int iMilliseconds)
        {
            lock (_signal)
            {
                if (_stopRequested || _pauseRequested)
                    return;
                Monitor.Wait(_signal, iMilliseconds);
            }
        }

        /// <summary>
        /// It will get a AEElement to execute from every AEQueue.
        /// Takes at most one element per non-empty queue, executes it and then calls
        /// <see cref="Complete"/> with the parameter buffer the element filled in.
        /// </summary>
        /// <returns>true when at least one element was executed.</returns>
        private bool ExecuteQueue()
        {
            _m_arrAEQueue = AEQueueController.Instance.GetQueueByTargetId(this.szIdentifyName);
            if (_m_arrAEQueue == null)
                return false;

            bool bResult_ = false;
            foreach (AEQueue mAEQueue_ in _m_arrAEQueue)
            {
                if (mAEQueue_.ElementCount == 0)
                    continue;

                // The caller's managed thread ID is handed to Dequeue; presumably the
                // queue uses it to verify the consumer - confirm against AEQueue.
                _mAEElement = mAEQueue_.Dequeue(Thread.CurrentThread.ManagedThreadId);
                _mAEElement.Execute(ref _objParams);
                Complete(_objParams);
                bResult_ = true;
            }
            return bResult_;
        }

        /// <summary>
        /// It will be called once when a AEElement be executed to finish.
        /// </summary>
        /// <param name="Params">The parameter buffer passed by reference to the element's Execute call.</param>
        public virtual void Complete(object[] Params)
        {
        }

        /// <summary>
        /// Start this thread.
        /// </summary>
        /// <returns>true when the thread was started; false when it is not in STANDBY.</returns>
        public bool Start()
        {
            lock (_signal)
            {
                // Check-and-start is atomic so two concurrent callers cannot both
                // reach Thread.Start (the second call would throw).
                if (_CurrentState != eAEThreadState.STANDBY)
                    return false;

                _threadCurr.Start();
                _started = true;
                _CurrentState = eAEThreadState.RUNNING;
                return true;
            }
        }

        /// <summary>
        /// Pause this thread. The worker finishes its current pass and then blocks
        /// until <see cref="Resume"/> or <see cref="StopAll"/> is called.
        /// </summary>
        /// <returns>true when the thread is (now) paused; false when it is not running.</returns>
        public bool Pause()
        {
            lock (_signal)
            {
                if (_CurrentState == eAEThreadState.PAUSE)
                    return true;
                if (_CurrentState != eAEThreadState.RUNNING)
                    return false;

                _pauseRequested = true;
                _CurrentState = eAEThreadState.PAUSE;
                return true;
            }
        }

        /// <summary>
        /// Resume this thread,
        /// if it's pausing.
        /// </summary>
        /// <returns>true when the thread is (now) running; false when it is neither paused nor running.</returns>
        public bool Resume()
        {
            lock (_signal)
            {
                if (_CurrentState == eAEThreadState.RUNNING)
                    return true;
                if (_CurrentState != eAEThreadState.PAUSE)
                    return false;

                _pauseRequested = false;
                _CurrentState = eAEThreadState.RUNNING;
                Monitor.PulseAll(_signal); // wake the worker blocked in WaitUntilRunnable
                return true;
            }
        }

        /// <summary>
        /// You can put some routine works here. Called once per loop pass on the worker thread.
        /// </summary>
        /// <returns>
        /// return false that will be into SleepCycle (if no queue element was executed either)
        /// return true still use WorkCycle
        /// </returns>
        protected virtual bool Run()
        {
            return false;
        }

        /// <summary>
        /// It will stop all AEThread: every registered thread is asked to stop, then the
        /// caller waits for each started thread to leave its loop.
        /// </summary>
        /// <remarks>
        /// Stopping is cooperative, so this call blocks for as long as any worker's
        /// current <see cref="Run"/> or queue element takes to return. Threads that were
        /// never started are marked CLOSED directly. When called from a worker thread,
        /// that thread is not joined; it closes when its current pass returns.
        /// </remarks>
        public static void StopAll()
        {
            List<AEThread> lstThreads_;
            lock (_registryLock)
            {
                // Snapshot so the registry lock is not held while waiting on workers.
                lstThreads_ = new List<AEThread>(_hlThreadObj.Values);
            }

            // all stop
            foreach (AEThread mAEThread_ in lstThreads_)
                mAEThread_.RequestStop();

            // wait to all stop
            foreach (AEThread mAEThread_ in lstThreads_)
                mAEThread_.WaitForStop();
        }

        /// <summary>
        /// Flags this thread for shutdown and wakes it if it is idling or paused.
        /// </summary>
        private void RequestStop()
        {
            lock (_signal)
            {
                if (_CurrentState == eAEThreadState.CLOSED)
                    return;

                _stopRequested = true;
                _CurrentState = eAEThreadState.CLOSING;
                Monitor.PulseAll(_signal);
            }
        }

        /// <summary>
        /// Waits for the worker to finish after <see cref="RequestStop"/>.
        /// </summary>
        private void WaitForStop()
        {
            bool bStarted_;
            lock (_signal)
            {
                bStarted_ = _started;
                // A thread that never ran has no loop exit to mark it closed.
                if (!bStarted_)
                    _CurrentState = eAEThreadState.CLOSED;
            }

            // Joining an unstarted thread throws, and joining the current thread
            // would block forever.
            if (bStarted_ && Thread.CurrentThread.ManagedThreadId != _threadCurr.ManagedThreadId)
                _threadCurr.Join();
        }

        /// <summary>
        /// Find a thread by thread ID.
        /// </summary>
        /// <param name="iID">Managed thread ID of the worker thread.</param>
        /// <returns>The registered AEThread, or null when the ID is unknown.</returns>
        public static AEThread GetThreadByID(int iID)
        {
            lock (_registryLock)
            {
                AEThread mAEThread_;
                return _hlThreadID.TryGetValue(iID, out mAEThread_) ? mAEThread_ : null;
            }
        }

        /// <summary>
        /// Find a thread by identify name.
        /// </summary>
        /// <param name="szIdentifyName">Identify name given to the constructor.</param>
        /// <returns>The registered AEThread, or null when the name is unknown.</returns>
        public static AEThread GetThreadByIdentifyName(string szIdentifyName)
        {
            lock (_registryLock)
            {
                AEThread mAEThread_;
                return _hlThreadObj.TryGetValue(szIdentifyName, out mAEThread_) ? mAEThread_ : null;
            }
        }

        /// <summary>
        /// The total number of uncomplete elements: the sum of ElementCount over every
        /// queue reported for this thread's identify name (0 when there are none).
        /// </summary>
        public int TotalUncompleteElement
        {
            get
            {
                AEQueue[] m_arrAEQueue_ = AEQueueController.Instance.GetQueueByTargetId(this._szIdentifyName);
                if (m_arrAEQueue_ == null)
                    return 0;

                int iTotal = 0;
                foreach (AEQueue mAEQueue_ in m_arrAEQueue_)
                    iTotal += mAEQueue_.ElementCount;
                return iTotal;
            }
        }// method
    }// class

    /// <summary>
    /// Lifecycle states of an <see cref="AEThread"/>.
    /// </summary>
    public enum eAEThreadState
    {
        /// <summary>Set at the start of construction, before registration.</summary>
        NONE = 0,
        /// <summary>Constructed and registered; not started yet.</summary>
        STANDBY,
        /// <summary>Started and not paused.</summary>
        RUNNING,
        /// <summary>Pause requested; the worker blocks at its next loop pass.</summary>
        PAUSE,
        /// <summary>Stop requested; the worker has not left its loop yet.</summary>
        CLOSING,
        /// <summary>The worker has left its loop, or was stopped before ever starting.</summary>
        CLOSED
    }// enum
}// namespace
接續下篇…