用c#實作多執行緒之間的非同步式交換架構 Part 2

Using C# to implement asynchronous data exchange between multiple threads, Part 2

接續上篇…

 

接下來就是重點了

我們建立了一個Singleton物件 AEQueueController

來統一管理所有的Queue

包括了Queue的註冊與跨執行緒間資料的傳送

	namespace AsyncExchange
{
    /// <summary>
    /// Singleton registry of every <see cref="AEQueue"/>.
    /// It creates one queue per (source thread, target thread) pair and routes an
    /// element sent by a source thread into the queue registered for the target.
    /// </summary>
    public sealed partial class AEQueueController
    {
        /// <summary>
        /// Private so that <see cref="Instance"/> is the only instance.
        /// </summary>
        private AEQueueController() { }


        /// <summary>
        /// The single shared instance.
        /// </summary>
        public static readonly AEQueueController Instance = new AEQueueController();


        /// <summary>
        /// Initial capacity hint used for the lookup tables below.
        /// </summary>
        private const int DEFAULT_SIZE = 10;


        /// <summary>
        /// Separator placed between the source and target names in a pair key.
        /// </summary>
        private const string SPLIT = "|";


        // The three tables are created eagerly (field initializers run while the
        // singleton is constructed) instead of lazily inside Register(): the lazy
        // null-checks were unsynchronized, so two first-time callers could each
        // create a different dictionary and one registration would be lost.
        // Each table is also used as its own lock object, as in the original code.
        // They are deliberately not marked readonly because this class is partial
        // and the other parts are not visible here.

        /// <summary>
        /// Queues indexed by the identify name of their source thread.
        /// Spends memory to save lookup time.
        /// </summary>
        private Dictionary<string, ArrayList> hThreadSource =
            new Dictionary<string, ArrayList>(DEFAULT_SIZE);


        /// <summary>
        /// Queues indexed by the identify name of their target thread.
        /// Spends memory to save lookup time.
        /// </summary>
        private Dictionary<string, ArrayList> hThreadTarget =
            new Dictionary<string, ArrayList>(DEFAULT_SIZE);


        /// <summary>
        /// Queues indexed by pair key (see <see cref="GetKeyWord"/>); answers
        /// "has this pair already been registered?".
        /// </summary>
        private Dictionary<string, AEQueue> hRegistered =
            new Dictionary<string, AEQueue>(DEFAULT_SIZE * (DEFAULT_SIZE - 1));


        /// <summary>
        /// Registers a (source, target) thread pair and creates its queue.
        /// </summary>
        /// <param name="mThreadSource">Thread that will enqueue elements.</param>
        /// <param name="mThreadTarget">Thread that will dequeue elements.</param>
        /// <returns>
        /// <c>true</c> if a new queue was created; <c>false</c> if the pair was
        /// already registered (nothing is changed in that case).
        /// </returns>
        /// <exception cref="System.ArgumentNullException">
        /// Either thread argument is <c>null</c>.
        /// </exception>
        public bool Register(AEThread mThreadSource, AEThread mThreadTarget)
        {
            if (mThreadSource == null)
                throw new System.ArgumentNullException("mThreadSource");

            if (mThreadTarget == null)
                throw new System.ArgumentNullException("mThreadTarget");

            string szSourceName_ = mThreadSource.szIdentifyName;
            string szTargetName_ = mThreadTarget.szIdentifyName;
            string szKeyWord_ = GetKeyWord(szSourceName_, szTargetName_);

            AEQueue mAEQueue_;

            // The existence check and the insert must happen under the same lock;
            // otherwise two concurrent registrations of the same pair both pass
            // the check and the second Add throws.
            lock (hRegistered)
            {
                if (hRegistered.ContainsKey(szKeyWord_))
                    return false;

                mAEQueue_ = AEQueueFactory.Make(mThreadSource, mThreadTarget);
                hRegistered.Add(szKeyWord_, mAEQueue_);
            }

            AddQueueToIndex(hThreadSource, szSourceName_, mAEQueue_);
            AddQueueToIndex(hThreadTarget, szTargetName_, mAEQueue_);

            return true;
        }


        /// <summary>
        /// Gets a snapshot of the queues whose source thread has the given name.
        /// </summary>
        /// <param name="szIdentify">Identify name of the source thread.</param>
        /// <returns>A new array of queues, or <c>null</c> if the name is unknown.</returns>
        internal AEQueue[] GetQueueBySourceId(string szIdentify)
        {
            return CopyQueuesFromIndex(hThreadSource, szIdentify);
        }


        /// <summary>
        /// Gets a snapshot of the queues whose target thread has the given name.
        /// </summary>
        /// <param name="szIdentify">Identify name of the target thread.</param>
        /// <returns>A new array of queues, or <c>null</c> if the name is unknown.</returns>
        internal AEQueue[] GetQueueByTargetId(string szIdentify)
        {
            return CopyQueuesFromIndex(hThreadTarget, szIdentify);
        }


