The Framework Designing (5) – Flow Engine Part 1

Flow Engine,也就是我們常聽到的【流程引擎】,主要用途用於以特定順序執行一系列的工作,就概念上而言,Flow Engine非常簡單,只是依據特定順序來執行一系列工作而已

The Framework Designing (5) – Flow Engine Part 1

 

/黃忠成

 

 

緣起

 

  算算自小公主出生以來,我已有一段蠻長時間未寫任何文章了,多數時間都耗在哄小公主睡覺、上課、顧問工作上,實在也沒有太多時間可以寫文章。隨著2011年的結束,一整年度的工作也告一段落,

小公主隨著年紀增長,也比較好帶了些,我終於有點自己的時間來將過去一年所研究的技術與實作品與各位讀者分享。

 

 

Flow Engine

 

  Flow Engine,也就是我們常聽到的【流程引擎】,主要用途用於以特定順序執行一系列的工作,就概念上而言,Flow Engine非常簡單,只是依據特定順序來執行一系列工作而已,如圖1:

 

圖1

說實話,這跟一般迴圈所做的事差不多,都是依據特定順序來執行一系列工作,我們可以用C#輕易地來實現這個概念。

 

程式1

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;


namespace ConsoleApplication11

{

    class Program

    {

        static void ExecuteFlow(paramsAction[] steps)

        {

            foreach(varstep insteps)

                step();

        }


        static void Main(string[] args)

        {

            ExecuteFlow(

                () =>

                {

                    Console.WriteLine("1");

                },

                () =>

                {

                    Console.WriteLine("2");

                },

                () =>

                {

                    Console.WriteLine("3");

                });

            Console.ReadLine();

        }

    }

}

那如果這麼簡單,為何還要特地寫文章來介紹呢?是的,Flow Engine的概念簡單的很,但要完善的實現這個概念卻不容易,因為在整個Flow的執行過程中可能會發生以下幾種情況。

 

1、Step執行產生例外

2、Step與Step間需交換參數

3、Flow必須回傳結果值

4、Step可能會有跳過下一個Step的需求

5、Step本身可能需要執行多個Sub Steps

6、Steps的執行可能是非同步的

7、Step可能需要跳到特定的Step執行,期間會跳躍過多個Steps

8、Steps可能需要動態組成

9、Steps必須可重用

將這些可能發生的情況包含進來的話,程式1絕不會是現在這麼簡潔,Flow Engine的概念很直覺,但細節卻很繁複。

 

開始– Executing State

 

  就Flow Engine的概念中,至少擁有兩個角色,一是Flow,二是Flow Step,每個Flow代表一個流程,而流程中的每個步驟則以Step方式呈現,根據上節列出的可能發生情況中的

第1、4、6點可以估算出Flow中的每個Step必須擁有以下狀態,分別代表執行成功、執行中、失敗等狀態資訊。

//Flow,Flow Step執行狀態

public enum ExecuteState

{

        Idle,  //停滯

        Running, //執行中

        Complete, //執行完成

        Fail, //錯誤

        Cancel, //取消

        Jump //跳過此Step

}

整個架構圖將會變成如圖2

圖2

每個Step都擁有一個執行狀態,Flow本身也擁有一個計算型的執行狀態,其值是統計所有Step執行狀態而來。

依據第2條可能發生的情況,Flow Engine需要有一個通道,讓各個Step間交換參數用,這設計起來很簡單,就是一個Context概念,當Flow執行起來時,會進入一個Context,

而Context負責提供一個容器供Flow及各個Step間交換資料。

public interface IFlowContext

{

        Dictionary Parameters { get; }

        T GetParameter(stringkey);

        voidSetParameter(stringkey, T value);

}

整個架構會變成圖3

圖3

有了大概的樣子後,我們可以開始定義Flow Engine中的Step Interface。

 

//Flow Step基本介面
    public interface IFlowStep
    {
        //當Step執行完畢後觸發
        event EventHandler ExecuteComplete;
        //當Step執行期間產生未補捉例外時,放置於此處
        Exception FailException { get; }
        //所有Flow Step共用的Context
        IFlowContext Context { get; set; }
        //Step Name, 必要欄位
        string StepName { get; }
        //Step執行狀態
        ExecuteState State { get; }
        //下一個要執行的Step Name,如未指定則依序執行
        string NextFlow { get; set; }
        //執行
        void Execute();
        //確認完成
        void Complete();
        //要求取消
        void Cancel();
        //跳過此Step
        void Jump();
        //Step執行失敗
        void Fail(Exception ex);
    }

  每個Step都擁有一個Context屬性,其值是由Flow所指定,當Flow開始執行時,會準備一個Context物件,並於執行Step前指定至Step的Context屬性,這樣Step於執行期間就有一個容器

