Using C# to achieve asynchronous data exchange between multiple threads. Part 1

Using C# to achieve asynchronous data exchange between multiple threads. Part 1

Applications today commonly use multiple threads to improve performance.
But the main problem is keeping operations in the correct sequence when several threads read or write the same data.

In this article we will build a multithreading framework.
We will use a queue to exchange data between two threads.
The queues ensure the accuracy of the data and reduce how often a “lock” is needed, which improves performance.

 

The framework provides several functions.

1.    Simplify the management of queues.
You create one queue when thread A sends data to thread B.
And you create another queue when thread B sends data to thread A.
So you now have 2 queues.
But you need 6 queues when you have 3 threads.
And you need 12 queues when you have 4 threads.
So we get a rule: n threads need n*(n-1) queues.
More threads mean you can handle more things at the same time.
But managing all of those queues becomes a big problem.
So the framework simplifies queue management and helps you avoid mistakes.

 

2.    Make sure data is handled on the correct thread.
The thread ID is checked before the data is handled.
An exception is thrown if the ID is wrong.

 

3.    Reduce wasted CPU time with a sleeping mode.
The final function is an interesting one.
A thread switches to sleeping mode automatically when it has no work.

 

 

 

 

Here is some simple code.

We created an abstract class named “AEThread”.
A class that inherits from it becomes a thread that works independently.
If you need to, you can add any routine work that should be handled by this thread.
It also checks all the queues registered under it and executes their elements.

namespace AsyncExchange
{
    /// <summary>
    /// A worker that owns one dedicated thread. On every loop iteration the thread calls
    /// <see cref="Run"/> and then executes at most one pending element from each queue that
    /// AEQueueController returns for this instance's identify name.
    /// </summary>
    /// <remarks>
    /// Pausing and stopping are cooperative: the worker notices a request between two loop
    /// iterations, never in the middle of <see cref="Run"/> or of an element's execution.
    /// Thread.Suspend/Resume/Abort are deliberately not used; they are deprecated, can freeze
    /// or kill a thread while it holds a lock, and are unsupported on current .NET runtimes.
    /// </remarks>
    public abstract partial class AEThread
    {
        /// <summary>
        /// Guards every access to the two static registries below.
        /// </summary>
        private static readonly object _objRegistryLock = new object();


        /// <summary>
        /// You can find a AEThread by identify name.
        /// Created eagerly so the static lookups work before any instance exists.
        /// </summary>
        private static Dictionary<string, AEThread> _hlThreadObj = new Dictionary<string, AEThread>(10);


        /// <summary>
        /// You can find a AEThread by thread ID.
        /// Created eagerly so the static lookups work before any instance exists.
        /// </summary>
        private static Dictionary<int, AEThread> _hlThreadID = new Dictionary<int, AEThread>(10);


        /// <summary>
        /// This AEThread's identify name.
        /// </summary>
        private readonly string _szIdentifyName;


        /// <summary>
        /// This AEThread's identify name.
        /// </summary>
        public string szIdentifyName { get { return _szIdentifyName; } }


        /// <summary>
        /// It's just a sequence: the number of AEThreads registered before this one.
        /// </summary>
        private readonly int _iObjectID;


        /// <summary>
        /// Wait (in milliseconds) between two iterations while there is work.
        /// </summary>
        private readonly int _iWorkCycle;


        /// <summary>
        /// Wait (in milliseconds) between two iterations while there is no work.
        /// </summary>
        private readonly int _iSleepCycle;


        /// <summary>
        /// The thread that actually does the work.
        /// </summary>
        private Thread _threadCurr;


        /// <summary>
        /// Current thread state. Volatile because it is read without a lock by
        /// <see cref="CurrentState"/> while other threads update it.
        /// </summary>
        private volatile eAEThreadState _CurrentState;


        /// <summary>
        /// Current thread state.
        /// </summary>
        public eAEThreadState CurrentState { get { return _CurrentState; } }


