用c#實作多執行緒之間的非同步式交換架構 Part 1

Use C# to implement asynchronous exchange between multiple threads. Part 1

 

現在的程式為了可以提升效能

多半會採用多執行緒的做法

但是在這樣的狀況下

資料讀寫的順序與正確性變成了一個主要的問題

在這篇文章中

我們實作一個多執行緒的框架

並且在每個執行緒間

使用Queue來進行資料的傳遞

以確保資料順序的正確性

還有減少因為使用太多的Lock造成運算上的延遲

 

 

 

此框架有以下的功能

 

1. 簡化Queue的管理

當A執行緒要傳遞資料到B執行緒的時候

就必須建立一個Queue

而當B執行緒也要傳遞資料給A執行緒的時候

又必須建立一個Queue

所以當你有兩個執行緒在工作的時候

你最多需要兩個Queue來管理你的資料

而當你有三條執行緒的時候

你最多需要六個Queue

但是當你有四個執行緒的時候

你可能就需要十二個Queue了

我們可以依此推斷出一個公式 n*(n-1)

當你的程式執行緒越來越多

代表可以同時處理越多的事情

可是Queue的管理便成了一個大問題

所以我們利用這個框架來簡化Queue的管理

避免資料傳遞時出錯

 

 

 

2. 確保資料在正確的執行緒上被運算

每次執行前會先檢查Thread ID

如果ID錯誤會拋出例外

 

 

 

3. 使用休眠模式節省CPU負擔

最後還有一個有趣的功能

就是當某執行緒已經沒有工作的時候

自動進入休眠的狀態

減少CPU的負擔

 

 

以下為程式碼範例

每個成員函式的用途

均已經註解在程式碼中

 

首先我們建立一個抽象物件 AEThread

繼承之後實作便是一條獨立工作的執行緒

任何需要這個執行緒執行的例行工作都可以放在這個裡頭

並且會定時檢查註冊在這個執行緒下的Queue然後執行他

	namespace AsyncExchange
{
    /// <summary>
    /// Base class for a named worker thread.
    /// Each instance owns one <see cref="Thread"/> that repeatedly calls <see cref="Run"/>
    /// and then executes the elements waiting in the queues that
    /// <c>AEQueueController</c> reports for this thread's identify name.
    /// Instances are registered in a static registry so that they can be looked up
    /// by identify name or by managed thread id.
    /// </summary>
    /// <remarks>
    /// Pausing and stopping are cooperative: the worker checks for them between two
    /// work cycles instead of being suspended or aborted at an arbitrary instruction.
    /// </remarks>
    public abstract partial class AEThread
    {
        /// <summary>
        /// Guards <see cref="_hlThreadObj"/> and <see cref="_hlThreadID"/>, which are
        /// written by constructors and read by the static lookup methods.
        /// </summary>
        private static readonly object _objRegistryLock = new object();


        /// <summary>
        /// Registry of every AEThread, keyed by identify name.
        /// </summary>
        private static Dictionary<string, AEThread> _hlThreadObj = new Dictionary<string, AEThread>(10);


        /// <summary>
        /// Registry of every AEThread, keyed by the managed id of its worker thread.
        /// </summary>
        private static Dictionary<int, AEThread> _hlThreadID = new Dictionary<int, AEThread>(10);


        /// <summary>
        /// This AEThread's identify name.
        /// </summary>
        private readonly string _szIdentifyName;


        /// <summary>
        /// This AEThread's identify name.
        /// </summary>
        public string szIdentifyName { get { return _szIdentifyName; } }


        /// <summary>
        /// Sequence number: how many AEThreads were registered before this one.
        /// </summary>
        private readonly int _iObjectID;


        /// <summary>
        /// Milliseconds to sleep after a cycle in which some work was done.
        /// </summary>
        private readonly int _iWorkCycle;


        /// <summary>
        /// Milliseconds to sleep after a cycle in which no work was done.
        /// </summary>
        private readonly int _iSleepCycle;


        /// <summary>
        /// The underlying thread that runs <see cref="MainProcess"/>.
        /// </summary>
        private Thread _threadCurr;


        /// <summary>
        /// Guards life-cycle transitions (start / pause / resume / stop) and is the
        /// monitor a paused worker waits on.
        /// </summary>
        private readonly object _objStateLock = new object();


        /// <summary>
        /// Current thread state. Volatile because it is written under
        /// <see cref="_objStateLock"/> but read without it by <see cref="CurrentState"/>.
        /// </summary>
        private volatile eAEThreadState _CurrentState;


        /// <summary>
        /// Current thread state.
        /// </summary>
        public eAEThreadState CurrentState { get { return _CurrentState; } }