可互相交換資料。

依據第6條,Step的執行可能是非同步的,所以每個Flow Step除了擁有執行Step用的Execute函式外(Flow會依序呼叫Step的Execute來執行Step),還必須擁有Complete、Cancel、Fail等函式,

當Step執行成功時,必須負責呼叫Complete函式,此時該Step狀態會變成Complete,Flow以此來判斷某個Step是否正確執行成功,再決定是否執行下一個Step。

依據第7條,Step必須擁有跳到特定Step的能力,也就是說當某種條件成立時,要求Flow直接跳到特定的Step執行,此功能透過NextFlow完成,當Step需要跳到某個特定Step時,

需指定NextFlow屬性,該值為指定跳躍的StepName,然後呼叫Jump,此時執行狀態會是Jump,當Flow看到此狀態時,會依據NextFlow值來跳躍至特定的Step執行。

  當Step執行失敗時,必須負責呼叫Fail,此函式應該把執行狀態改為Fail,告知Flow此Step執行失敗,由Flow決定是要終止所有Flow或是忽略。

  當Step執行成功或是失敗時,都會觸發ExecuteComplete事件,此事件可供使用者或是Flow物件掛載,當執行期間因產生Exception而導致失敗時,Exception都會放置於FailException內,

由Flow統一回報。

下列是FlowContext的實作,並沒有很複雜。

public class FlowContext : IFlowContext
    {
        private object _lock = new object();
        private Dictionary _parameters = new Dictionary();

        public Dictionary Parameters
        {
            get
            {
                return _parameters;
            }
        }

        public T GetParameter(string key)
        {
            lock (_lock)
            {
                if (!Parameters.ContainsKey(key))
                    return default(T);
                return (T)Parameters[key];
            }
        }

        public void SetParameter(string key, T value)
        {
            lock (_lock)
            {
                if (!Parameters.ContainsKey(key))
                    Parameters.Add(key, value);
                else
                    Parameters[key] = value;
            }
        }
    }

接著定義Flow的介面。

//Flow 基礎介面
    public interface IFlowService
    {
        //when step is complete or cancel, the event will be fire.
        event EventHandler FlowExecuteComplete;

        //所有Flow Step共用的Context
        IFlowContext Context { get; }
        //要執行的Steps
        IFlowStep[] Steps { get; }
        //執行狀態
        ExecuteState State { get; }
        //執行所有Steps
        void Execute();
        //執行所有Steps - 非同步
        void ExecuteAsync();
        //等待完成
        void WaitingForComplete();
        //取消執行
        void Cancel();
    }

當所有Step執行或因失敗而中止,FlowExecuteComplete都會被呼叫,此時可透過其參數來決定Flow是否正常執行還是異常中止。

 public class FlowExecuteCompleteArgs : EventArgs
    {
        private ExecuteState _state = ExecuteState.Idle;
        private Exception[] _failExceptions = null;

        public ExecuteState State
        {
            get
            {
                return _state;
            }
        }

        public Exception[] Exceptions
        {
            get
            {
                return _failExceptions;
            }
        }

        public FlowExecuteCompleteArgs(ExecuteState state)
        {
            _state = state;
        }

        public FlowExecuteCompleteArgs(ExecuteState state,Exception[] exceptions)
        {
            _state = state;
            _failExceptions = exceptions;
        }
    }

當Flow因異常而中止時,此參數的Exceptions將包含所有產生的Exception,那他為何是個陣列呢?這是因為第5條需求,Step本身可能包含多個Sub Step,且其可能是以非同步執行,

同時執行多個Sub Step,也就是說這種Step執行期間,可能會產生出多個Exception。

 

Implement FlowStepBase

 

  定義完介面後便可開始實作,介面所顯露的只是一個架構的規格,我們雖然可以從介面本身窺探出架構,但通常只是一個概觀,所以多數的Framework除提供介面之外,

還會實作一個參考的實作體,這個實作體通常以抽象類別方式存在,也就是說Framework以抽象類別告知開發者,這些介面黏合的細節,也以這些抽象類別簡化開發者延伸的難度。