        /// <summary>
        /// Sends an element from the thread with the given ID to the target thread.
        /// Resolves the ID through <c>AEThread.GetThreadByID</c> and forwards to the
        /// overload that takes an <see cref="AEThread"/>.
        /// </summary>
        /// <param name="szTargetIdentifyName">Target thread's identify name</param>
        /// <param name="iCurrentThreadID">Current thread's ID</param>
        /// <param name="mAEElement">The AEElement that you want to send</param>
        public void SendData(string szTargetIdentifyName, int iCurrentThreadID, AEElement mAEElement)
        {
            SendData(szTargetIdentifyName, AEThread.GetThreadByID(iCurrentThreadID), mAEElement);
        }


        /// <summary>
        /// Puts an element into the queue registered for
        /// (<paramref name="mAEThreadCurrent"/>, <paramref name="szTargetIdentifyName"/>).
        /// </summary>
        /// <param name="szTargetIdentifyName">Target thread's identify name</param>
        /// <param name="mAEThreadCurrent">Source thread</param>
        /// <param name="mAEElement">The AEElement that you want to send</param>
        /// <exception cref="System.ArgumentNullException">
        /// <paramref name="mAEThreadCurrent"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="KeyNotFoundException">
        /// No queue has been registered for this source/target pair.
        /// </exception>
        public void SendData(string szTargetIdentifyName, AEThread mAEThreadCurrent, AEElement mAEElement)
        {
            if (mAEThreadCurrent == null)
                throw new System.ArgumentNullException("mAEThreadCurrent");

            string szKeyWord_ = GetKeyWord(mAEThreadCurrent.szIdentifyName, szTargetIdentifyName);

            AEQueue mAEQueue_;

            // Dictionary<TKey,TValue> is not safe to read while Register() is
            // writing to it, so the lookup takes the same lock as the writer.
            lock (hRegistered)
            {
                if (!hRegistered.TryGetValue(szKeyWord_, out mAEQueue_))
                    throw new KeyNotFoundException("No queue is registered for '" + szKeyWord_ + "'.");
            }

            // The calling thread's managed ID is passed along; AEQueue.Enqueue
            // compares it with the ID of the queue's source thread.
            mAEQueue_.Enqueue(mAEElement, Thread.CurrentThread.ManagedThreadId);
        }


        /// <summary>
        /// Builds the pair key "from|to" used by <see cref="hRegistered"/>.
        /// </summary>
        /// <param name="szFrom">Source thread's identify name.</param>
        /// <param name="szTo">Target thread's identify name.</param>
        /// <returns>The two names joined by <see cref="SPLIT"/>.</returns>
        private string GetKeyWord(string szFrom, string szTo)
        {
            // NOTE(review): names that themselves contain SPLIT can produce the
            // same key for different pairs; nothing here rejects such names.
            return szFrom + SPLIT + szTo;
        }


        /// <summary>
        /// Appends a queue to the list stored under <paramref name="szIdentify"/>,
        /// creating the list on first use. Runs under the index's own lock.
        /// </summary>
        private static void AddQueueToIndex(Dictionary<string, ArrayList> hIndex, string szIdentify, AEQueue mAEQueue)
        {
            lock (hIndex)
            {
                ArrayList aQueues_;

                if (!hIndex.TryGetValue(szIdentify, out aQueues_))
                {
                    aQueues_ = new ArrayList(DEFAULT_SIZE - 1);
                    hIndex.Add(szIdentify, aQueues_);
                }

                aQueues_.Add(mAEQueue);
            }
        }


        /// <summary>
        /// Copies the queues stored under <paramref name="szIdentify"/> into a new
        /// array while holding the index's lock; returns <c>null</c> if absent.
        /// </summary>
        private static AEQueue[] CopyQueuesFromIndex(Dictionary<string, ArrayList> hIndex, string szIdentify)
        {
            lock (hIndex)
            {
                ArrayList aQueues_;

                if (!hIndex.TryGetValue(szIdentify, out aQueues_))
                    return null;

                return (AEQueue[])aQueues_.ToArray(typeof(AEQueue));
            }
        }
    }// class
}// namespace

 

 

當兩條執行緒向AEQueueController註冊後

首先會檢查是否已經註冊過

避免重複註冊

然後交由AEQueueFactory產生一個新的Queue

基本上這個物件是由AEQueueController叫用的

一般使用者並不會使用到此物件

另外為了方便以後程式碼的擴充與修改

AEQueueFactory採用的是工廠模式

	namespace AsyncExchange
{
    /// <summary>
    /// A simple factory pattern.
    /// You can add some conditions in the future.
    /// </summary>
    internal abstract class AEQueueFactory
    {
        /// <summary>
        /// Creates the queue that carries elements from one thread to another.
        /// This is the single place where queues are constructed, so creation
        /// rules can be added here later.
        /// </summary>
        /// <param name="mFrom">Source thread</param>
        /// <param name="mTo">Target thread</param>
        /// <returns>A new <see cref="AEQueue"/> bound to the two threads.</returns>
        public static AEQueue Make(AEThread mFrom, AEThread mTo)
        {
            AEQueue mCreated_ = new AEQueue(mFrom, mTo);

            return mCreated_;
        }
    }
}

 

 

 