        /// <summary>
        /// True while the worker must wait before its next cycle.
        /// Only accessed while holding <see cref="_objStateLock"/>.
        /// </summary>
        private bool _IsPauseRequested;


        /// <summary>
        /// True once <see cref="StopAll"/> has asked this worker to leave its loop.
        /// </summary>
        private volatile bool _IsStopRequested;


        /// <summary>
        /// True when the most recent cycle did some work.
        /// </summary>
        private volatile bool _IsWork;


        /// <summary>
        /// True when the most recent cycle did some work.
        /// </summary>
        public bool isWork { get { return _IsWork; } }


        /// <summary>
        /// Managed thread id of the underlying worker thread.
        /// </summary>
        public int iAEThreadID { get { return _threadCurr.ManagedThreadId; } }


        /// <summary>
        /// Queues returned by the last call to <c>GetQueueByTargetId</c>.
        /// </summary>
        private AEQueue[] _m_arrAEQueue;


        /// <summary>
        /// The element most recently dequeued by <see cref="ExecuteQueue"/>.
        /// </summary>
        private AEElement _mAEElement;


        /// <summary>
        /// Parameter buffer handed by reference to <c>AEElement.Execute</c> and then
        /// passed on to <see cref="Complete"/>.
        /// </summary>
        private object[] _objParams;


        /// <summary>
        /// Creates an AEThread that uses the same interval whether or not it found work.
        /// </summary>
        /// <param name="szidentifyName">Unique identify name of this AEThread.</param>
        /// <param name="iWorkCycle">Milliseconds to sleep between two cycles.</param>
        public AEThread(string szidentifyName, int iWorkCycle) : this(szidentifyName, iWorkCycle, iWorkCycle) { }


        /// <summary>
        /// Creates an AEThread and registers it; the worker does not run until
        /// <see cref="Start"/> is called.
        /// </summary>
        /// <param name="szidentifyName">Unique identify name of this AEThread.</param>
        /// <param name="iWorkCycle">Milliseconds to sleep after a cycle that did some work.</param>
        /// <param name="iSleepCycle">Milliseconds to sleep after a cycle that did no work.</param>
        /// <exception cref="ArgumentNullException"><paramref name="szidentifyName"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// A cycle is smaller than <see cref="Timeout.Infinite"/>; <see cref="Thread.Sleep(int)"/>
        /// would reject it later on the worker thread.
        /// </exception>
        /// <exception cref="ArgumentException">The identify name is already registered.</exception>
        /// <exception cref="InvalidOperationException">The managed thread id is already registered.</exception>
        public AEThread(string szidentifyName, int iWorkCycle, int iSleepCycle)
        {
            if (szidentifyName == null)
                throw new ArgumentNullException("szidentifyName");

            // Fail in the caller instead of crashing the worker inside Thread.Sleep.
            if (iWorkCycle < Timeout.Infinite)
                throw new ArgumentOutOfRangeException("iWorkCycle");

            if (iSleepCycle < Timeout.Infinite)
                throw new ArgumentOutOfRangeException("iSleepCycle");

            _CurrentState = eAEThreadState.NONE;

            _szIdentifyName = szidentifyName;

            _iSleepCycle = iSleepCycle;

            _iWorkCycle = iWorkCycle;

            _threadCurr = new Thread(new ThreadStart(MainProcess));

            int iThreadID_ = _threadCurr.ManagedThreadId;

            lock (_objRegistryLock)
            {
                // Run both checks before touching either dictionary so a failure
                // never leaves a half-registered instance behind.
                if (_hlThreadObj.ContainsKey(_szIdentifyName))
                    throw new ArgumentException("IdentifyName repeated");

                if (_hlThreadID.ContainsKey(iThreadID_))
                    throw new InvalidOperationException("ThreadID repeated");

                _iObjectID = _hlThreadObj.Count;

                // Publish the instance only once it is ready to be started.
                _CurrentState = eAEThreadState.STANDBY;

                _hlThreadObj.Add(_szIdentifyName, this);
                _hlThreadID.Add(iThreadID_, this);
            }
        }


