ManualResetEvent_多MRE+ ThreadPool重複呼叫API

利用ManualResetEvent來控制子執行緒進行同步作業(以利用MultiThread重複打API為範例)

using FETC.eTag.BMS.BMSWeb.utility;
using FETC.eTag.BMS.DAL;
using log4net;
using log4net.Config;
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using RestSharp;
using System.Net;
using Newtonsoft.Json;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;


namespace APIHelper
{
    class Program
    {
        /*
         * 作業名稱:{APIHelper}
         */
        #region 用於設定只能開啟一個執行程式,不可多開
        [DllImport("User32.dll")]
        private static extern bool ShowWindowAsync(IntPtr hWnd, int cmdShow);
        [DllImport("User32.dll")]
        private static extern bool SetForegroundWindow(IntPtr hWnd);
        #endregion

        #region 全域變數
        private static readonly ILog KLog = LogManager.GetLogger(typeof(Program));//使用log4net的Log
        //static KLog log = null;//使用BMSWeb的Log
        static UserInfo info = null;
        private static List<ManualResetEvent> _waitHandles = null;
        private static Dictionary<string, string> dicResult = null;
        #endregion

        static void Main(string[] args)
        {
            bool isSuccess = false;
            try
            {
                #region Log初始化
                //使用log4net的Log
                string log4net = System.AppDomain.CurrentDomain.BaseDirectory;
                XmlConfigurator.Configure(new FileInfo(log4net + "log4netconfig.xml"));

                ////使用BMSWeb的Log
                //if (log == null)
                //{
                //    log = new KLog();
                //}
                #endregion

                Process instance = RunningInstance();

                if (instance == null)
                {

                    KLog.Info("開始排程");
                    ////使用sqlConnection可以透過此下段程式取得ConnectionString
                    //String ConnString = ConfigurationManager.ConnectionStrings["BMSContext"].ConnectionString;

                    info = new UserInfo("admin");
                    //To do...
                    isSuccess = String.IsNullOrEmpty(MainProcess()) ? true : false;
                }
                else
                {
                    // Make old one foreground
                    ShowWindowAsync(instance.MainWindowHandle, 1);
                    SetForegroundWindow(instance.MainWindowHandle);
                    KLog.Info("Process Running!");
                    Environment.Exit(0);
                }
            }
            catch (Exception ex)
            {
                KLog.Error("系統錯誤", ex);
            }
            finally
            {
                KLog.Info("結束排程");

                //Dispose BMSDBEntities
                if (info != null && info.db != null)
                {
                    info.db.Dispose();
                }

                Environment.Exit(isSuccess ? 0 : 1);

                ////使用BMSWeb的Log
                //if (log != null)
                //{
                //    log.tracer.Dispose();
                //}
            }
        }

        /// <summary> RunningInstance,用於判斷是否已有Process執行中
        /// </summary>
        /// <returns>Process</returns>
        private static Process RunningInstance()
        {
            Process current = Process.GetCurrentProcess();
            Process[] processes = Process.GetProcessesByName(current.ProcessName);
            foreach (Process process in processes)
            {
                if (process.Id != current.Id)
                {
                    if (Assembly.GetExecutingAssembly().Location.Replace("/", "\\") == current.MainModule.FileName)
                    {
                        return process;
                    }
                }
            }
            return null;
        }