public abstract class FlowStepBase : IFlowStep
    {
        private IFlowContext _context = null;
        private ExecuteState _state = ExecuteState.Idle;
        private string _nextFlow = null;
        private EventHandler _executeCompleteHandler;
        private Exception _failException = null;

        public event EventHandler ExecuteComplete
        {
            add
            {
                _executeCompleteHandler = (EventHandler)Delegate.Combine(_executeCompleteHandler, value);
            }
            remove
            {
                _executeCompleteHandler = (EventHandler)Delegate.Remove(_executeCompleteHandler, value);
            }
        }

        public Exception FailException
        {
            get
            {
                return _failException;
            }
        }

        public IFlowContext Context
        {
            get
            {
                return _context;
            }
            set
            {
                _context = value;
            }
        }

        public string NextFlow
        {
            get
            {
                return _nextFlow;
            }
            set
            {
                _nextFlow = value;
            }
        }

        public abstract string StepName { get; }


        public ExecuteState State
        {
            get
            {
                return _state;
            }
            set
            {
                _state = value;
            }
        }

        protected abstract void DoExecute();

        public void Execute()
        {
            _state = ExecuteState.Running;
            try
            {
                DoExecute();
            }
            catch (Exception ex)
            {
                Fail(ex);
            }
        }

        protected virtual void DoComplete()
        {
            if (_executeCompleteHandler != null)
                _executeCompleteHandler(this, EventArgs.Empty);
        }

        public void Cancel()
        {
            _state = ExecuteState.Cancel;
            DoComplete();
        }

        public void Complete()
        {
            if(_state == ExecuteState.Running)
                _state = ExecuteState.Complete;
            DoComplete();
        }

        public void Jump()
        {
            _state = ExecuteState.Jump;
            DoComplete();
        }

        public void Fail(Exception ex)
        {
            _failException = ex;
            _state = ExecuteState.Fail;
            DoComplete();
        }
    }

實作本身並沒有太複雜。

 

Implement Flow Service

 

  接著實作Flow本身,也就是IFlowService。

public abstract class FlowService : IFlowService
    {
        private FlowContext _context = new FlowContext();
        private IFlowStep[] _steps;
        private EventHandler _flowExecuteCompleteHandler;

        public event EventHandler FlowExecuteComplete
        {
            add
            {
                _flowExecuteCompleteHandler = (EventHandler)Delegate.Combine(_flowExecuteCompleteHandler, value);
            }
            remove
            {
                _flowExecuteCompleteHandler = (EventHandler)Delegate.Remove(_flowExecuteCompleteHandler, value);
            }
        }

        public IFlowStep[] Steps
        {
            get
            {
                if (_steps == null)
                    _steps = CreateSteps();
                return _steps;
            }
        }

        public IFlowContext Context
        {
            get
            {
                return _context;
            }
        }

        public ExecuteState State
        {
            get
            {
                foreach (var step in Steps)
                {
                    if (step.State == ExecuteState.Cancel)
                        return ExecuteState.Cancel;
                    else if (step.State == ExecuteState.Fail)
                        return ExecuteState.Fail;
                    else if (step.State == ExecuteState.Running)
                        return ExecuteState.Running;
                    else if (step.State == ExecuteState.Idle)
                        return ExecuteState.Idle;
                }
                return ExecuteState.Complete;
            }
        }

        protected abstract IFlowStep[] CreateSteps();

        protected virtual void ExecuteComplate()
        {
        }

        public void ExecuteAsync()
        {
            Thread th = new Thread((state) =>
            {
                AutoResetEvent resetEvent = new AutoResetEvent(false);
                try
                {
#if SILVERLIGHT
                System.Windows.Threading.DispatcherSynchronizationContext sc =
                        (System.Windows.Threading.DispatcherSynchronizationContext)state;
#endif
                    string _jumpStep = null;
                    for (int i = 0; i < Steps.Length; i++)
                    {
                        Steps[i].Context = _context;
                        Steps[i].ExecuteComplete += (s1, arg1) =>
                            {
                                resetEvent.Set();
                            };
                        if (_jumpStep != null && _jumpStep != Steps[i].StepName)
                        {
                            Steps[i].Jump();
                            continue;
                        }
#if SILVERLIGHT
                    sc.Post((arg1) => {
                              Steps[i].Execute();
                           },null);
#else
                        Steps[i].Execute();
#endif
                        resetEvent.WaitOne();
                        if (Steps[i].State == ExecuteState.Fail || Steps[i].State == ExecuteState.Cancel)
                            break;
                        _jumpStep = Steps[i].NextFlow;
                    }
                    if (_flowExecuteCompleteHandler != null)
                    {
                        if (State == ExecuteState.Fail)
                            _flowExecuteCompleteHandler(this, new FlowExecuteCompleteArgs(State, _steps.Where(a => a.FailException != null).Select(a => a.FailException).ToArray()));
                        else
                            _flowExecuteCompleteHandler(this, new FlowExecuteCompleteArgs(State));
                    }
                    ExecuteComplate();
                }
                finally
                {
                    resetEvent.Dispose();                    
                }
            });
            th.IsBackground = true;
#if SILVERLIGHT
            th.Start(System.Threading.SynchronizationContext.Current);
#else
            th.Start(null);
#endif
        }

        public void Execute()
        {
            string _jumpStep = null;
            for (int i = 0; i < Steps.Length; i++)
            {
                Steps[i].Context = _context;
                if (_jumpStep != null && _jumpStep != Steps[i].StepName)
                {
                    Steps[i].Jump();
                    continue;
                }
                Steps[i].Execute();
                if (Steps[i].State == ExecuteState.Fail || Steps[i].State == ExecuteState.Cancel)
                    break;
                _jumpStep = Steps[i].NextFlow;
            }
            if (_flowExecuteCompleteHandler != null)
            {
                if (State == ExecuteState.Fail)
                    _flowExecuteCompleteHandler(this, new FlowExecuteCompleteArgs(State, _steps.Where(a => a.FailException != null).Select(a => a.FailException).ToArray()));
                else
                    _flowExecuteCompleteHandler(this, new FlowExecuteCompleteArgs(State));
            }
            ExecuteComplate();
        }

        public void WaitingForComplete()
        {
            while (true)
            {
                if (State == ExecuteState.Complete ||
                   State == ExecuteState.Fail ||
                   State == ExecuteState.Cancel)
                    break;
                System.Threading.Thread.Sleep(10);
            }
        }

        public void Cancel()
        {
            foreach (var step in Steps)
            {
                try
                {
                    step.Cancel();
                }
                catch (Exception)
                {
                }
            }
        }
    }