        /// <summary>
        /// Worker loop: runs the routine work and the queued elements, then sleeps
        /// for the work cycle or the sleep cycle depending on whether anything was done.
        /// Leaves the loop when a stop has been requested.
        /// </summary>
        private void MainProcess()
        {
            try
            {
                while (!_IsStopRequested)
                {
                    WaitWhilePaused();

                    if (_IsStopRequested)
                        break;

                    // Evaluate both on every cycle. Combining them with && would skip
                    // the queues whenever Run() reports work, starving queued elements.
                    bool bRoutineWorked_ = Run();
                    bool bQueueWorked_ = ExecuteQueue();

                    if (bRoutineWorked_ || bQueueWorked_)
                    {
                        _IsWork = true;

                        Thread.Sleep(_iWorkCycle);
                    }
                    else
                    {
                        _IsWork = false;

                        Thread.Sleep(_iSleepCycle);
                    }
                }
            }
            catch (ThreadInterruptedException)
            {
                // StopAll() interrupts the worker to wake it from a sleep or wait.
                // Any other interruption is not ours to hide.
                if (!_IsStopRequested)
                    throw;
            }
            finally
            {
                _IsWork = false;

                if (_IsStopRequested)
                    _CurrentState = eAEThreadState.CLOSED;
            }
        }


        /// <summary>
        /// Blocks the worker while a pause is in effect and no stop has been requested.
        /// </summary>
        private void WaitWhilePaused()
        {
            lock (_objStateLock)
            {
                while (_IsPauseRequested && !_IsStopRequested)
                    Monitor.Wait(_objStateLock);
            }
        }


        /// <summary>
        /// Executes at most one element from every non-empty queue that targets this thread.
        /// </summary>
        /// <returns>True when at least one element was executed; otherwise false.</returns>
        private bool ExecuteQueue()
        {
            _m_arrAEQueue = AEQueueController.Instance.GetQueueByTargetId(this.szIdentifyName);

            if (_m_arrAEQueue == null)
                return false;

            bool bResult_ = false;

            foreach (AEQueue mAEQueue_ in _m_arrAEQueue)
            {
                if (mAEQueue_.ElementCount == 0)
                    continue;

                // The consumer's managed thread id is handed to Dequeue; presumably the
                // queue uses it to verify the caller - confirm against AEQueue.
                _mAEElement = mAEQueue_.Dequeue(Thread.CurrentThread.ManagedThreadId);

                _mAEElement.Execute(ref _objParams);

                Complete(_objParams);

                bResult_ = true;
            }

            return bResult_;
        }


        /// <summary>
        /// Called once after each element has been executed. The default does nothing.
        /// </summary>
        /// <param name="Params">The parameter buffer that was passed to the element's Execute call.</param>
        public virtual void Complete(object[] Params) { }


        /// <summary>
        /// Starts the worker thread.
        /// </summary>
        /// <returns>
        /// True when the thread was started; false when this AEThread is not in the
        /// <see cref="eAEThreadState.STANDBY"/> state.
        /// </returns>
        public bool Start()
        {
            lock (_objStateLock)
            {
                if (_CurrentState != eAEThreadState.STANDBY)
                    return false;

                _threadCurr.Start();
                _CurrentState = eAEThreadState.RUNNING;
                return true;
            }
        }


        /// <summary>
        /// Pauses the worker. The pause takes effect before the worker's next cycle;
        /// a cycle that is already executing is allowed to finish.
        /// </summary>
        /// <returns>True when the thread is paused after the call; otherwise false.</returns>
        public bool Pause()
        {
            lock (_objStateLock)
            {
                if (_CurrentState == eAEThreadState.RUNNING)
                {
                    _IsPauseRequested = true;
                    _CurrentState = eAEThreadState.PAUSE;
                }

                return _CurrentState == eAEThreadState.PAUSE;
            }
        }

        /// <summary>
        /// Resumes the worker if it is paused.
        /// </summary>
        /// <returns>True when the thread is running after the call; otherwise false.</returns>
        public bool Resume()
        {
            lock (_objStateLock)
            {
                if (_CurrentState == eAEThreadState.PAUSE)
                {
                    _IsPauseRequested = false;
                    _CurrentState = eAEThreadState.RUNNING;

                    // Wake the worker that is blocked in WaitWhilePaused().
                    Monitor.PulseAll(_objStateLock);
                }

                return _CurrentState == eAEThreadState.RUNNING;
            }
        }


        /// <summary>
        /// Routine work executed once per cycle. Override to add work; the default does nothing.
        /// </summary>
        /// <returns>
        /// False to let the thread fall back to the sleep cycle (when the queues were also idle);
        /// true to keep using the work cycle.
        /// </returns>
        protected virtual bool Run()
        {
            return false;
        }