        /// <summary>
        /// Is it working? Written by the worker after every iteration.
        /// </summary>
        private volatile bool _IsWork;


        /// <summary>
        /// Is it working?
        /// </summary>
        public bool isWork { get { return _IsWork; } }


        /// <summary>
        /// Managed thread id of the worker thread.
        /// </summary>
        public int iAEThreadID { get { return _threadCurr.ManagedThreadId; } }


        /// <summary>
        /// AEQueue instance buffer.
        /// </summary>
        private AEQueue[] _m_arrAEQueue;


        /// <summary>
        /// AEElement instance buffer.
        /// </summary>
        private AEElement _mAEElement;


        /// <summary>
        /// buffer of params.
        /// </summary>
        private object[] _objParams;


        /// <summary>
        /// Guards the pause/stop flags and the state transitions of this instance;
        /// the worker also waits on it between iterations.
        /// </summary>
        private readonly object _objSignalLock = new object();


        /// <summary>
        /// True while the worker must stay parked. Guarded by <see cref="_objSignalLock"/>.
        /// </summary>
        private bool _bPauseRequested;


        /// <summary>
        /// True once the worker must leave its loop. Guarded by <see cref="_objSignalLock"/>.
        /// </summary>
        private bool _bStopRequested;


        /// <summary>
        /// AEThread's construct. The sleep cycle is the same as the work cycle.
        /// </summary>
        /// <param name="szidentifyName">Identify name in this AEThread</param>
        /// <param name="iWorkCycle">How long do you want it to wait between queries of the queue, when it is working</param>
        public AEThread(string szidentifyName, int iWorkCycle) : this(szidentifyName, iWorkCycle, iWorkCycle) { }


        /// <summary>
        /// AEThread's construct. Creates the worker thread (without starting it) and registers
        /// this instance under its identify name and its managed thread id.
        /// </summary>
        /// <param name="szidentifyName">Identify name in this AEThread</param>
        /// <param name="iWorkCycle">How long do you want it to wait between queries of the queue, when it is working</param>
        /// <param name="iSleepCycle">How long do you want it to wait between queries of the queue, when it is sleeping</param>
        /// <exception cref="ArgumentNullException">The identify name is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">A cycle is negative and not Timeout.Infinite.</exception>
        /// <exception cref="ArgumentException">Another AEThread already uses the identify name.</exception>
        /// <exception cref="InvalidOperationException">The managed thread id is already registered.</exception>
        public AEThread(string szidentifyName, int iWorkCycle, int iSleepCycle)
        {
            if (szidentifyName == null)
                throw new ArgumentNullException("szidentifyName");

            // Reject here what would otherwise only fail later, on the worker thread.
            if (iWorkCycle < Timeout.Infinite)
                throw new ArgumentOutOfRangeException("iWorkCycle");

            if (iSleepCycle < Timeout.Infinite)
                throw new ArgumentOutOfRangeException("iSleepCycle");

            _CurrentState = eAEThreadState.NONE;

            _szIdentifyName = szidentifyName;

            _iSleepCycle = iSleepCycle;

            _iWorkCycle = iWorkCycle;

            _threadCurr = new Thread(new ThreadStart(MainProcess));

            lock (_objRegistryLock)
            {
                // Validate everything before touching either registry so that a failure
                // never leaves this instance registered in only one of them.
                if (_hlThreadObj.ContainsKey(_szIdentifyName))
                    throw new ArgumentException("IdentifyName repeated");

                if (_hlThreadID.ContainsKey(_threadCurr.ManagedThreadId))
                    throw new InvalidOperationException("ThreadID repeated");

                _iObjectID = _hlThreadObj.Count;

                _hlThreadObj.Add(_szIdentifyName, this);
                _hlThreadID.Add(_threadCurr.ManagedThreadId, this);
            }

            _CurrentState = eAEThreadState.STANDBY;
        }