這裡就複雜多了,先看ExecuteState部分,其值是統計所有Step執行狀態而來。

public ExecuteState State

        {

            get

            {

                foreach(varstep inSteps)

                {

                    if(step.State == ExecuteState.Cancel)

                        returnExecuteState.Cancel;

                    elseif(step.State == ExecuteState.Fail)

                        returnExecuteState.Fail;

                    elseif(step.State == ExecuteState.Running)

                        returnExecuteState.Running;

                    elseif(step.State == ExecuteState.Idle)

                        returnExecuteState.Idle;

                }

                returnExecuteState.Complete;

            }

        }

這裡應該不難懂。

FlowService較難的部分在於ExecuteAsync函式,其將以非同步方式執行所有Steps,這裡使用了AutoResetEvent來維持Step的順序性。

另外,FlowService設計時也考慮到了Silverlight的情況,Flow Engine在Silverlight/Windows Phone 7下有一個特別的應用,後面我們再討論這塊

 

Implement Static Flow

 

  FlowService與FlowStepBase都只是抽象類別,離真正可用的類別還有一小段路,設計Framework時,應該提供一個簡易的實作給開發者,除了讓開發者馬上可用外,還可協助開發者更了解Framework本體。

using System;
using System.Net;
using System.Collections.Generic;
using System.Linq;

namespace Artemis.FlowEngine
{
    /// 
    /// dynamic flow step is a easy-use flow engine facade,
    /// allow developer use actions to execute flow.
    /// 
    public class StaticFlowStep : FlowStepBase
    {
        private string _stepName;
        private Action _executeAction;

        public override string StepName
        {
            get
            {
                return _stepName;
            }
        }

        protected override void DoExecute()
        {
            _executeAction(this);
        }

        public StaticFlowStep(string stepName, Action executeAction)
        {
            _stepName = stepName;
            _executeAction = executeAction;
        }
    }

    public class StaticFlowService : FlowService
    {
        private List _steps = new List();

        protected override IFlowStep[] CreateSteps()
        {
            return _steps.ToArray();
        }

        public void AddStep(IFlowStep step)
        {
            _steps.Add(step);
        }

        public StaticFlowService(params IFlowStep[] steps)
        {
            _steps = steps.ToList();
        }
    }

    
}

Static Flow是一組簡單的Flow Engine實作,其用法如下所示。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Artemis.FlowEngine;

namespace TestApp
{
    class Program
    {
        static void TestStaticFlow()
        {
            StaticFlowService service = new StaticFlowService(
                new StaticFlowStep("1",(sender)=>
                    {
                        int baseValue = sender.Context.GetParameter("baseValue");
                        sender.Context.SetParameter("baseValue",baseValue + 10);
                        sender.Complete();
                    }),
                new StaticFlowStep("2", (sender) =>
                {
                    int baseValue = sender.Context.GetParameter("baseValue");
                    sender.Context.SetParameter("baseValue", baseValue + 10);
                    sender.Complete();
                }));
            service.Context.SetParameter("baseValue", 10);
            service.Execute();
            Console.WriteLine(service.Context.GetParameter("baseValue"));
        }