當經過AEQueueController之後

便會產生一個Queue

我們將這個Queue的物件命名為AEQueue

每當一個執行緒要插入一個元素

或是要取出一個元素的時候

我們會在這邊驗證執行緒的ID是否正確

當然…

這個物件也是給此框架叫用的

一般使用者並不會去使用他

	namespace AsyncExchange
{
    /// <summary>
    /// A one-directional queue of <see cref="AEElement"/> between two threads.
    /// Only the source thread may enqueue and only the target thread may dequeue;
    /// both operations check the caller-supplied thread ID.
    /// </summary>
    internal class AEQueue
    {
        /// <summary>
        /// Underlying storage; also used as the lock object for every access.
        /// </summary>
        private readonly Queue<AEElement> qAEElement;

        /// <summary>
        /// The thread allowed to enqueue.
        /// </summary>
        private readonly AEThread _mFrom;

        /// <summary>
        /// The thread allowed to dequeue.
        /// </summary>
        private readonly AEThread _mTo;

        /// <summary>
        /// Initial capacity of the underlying queue.
        /// </summary>
        private const int DEFAULT_ELEMENT_COUNT = 50;


        /// <summary>
        /// How many elements are currently in this queue.
        /// </summary>
        public int ElementCount
        {
            get
            {
                // Queue<T> is not thread-safe; read Count under the same lock
                // that Enqueue/Dequeue take so it never races with a resize.
                lock (qAEElement)
                {
                    return qAEElement.Count;
                }
            }
        }


        /// <summary>
        /// Creates an empty queue from <paramref name="mFrom"/> to <paramref name="mTo"/>.
        /// </summary>
        /// <param name="mFrom">Source thread</param>
        /// <param name="mTo">Target thread</param>
        public AEQueue(AEThread mFrom, AEThread mTo)
        {
            _mFrom = mFrom;

            _mTo = mTo;

            qAEElement = new Queue<AEElement>(DEFAULT_ELEMENT_COUNT);
        }


        /// <summary>
        /// Removes and returns the oldest element in the queue.
        /// </summary>
        /// <param name="iThreadID">ID of the calling thread; must match the target thread's ID.</param>
        /// <returns>The element at the head of the queue.</returns>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="iThreadID"/> is not the target thread's ID, or the queue
        /// is empty (thrown by <see cref="Queue{T}.Dequeue"/>).
        /// </exception>
        public AEElement Dequeue(int iThreadID)
        {
            // Only the target thread may take elements out.
            if (_mTo.iAEThreadID != iThreadID)
                throw new InvalidOperationException("Thread ID error");

            lock (qAEElement)
            {
                return qAEElement.Dequeue();
            }
        }


        /// <summary>
        /// Appends an element to the tail of the queue.
        /// </summary>
        /// <param name="mAEElements">The element to store.</param>
        /// <param name="iThreadID">ID of the calling thread; must match the source thread's ID.</param>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="iThreadID"/> is not the source thread's ID.
        /// </exception>
        public void Enqueue(AEElement mAEElements, int iThreadID)
        {
            // Only the source thread may put elements in.
            if (_mFrom.iAEThreadID != iThreadID)
                throw new InvalidOperationException("Thread ID error");

            lock (qAEElement)
            {
                qAEElement.Enqueue(mAEElements);
            }// lock
        }// method
    }// class
}// namespace

 

 

 

最後這邊便是Queue要儲存的元素

也是執行緒間要傳遞資料的規格

我們採用一個介面的方式來完成他

當一個類別實作了AEElement介面之後

便可以在執行緒A建立它的實體

並且使用 AEQueueController.Instance.SendData(string szTargetIdentifyName, AEThread mAEThreadCurrent, AEElement mAEElement)

這個方法來傳送資料到正確的Queue之中

然後執行緒B將會去檢查該Queue

如果有待執行的元素

便會將他取出

然後執行成員函式Execute()

所以使用者實作的Execute()函式

便是要在執行緒B執行的運算

	namespace AsyncExchange
{
    /// <summary>
    /// The unit of work stored in an AEQueue and passed from one thread to another.
    /// Implement this interface to define what should run on the receiving side.
    /// </summary>
    public interface AEElement
    {
        /// <summary>
        /// The work to perform; it is intended to be executed in the target thread.
        /// </summary>
        /// <param name="Params">
        /// Argument array passed by reference, so an implementation may replace it.
        /// NOTE(review): what the array contains is decided by the caller that
        /// invokes Execute, which is not shown here — confirm before relying on it.
        /// </param>
        void Execute(ref object[] Params);
    }
}

 

 

接續下篇…