        /// <summary>
        /// Main process: the loop executed by the worker thread until a stop is requested.
        /// </summary>
        private void MainProcess()
        {
            while (true)
            {
                lock (_objSignalLock)
                {
                    // Park here while paused; Resume() and StopAll() pulse the lock.
                    while (_bPauseRequested && !_bStopRequested)
                        Monitor.Wait(_objSignalLock);

                    if (_bStopRequested)
                        break;
                }

                // Evaluate both steps unconditionally. Combining them with && would
                // short-circuit and skip the queues whenever Run() reports work.
                bool bRoutineWorked_ = Run();
                bool bQueueWorked_ = ExecuteQueue();
                bool bWorked_ = bRoutineWorked_ || bQueueWorked_;

                _IsWork = bWorked_;

                lock (_objSignalLock)
                {
                    // A timed wait instead of Thread.Sleep, so that a stop request
                    // wakes the worker immediately.
                    if (!_bStopRequested)
                        Monitor.Wait(_objSignalLock, bWorked_ ? _iWorkCycle : _iSleepCycle);

                    if (_bStopRequested)
                        break;
                }
            }

            _IsWork = false;
            _CurrentState = eAEThreadState.CLOSED;
        }


        /// <summary>
        /// It will get a AEElement to execute from every AEQueue that is not empty.
        /// </summary>
        /// <returns>true if at least one element was executed, otherwise false.</returns>
        private bool ExecuteQueue()
        {
            _m_arrAEQueue = AEQueueController.Instance.GetQueueByTargetId(this.szIdentifyName);

            if (_m_arrAEQueue == null)
                return false;

            bool bResult_ = false;

            foreach (AEQueue mAEQueue_ in _m_arrAEQueue)
            {
                if (mAEQueue_.ElementCount == 0)
                    continue;

                // The id of the calling thread is handed to the queue together with the request.
                _mAEElement = mAEQueue_.Dequeue(Thread.CurrentThread.ManagedThreadId);

                _mAEElement.Execute(ref _objParams);

                Complete(_objParams);

                bResult_ = true;
            }

            return bResult_;
        }


        /// <summary>
        /// It will be called once every time a AEElement has finished executing.
        /// </summary>
        /// <param name="Params">The parameter buffer that was passed by reference to the element's Execute call.</param>
        public virtual void Complete(object[] Params) { }


        /// <summary>
        /// Start this thread.
        /// </summary>
        /// <returns>true if the thread was started; false if the state was not STANDBY.</returns>
        public bool Start()
        {
            lock (_objSignalLock)
            {
                if (_CurrentState != eAEThreadState.STANDBY)
                    return false;

                _threadCurr.Start();
                _CurrentState = eAEThreadState.RUNNING;
                return true;
            }
        }


        /// <summary>
        /// Pause this thread. The worker parks itself before its next iteration;
        /// an iteration that is already in progress is finished first.
        /// </summary>
        /// <returns>true if the thread is paused after the call; false if it is not running.</returns>
        public bool Pause()
        {
            lock (_objSignalLock)
            {
                if (_CurrentState == eAEThreadState.PAUSE)
                    return true;

                if (_CurrentState != eAEThreadState.RUNNING)
                    return false;

                _bPauseRequested = true;
                _CurrentState = eAEThreadState.PAUSE;
                return true;
            }
        }

        /// <summary>
        /// Resume this thread,
        /// if it's pausing.
        /// </summary>
        /// <returns>true if the thread is running after the call; false if it was neither running nor paused.</returns>
        public bool Resume()
        {
            lock (_objSignalLock)
            {
                if (_CurrentState == eAEThreadState.RUNNING)
                    return true;

                if (_CurrentState != eAEThreadState.PAUSE)
                    return false;

                _bPauseRequested = false;
                _CurrentState = eAEThreadState.RUNNING;

                // Wake the worker if it is parked.
                Monitor.PulseAll(_objSignalLock);
                return true;
            }
        }


        /// <summary>
        /// You can put some routine works here.
        /// </summary>
        /// <returns>
        /// return false that will be into SleepCycle (unless a queue element was executed)
        /// return true still use WorkCycle
        /// </returns>
        protected virtual bool Run()
        {
            return false;
        }