        static void TestStaticFlowAsAsync()
        {
            StaticFlowService service = new StaticFlowService(
                new StaticFlowStep("1", (sender) =>
                {
                    int baseValue = sender.Context.GetParameter("baseValue");
                    sender.Context.SetParameter("baseValue", baseValue + 10);
                    sender.Complete();
                }),
                new StaticFlowStep("2", (sender) =>
                {
                    int baseValue = sender.Context.GetParameter("baseValue");
                    sender.Context.SetParameter("baseValue", baseValue + 10);
                    sender.Complete();
                }));
            service.Context.SetParameter("baseValue", 10);
            service.ExecuteAsync();
            service.WaitingForComplete();
            Console.WriteLine(service.Context.GetParameter("baseValue"));
        }

        static void Main(string[] args)
        {
            TestStaticFlow();
            TestStaticFlowAsAsync();
            Console.ReadLine();
        }
    }
}

 

 

Silverlight and Windows Phone 7

 

Flow Engine在Silverlight與Windows Phone 7這種以非同步處理的架構中有特別的用法,預設情況下,當使用WebClient或是呼叫WCF Service時,設計者通常因為非同步的關係,

必須以事件掛載的方式來處理呼叫後的結果,這在單一呼叫時不會造成困擾,但如果是循序型呼叫時,就必須面對事件串鏈後的複雜性,詳情可參考我的另一篇關於Async CTP的文章

http://www.dotblogs.com.tw/code6421/archive/2010/10/31/18696.aspx

 

但在Flow Engine的協助下,我們可以在沒有Async CTP的情況下簡化這種循序型呼叫。

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Animation;
using System.Windows.Shapes;
using System.Windows.Media.Imaging;
using System.IO;
using Artemis.FlowEngine;

namespace SilverlightApplication1
{
    public partial class MainPage : UserControl
    {
        public MainPage()
        {
            InitializeComponent();
        }

        private void LoadPic(IFlowStep step)
        {
            WebClient client2 = new WebClient();
            client2.OpenReadCompleted += (s1, arg1) =>
            {
                if (arg1.Error != null)
                {
                    step.Fail(arg1.Error);
                    return;
                }
                Dispatcher.BeginInvoke(new Action((sender) =>
                {
                    BitmapImage bmp = new BitmapImage();
                    bmp.SetSource(arg1.Result);
                    Image img = new Image();
                    img.Source = bmp;
                    stackPanel1.Children.Add(img);
                    ((IFlowStep)sender).Complete();
                }), step);
            };
            client2.OpenReadAsync(new Uri("/" + step.StepName, UriKind.Relative));
        }

        private void GetWithFlowEngine()
        {
            WebClient client = new WebClient();
            client.DownloadStringCompleted += (s, arg) =>
                {
                    if (arg.Error != null)
                    {
                        ((IFlowStep)s).Fail(arg.Error);
                        return;
                    }
                    List imageList = new List();
                    using (StringReader sr = new StringReader(arg.Result))
                    {
                        while(sr.Peek() != -1)
                            imageList.Add(sr.ReadLine());
                    }
                    StaticFlowService service = new StaticFlowService();
                    foreach (var item in imageList)
                        service.AddStep(new StaticFlowStep(item,new Action(LoadPic)));
                    service.ExecuteAsync();
                };
            client.DownloadStringAsync(new Uri("/TextFile1.txt", UriKind.Relative));
        }

        private void button1_Click(object sender, RoutedEventArgs e)
        {
            GetWithFlowEngine();
        }

       
    }
}

 

截至目前為止,我們達到的需求如下:

 

1、Step執行產生例外  -- OK

2、Step與Step間需交換參數—OK

3、Flow必須回傳結果值 – OK

4、Step可能會有跳過下一個Step的需求 – OK

5、Step本身可能需要執行多個Sub Steps

6、Steps的執行可能是非同步的—OK

7、Step可能需要跳到特定的Step執行,期間會跳躍過多個Steps – OK

8、Steps可能需要動態組成

9、Steps必須可重用

之後的文章會一一實現這張表中的需求,而且,是在不改變所有介面定義的情況下延展,下次再見了。

 

範例下載

http://www.code6421.com/BlogPics/Artemis_FlowEngine.zip