        /// <summary>
        /// Asks this worker to leave its loop.
        /// </summary>
        /// <returns>
        /// True when the underlying thread has been started and the caller may wait for it
        /// to end; false when there is nothing to wait for.
        /// </returns>
        private bool RequestStop()
        {
            lock (_objStateLock)
            {
                switch (_CurrentState)
                {
                    case eAEThreadState.CLOSED:
                        return false;

                    case eAEThreadState.CLOSING:
                        // Another caller already asked; joining again is harmless.
                        return true;

                    case eAEThreadState.RUNNING:
                    case eAEThreadState.PAUSE:
                        _CurrentState = eAEThreadState.CLOSING;
                        _IsStopRequested = true;
                        _IsPauseRequested = false;

                        // Release a worker that is blocked in WaitWhilePaused().
                        Monitor.PulseAll(_objStateLock);
                        break;

                    default:
                        // Never started: there is no thread to wait for, and marking it
                        // CLOSED keeps a later Start() from launching it.
                        _IsStopRequested = true;
                        _CurrentState = eAEThreadState.CLOSED;
                        return false;
                }
            }

            // Wake the worker from Thread.Sleep so it notices the stop request at once.
            // A worker that calls StopAll() itself needs no wake-up.
            if (_threadCurr != Thread.CurrentThread)
                _threadCurr.Interrupt();

            return true;
        }


        /// <summary>
        /// Stops every registered AEThread and waits until the started ones have ended.
        /// Threads that were never started are marked <see cref="eAEThreadState.CLOSED"/>.
        /// </summary>
        public static void StopAll()
        {
            List<AEThread> lstAll_;

            // Work on a snapshot so the registry lock is never held while joining.
            lock (_objRegistryLock)
            {
                lstAll_ = new List<AEThread>(_hlThreadObj.Values);
            }

            List<AEThread> lstToJoin_ = new List<AEThread>(lstAll_.Count);

            // Ask all of them to stop first ...
            foreach (AEThread mAEThread_ in lstAll_)
            {
                if (mAEThread_.RequestStop())
                    lstToJoin_.Add(mAEThread_);
            }

            // ... then wait for each one, so they wind down in parallel.
            foreach (AEThread mAEThread_ in lstToJoin_)
            {
                // A worker cannot join itself; it closes when its own loop exits.
                if (mAEThread_._threadCurr == Thread.CurrentThread)
                    continue;

                mAEThread_._threadCurr.Join();
                mAEThread_._CurrentState = eAEThreadState.CLOSED;
            }
        }


        /// <summary>
        /// Finds an AEThread by the managed id of its worker thread.
        /// </summary>
        /// <param name="iID">Managed thread id to look up.</param>
        /// <returns>The matching AEThread, or null when none is registered under that id.</returns>
        public static AEThread GetThreadByID(int iID)
        {
            AEThread mAEThread_;

            lock (_objRegistryLock)
            {
                if (_hlThreadID.TryGetValue(iID, out mAEThread_))
                    return mAEThread_;
            }

            return null;
        }


        /// <summary>
        /// Finds an AEThread by its identify name.
        /// </summary>
        /// <param name="szIdentifyName">Identify name to look up.</param>
        /// <returns>
        /// The matching AEThread, or null when the name is null or not registered.
        /// </returns>
        public static AEThread GetThreadByIdentifyName(string szIdentifyName)
        {
            if (szIdentifyName == null)
                return null;

            AEThread mAEThread_;

            lock (_objRegistryLock)
            {
                if (_hlThreadObj.TryGetValue(szIdentifyName, out mAEThread_))
                    return mAEThread_;
            }

            return null;
        }


        /// <summary>
        /// Total number of elements still waiting in the queues that target this thread.
        /// </summary>
        public int TotalUncompleteElement
        {
            get
            {
                AEQueue[] m_arrAEQueue_ = AEQueueController.Instance.GetQueueByTargetId(this._szIdentifyName);

                if (m_arrAEQueue_ == null)
                    return 0;

                int iTotal = 0;
                foreach (AEQueue mAEQueue_ in m_arrAEQueue_)
                    iTotal += mAEQueue_.ElementCount;

                return iTotal;
            }
        }// property
    }// class

    
    /// <summary>
    /// Life-cycle states of an AEThread.
    /// </summary>
    public enum eAEThreadState
    {
        /// <summary>Initial value, assigned at the start of construction.</summary>
        NONE = 0,

        /// <summary>Constructed and registered, but not started yet.</summary>
        STANDBY = 1,

        /// <summary>Started, or resumed after a pause.</summary>
        RUNNING = 2,

        /// <summary>Paused by a call to Pause().</summary>
        PAUSE = 3,

        /// <summary>StopAll() has asked the thread to end.</summary>
        CLOSING = 4,

        /// <summary>The thread has been stopped by StopAll().</summary>
        CLOSED = 5
    }// enum

}// namespace

 

 

接續下篇…