        /// <summary>
        /// It will stop all AEThread: every registered worker is asked to leave its loop,
        /// then the call waits for each started worker to finish its current iteration.
        /// </summary>
        public static void StopAll()
        {
            List<AEThread> lstThread_;

            // Work on a snapshot so the registry lock is not held while waiting.
            lock (_objRegistryLock)
            {
                lstThread_ = new List<AEThread>(_hlThreadObj.Values);
            }

            // all stop
            foreach (AEThread mAEThread_ in lstThread_)
            {
                lock (mAEThread_._objSignalLock)
                {
                    mAEThread_._CurrentState = eAEThreadState.CLOSING;
                    mAEThread_._bStopRequested = true;

                    // Wakes a worker that is paused or waiting out its cycle.
                    Monitor.PulseAll(mAEThread_._objSignalLock);
                }
            }

            // wait to all stop
            foreach (AEThread mAEThread_ in lstThread_)
            {
                Thread thread_ = mAEThread_._threadCurr;

                // A worker that calls StopAll() cannot join itself; its own loop marks
                // it CLOSED when it exits.
                if (thread_ == Thread.CurrentThread)
                    continue;

                // Join throws ThreadStateException on a thread that was never started.
                bool bStarted_ = (thread_.ThreadState & System.Threading.ThreadState.Unstarted) == 0;

                if (bStarted_)
                    thread_.Join();

                mAEThread_._CurrentState = eAEThreadState.CLOSED;
            }
        }


        /// <summary>
        /// Find a thread by thread ID.
        /// </summary>
        /// <param name="iID">Managed thread id of the worker thread.</param>
        /// <returns>The registered AEThread, or null if the id is unknown.</returns>
        public static AEThread GetThreadByID(int iID)
        {
            AEThread mAEThread_;

            lock (_objRegistryLock)
            {
                return _hlThreadID.TryGetValue(iID, out mAEThread_) ? mAEThread_ : null;
            }
        }


        /// <summary>
        /// Find a thread by identify name.
        /// </summary>
        /// <param name="szIdentifyName">Identify name given to the constructor.</param>
        /// <returns>The registered AEThread, or null if the name is unknown.</returns>
        public static AEThread GetThreadByIdentifyName(string szIdentifyName)
        {
            AEThread mAEThread_;

            lock (_objRegistryLock)
            {
                return _hlThreadObj.TryGetValue(szIdentifyName, out mAEThread_) ? mAEThread_ : null;
            }
        }


        /// <summary>
        /// The total number of uncomplete elements: the sum of ElementCount over every
        /// queue returned for this thread's identify name (0 when there is none).
        /// </summary>
        public int TotalUncompleteElement
        {
            get
            {
                AEQueue[] m_arrAEQueue_ = AEQueueController.Instance.GetQueueByTargetId(this._szIdentifyName);

                if (m_arrAEQueue_ == null)
                    return 0;

                int iTotal = 0;
                foreach (AEQueue mAEQueue_ in m_arrAEQueue_)
                    iTotal += mAEQueue_.ElementCount;

                return iTotal;
            }
        }// method
    }// class

    
    /// <summary>
    /// Life-cycle states reported by AEThread.CurrentState.
    /// </summary>
    public enum eAEThreadState
    {
        /// <summary>Initial value while the constructor is still running.</summary>
        NONE = 0,

        /// <summary>Construction has finished; the thread has not been started.</summary>
        STANDBY = 1,

        /// <summary>Set by Start() and Resume().</summary>
        RUNNING = 2,

        /// <summary>Set by Pause().</summary>
        PAUSE = 3,

        /// <summary>Set by StopAll() before it waits for the thread.</summary>
        CLOSING = 4,

        /// <summary>Set by StopAll() after the thread has been waited for.</summary>
        CLOSED = 5
    }// enum

}// namespace

 

To be continued…