Use C# to achieve asynchronous exchange in mutithread. Part 2
接續上篇…
接下來就是重點了
我們建立了一個Singleton物件 AEQueueController
來統一管理所有的Queue
包括了Queue的註冊與跨執行緒間資料的傳送
namespace AsyncExchange { public sealed partial class AEQueueController { /// <summary> /// It's a singleton. /// </summary> private AEQueueController() { } /// <summary> /// Get this singleton's Instance. /// </summary> public static readonly AEQueueController Instance = new AEQueueController(); private const int DEFAULT_SIZE = 10; /// <summary> /// A split char. /// </summary> private const string SPLIT = "|"; /// <summary> /// Record all of queue by identify Name. /// Spend memories to save time. /// </summary> private Dictionary<string, ArrayList> hThreadSource; /// <summary> /// Record all of queue by identify Name. /// Spend memories to save time. /// </summary> private Dictionary<string, ArrayList> hThreadTarget; /// <summary> /// Have this pair registerd? /// </summary> private Dictionary<string, AEQueue> hRegistered; /// <summary> /// You can make a queue by register two threads. /// </summary> /// <param name="mThreadSource"></param> /// <param name="mThreadTarget"></param> /// <returns></returns> public bool Register(AEThread mThreadSource, AEThread mThreadTarget) { bool bResult_ = false; // init if( hThreadSource == null ) hThreadSource = new Dictionary<string, ArrayList>(DEFAULT_SIZE); if (hThreadTarget == null) hThreadTarget = new Dictionary<string, ArrayList>(DEFAULT_SIZE); if (hRegistered == null) hRegistered = new Dictionary<string, AEQueue>(DEFAULT_SIZE * (DEFAULT_SIZE-1)); // check this queue has been created string szKeyWord = GetKeyWord(mThreadSource.szIdentifyName, mThreadTarget.szIdentifyName); if (!hRegistered.ContainsKey(szKeyWord)) { // Make a Queue AEQueue mAEQueue_ = AEQueueFactory.Make(mThreadSource, mThreadTarget); lock (hRegistered) { hRegistered.Add(szKeyWord, mAEQueue_); } // store thread source lock (hThreadSource) { if (!hThreadSource.ContainsKey(mThreadSource.szIdentifyName)) hThreadSource.Add(mThreadSource.szIdentifyName, new ArrayList(DEFAULT_SIZE-1)); hThreadSource[mThreadSource.szIdentifyName].Add(mAEQueue_); } // store thread target lock (hThreadTarget) { if (!hThreadTarget.ContainsKey(mThreadTarget.szIdentifyName)) hThreadTarget.Add(mThreadTarget.szIdentifyName, new ArrayList(DEFAULT_SIZE-1)); hThreadTarget[mThreadTarget.szIdentifyName].Add(mAEQueue_); } bResult_ = true; } return bResult_; } /// <summary> /// Get queues by source thread's identify name. /// </summary> /// <param name="szIdentify"></param> /// <returns></returns> internal AEQueue[] GetQueueBySourceId(string szIdentify) { if (hThreadSource == null) return null; lock (hThreadSource) { if (!hThreadSource.ContainsKey(szIdentify)) return null; return (AEQueue[])hThreadSource[szIdentify].ToArray(typeof(AEQueue)); } } /// <summary> /// Get queues by target thread's identify name. /// </summary> /// <param name="szIdentify"></param> /// <returns></returns> internal AEQueue[] GetQueueByTargetId(string szIdentify) { if (hThreadTarget == null) return null; lock (hThreadTarget) { if (!hThreadTarget.ContainsKey(szIdentify)) return null; return (AEQueue[])hThreadTarget[szIdentify].ToArray(typeof(AEQueue)); } } /// <summary> /// Input a AEElement to the queue from this thread, /// then it will be execute in other thread. /// </summary> /// <param name="szTargetIdentifyName">Target thread's identify name</param> /// <param name="iCurrentThreadID">Current thread's ID</param> /// <param name="mAEElement">A AEElemt that you want to send</param> public void SendData(string szTargetIdentifyName, int iCurrentThreadID, AEElement mAEElement) { SendData(szTargetIdentifyName, AEThread.GetThreadByID(iCurrentThreadID), mAEElement); } /// <summary> /// Input a AEElement to the queue from this thread, /// then it will be execute in other thread. /// </summary> /// <param name="szTargetIdentifyName">Target thread's identify name</param> /// <param name="mAEThreadCurrent">Source thread</param> /// <param name="mAEElement">A AEElemt that you want to send</param> public void SendData(string szTargetIdentifyName, AEThread mAEThreadCurrent, AEElement mAEElement) { string szKeyWord_ = GetKeyWord(mAEThreadCurrent.szIdentifyName, szTargetIdentifyName); hRegistered[szKeyWord_].Enqueue(mAEElement, Thread.CurrentThread.ManagedThreadId); } /// <summary> /// Just make a key word. /// </summary> /// <param name="szFrom"></param> /// <param name="szTo"></param> /// <returns></returns> private string GetKeyWord(string szFrom, string szTo) { return szFrom + SPLIT + szTo; }// method }// class }// namespace
當兩條執行緒向AEQueueController註冊後
首先會檢查是否已經註冊過
避免重複註冊
然後交由AEQueueFactory產生一個新的Queue
基本上這個物件是由AEQueueController叫用的
一般使用者並不會使用到此物件
另外為了方便以後程式碼的擴充與修改
AEQueueFactory採用的是工廠模式
namespace AsyncExchange { /// <summary> /// A simple factory pattern. /// You can add some conditions in the future. /// </summary> internal abstract class AEQueueFactory { /// <summary> /// Make a queue. /// </summary> /// <param name="mFrom">Source thread</param> /// <param name="mTo">Target thread</param> /// <returns></returns> public static AEQueue Make(AEThread mFrom, AEThread mTo) { return new AEQueue(mFrom, mTo); } } }
當經過AEQueueController之後
便會產生一個Queue
我們將這個Queue的物件命名為AEQueue
每當一個執行緒要插入一個元素
或是要取出一個元素的時候
我們會這邊驗證執行緒的ID是否正確
當然…
這個物件也是給此框架叫用的
一般使用者並不會去使用他
namespace AsyncExchange { internal class AEQueue { private Queue<AEElement> qAEElement; private AEThread _mFrom; private AEThread _mTo; private const int DEFAULT_ELEMENT_COUNT = 50; /// <summary> /// How many elements in this queue. /// </summary> public int ElementCount { get { return qAEElement.Count; } } /// <summary> /// constuct /// </summary> /// <param name="mFrom">Source thread</param> /// <param name="mTo">Target thread</param> public AEQueue(AEThread mFrom, AEThread mTo) { _mFrom = mFrom; _mTo = mTo; qAEElement = new Queue<AEElement>(DEFAULT_ELEMENT_COUNT); } /// <summary> /// return a AEElement and remove it form the queue /// </summary> /// <returns></returns> public AEElement Dequeue(int iThreadID) { // check thread id if (_mTo.iAEThreadID != iThreadID) throw new Exception("Thread ID error"); lock (qAEElement) { return qAEElement.Dequeue(); } } /// <summary> /// input a AEElement to the queue /// </summary> /// <param name="mAEElements"></param> public void Enqueue(AEElement mAEElements, int iThreadID) { // check thread id if (_mFrom.iAEThreadID != iThreadID) throw new Exception("Thread ID error"); lock (qAEElement) { qAEElement.Enqueue(mAEElements); }// lock }// method }// class }// namespace
最後這邊便是Queue要儲存的元素
也是執行緒間要傳遞資料的規格
我們採用一個介面的方式來完成他
當一個物件繼承了AEElement之後
將在執行緒A實作它
並且使用 AEQueueController.Instance.SendData(string szTargetIdentifyName, AEThread mAEThreadCurrent, AEElement mAEElement)
這個方法來傳送資料到正確的Queue之中
然後執行緒B將會去檢查該Queue
如果有待執行的元素
便會將他取出
然後執行成員函式Execute()
所以使用者覆寫的Execute()函式
便是要在執行緒B進行運算的資料
namespace AsyncExchange { public interface AEElement { /// <summary> /// It will be execute in the target thread. /// </summary> /// <param name="Params"></param> void Execute(ref object[] Params); } }
接續下篇…