        private static string MainProcess()
        {
            bool isSuccess = false;            
            var info = new UserInfo("admin");

            try
            {
                #region 主程式商業邏輯撰寫
                var data = info.db.Database.SqlQuery<string>(@"
                    SELECT DISTINCT top 10 ID_NO FROM BMS_PRO_CONSUMER C WITH(NOLOCK) 
                    INNER JOIN BMS_ACC_ACCOUNT ACC WITH(NOLOCK) ON C.CONSUMER_ID = ACC.ACC_OWNER_ID
                    WHERE LEN(C.ID_NO) = 8 AND ACC.ACC_STATUS_ID = 'R'
                ").ToList();

                if (data != null)
                {
                    KLog.Info("Data Count =>" + data.Count);
                    int defaultMaxworkerThreads = 0;
                    int defaultmaxIOThreads = 0;
                    int waitHandleIndex = 0;
                    int maxThreadsAtOnce = Convert.ToInt32(System.Configuration.ConfigurationManager.AppSettings["MaxThread"]);;
                    _waitHandles = new List<ManualResetEvent>();
                    ThreadPool.SetMaxThreads(maxThreadsAtOnce, maxThreadsAtOnce);
                    ThreadPool.GetMaxThreads(out defaultMaxworkerThreads, out defaultmaxIOThreads);
                    dicResult = new Dictionary<string, string>();

                    KLog.Info("Start Time =>" + DateTime.Now.ToString());
                    foreach (var itemIdNo in data)
                    {
                        /*ManualResetEvent(MRE,一個MRE可控制多個子執行緒繼續或中止流程)
                         * https://www.twblogs.net/a/5da6ad84bd9eee310d9fbf88
                         * https://www.itread01.com/content/1511591884.html
                         * ManualResetEvent初始參數:true=>mre預設狀態為Set(),遇到WaitOne不會中止執行緒, false=>mre預設狀態為ReSet(),遇到WaitOne會中止執行緒
                         * 1.Set()=>關閉WaitOne訊號,(ThreadPool撈出的)執行緒可以繼續執行.WaitOne()之後的程式
                         * 2.Reset()=>開啟WaitOne訊號,(ThreadPool撈出的)執行緒遇到WaitOne()會被中止
                         * 3.WaitOne()=>mre設定執行緒中止點,中斷執行緒到重新呼叫Set()。執行緒才會繼續往下執行WaitOne()後面的程式                         
                         */
                        var resetEvent = new ManualResetEvent(false);//管理執行緒的狀態 true:預設訊號綠燈, false:預設訊號紅燈。                        
                        var controller = new MyJob(waitHandleIndex, itemIdNo);

                        ThreadPool.QueueUserWorkItem(new WaitCallback(OnThreadedDataRequest), controller);//每呼叫一次QueueUserWorkItem,就會從執行緒池喚醒一個正在休眠的執行緒並取出,ThreadPool預設上限為250個執行緒
                        waitHandleIndex++;
                        _waitHandles.Add(resetEvent);

                        if (_waitHandles.Count >= maxThreadsAtOnce)
                        {
                            /*宣告10個MRE後,要等所有執行緒都跑完OnThreadedDataRequest流程。之後清空MRE List,重新宣告10個MRE供執行緒綁定*/
                            System.Threading.WaitHandle.WaitAll(_waitHandles.ToArray<ManualResetEvent>());//等待所有執行緒都完成才往下一行,如果有被Reset()或WaitOne()就會一直卡在這行
                            _waitHandles = new List<ManualResetEvent>();
                            waitHandleIndex = 0;
                        }
                    }
                    KLog.Info("End Time =>" + DateTime.Now.ToString());
                }


                isSuccess = true; // 成功才設為true
                #endregion
            }
            catch (Exception e)
            {
                KLog.Error(e);
                return "發生異常錯誤:" + e.Message;
            }

            return "";
        }

        public static void OnThreadedDataRequest(object sender)
        {
            var controller = sender as MyJob;
            if (controller == null) return;
            controller.Execute();
            _waitHandles[controller.WaitHandleIndex].Set();

            /*waitHandles[controller.WaitHandleIndex].Set()。
             *1.從_waitHandles[controller.WaitHandleIndex]取得ManualResetEvent,並替執行緒綁定ManualResetEvent。
             *2.設定_waitHandles[controller.WaitHandleIndex]狀態為Set(),讓執行緒繼續執行.Set()後面的程式
             */
        }

        public class MyJob
        {

            public int WaitHandleIndex = 0;
            public string idNo;

            public MyJob(int waitHandleIndex, string paramIdNo)
            {
                WaitHandleIndex = waitHandleIndex;
                // Add input parameters to the constructor to store input variables in class level variables
                // for use by the Execute method below.
                idNo = paramIdNo;
            }

            public void Execute()
            {
                try
                {
                    DoWorker(idNo);
                }
                catch (Exception ex)
                {
                    KLog.Error("Execute()_" + ex.ToString());
                    throw ex;
                }
            }
        }

        private static void DoWorker(object idNo)
        {
            var info = new UserInfo("admin");
            string apiRes = "";
            string strIddNo = (string)idNo;
            CallWebApi(strIddNo, out apiRes);
            KLog.Info(apiRes);
            if (!string.IsNullOrEmpty(apiRes))
            {
                dicResult.Add(strIddNo, apiRes);
            }
            else
            {
                dicResult.Add(strIddNo, "NoReturnData");
            }
        }

        private static bool CallWebApi(string idNo, out string apiReturn)
        {
            bool res = false;
            string url = string.Format(@"https://data.gcis.nat.gov.tw/od/data/api/5F64D864-61CB-4D0D-8AD9-492047CC1EA6?$format=json&$filter=Business_Accounting_NO%20eq%20{0}&$skip=0&$top=50", idNo);

            apiReturn = string.Empty;
            try
            {
                var client = new RestClient(url);
                var request = new RestRequest(Method.GET);

                //request.AddHeader("accept", "application/json");
                //request.AddHeader("content-type", "application/json");
                //request.AddHeader("x-ibm-client-id", xIbmClientId);
                //if (!string.IsNullOrEmpty(token))
                //{
                //    request.AddHeader("token", token);
                //}
                //request.AddParameter("application/json", json, ParameterType.RequestBody);
                ServicePointManager.ServerCertificateValidationCallback = new RemoteCertificateValidationCallback(CertificateCheck);
                ServicePointManager.SecurityProtocol =
                                            SecurityProtocolType.Ssl3 | SecurityProtocolType.Tls |
                                            SecurityProtocolType.Tls11 | SecurityProtocolType.Tls12;

                IRestResponse response = client.Execute(request);
                if (response.StatusCode != HttpStatusCode.OK)
                {
                    return res;
                }

                apiReturn = response.Content;
            }
            catch (Exception e)
            {
                throw e;
            }
            return res;
        }

        private static bool CertificateCheck(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            if (sslPolicyErrors == SslPolicyErrors.None)
            {
                return true;
            }
            var request = sender as HttpWebRequest;
            if (request != null)
            {
                var result = request.RequestUri.Host.Contains("fetc");

                return result;
            }
            return false;
        }
    }
}

 

多ManualResetEvent物件範例:https://www.itread01.com/content/1511591884.html

迴圈會內會由ThreadPool撈出一個新的Thread(t),並new一個ManualResetEvent(MRE)。MRE[i]會被指派給t控制執行緒,

並由MRE的Set(). ReSet(). WaitOne()來控制執行緒執行或暫停