[Robotics Studio] CCR Overview -- Day 17

[Robotics Studio] CCR Overview -- Day 17

又到了講解概念時間 ...

這次為大家說明的是 CCR (Concurrency and Coordination Runtime).

CCR 主要架構在 .NET CLR 上面, 而開發出 CCR 的目的, 就是為了解決當程式被切割成一個一個的元件時 (就像 DSS) , 元件之間的訊息傳遞, 同步性, 資料一致性, 以及協調等等問題.

先說明, 下面提到的範例程式都可以執行, 但是不夠嚴謹(沒做變數檢查, 沒有 Dispose), 因為是教學 (MSDN 當中很多範例也是這樣).
等到你了解系統以後, 實際寫 code 的時候還是多多注意小細節 (不過我自己也常常很粗心...).

而 CCR 主要分為三大部分:

1. Port, PortSet

CCR 透過 Port 來傳遞訊息 (Message Body) , 而 PortSet 就是可以傳遞多種不同訊息的 Port 擴充 class. 訊息在 Port 當中是以 Queue (先進先出) 的方式來儲存. 一個訊息丟給某個 Port 之後, 一般都是會透過 Receiver (Arbiter.Receive) 來接收, 然後就啟動該 Receiver 的 Handler.

如果你要產生一個存放 int 的 Port , 就可以寫

Port<int> myPortInt = new Port<int>();

而對它傳遞一個訊息, 你可以寫

myPortInt.Post(1);

而取出訊息, 則可以用 Test , 像是

int msg;
bool HasMsg = myPortInt.Test(out msg);

不過在多工協調的時候, 一般都是透過 Arbiter.Receive 來處理取得訊息時該做甚麼事情.

而 PortSet 就是一種可以接受多種訊息的 Port, 比方說, 要產生一個 Port 可以接收兩種訊息, 分別為 int, string, 就可以這樣寫:

PortSet<int, string> myport = new PortSet<int, string>();

我們之前的 DSS 程式, 在 DSS Service MainPort 的部分就是 PortSet.

Port, PortSet 是 CCR 訊息傳遞的基本, 所以如果不了解就很難繼續下去 (但是它其實是很簡單的訊息儲存讀取器而已).

 

2. Dispatcher, DispatcherQueue, Task

就好比 Thread 執行程式碼, 在 CCR 當中, 程式的運行是以 ITask 作為執行的單位, 而 DispatcherQueue 就是以 Queue 的方式來存放許多的 Tasks (以下我提到 Task, 就是代表該 class 有支援/實作 ITask 的意思) , 最後, Dispatcher 就像是 Thread, 用來執行運作 DispatcherQueue (當中的 Tasks). 不過在極少數的狀況下, DispatcherQueue 也可以被 .NET Thread Pool 來運作.

而 Task 最重要的就是 Handler, 就是 Programmer 寫的 delegate function.

至於這許多的 Tasks 從何而來, 如何產生, Handler (你的程式碼, delegate function) 何時要被執行 , 就是要靠接下來介紹的 Arbiter class.

3. Arbiter class

這是 CCR 寫好的一組函式庫, 負責用來撰寫處理訊息所需要用到的協同處理函式. 通常使用者 (programmer) 就是透過撰寫 delegate function, 然後指定當某種情境發生時 (比方說, Receive Task 就是某個 Port 收到訊息時), CCR 就會執行該 delegate function (透過 ITask).

以下我開始介紹 Arbiter 的所有 Member (都是 static function, 所以都是直接使用), 這可是 CCR 的重頭戲 (不過我只介紹基本使用方法, 進階的使用方法就太多了):

Arbiter.Recive

產生一個 Receive Task , 就是當某一個 port 收到訊息時, 就執行 Handler (某段程式碼).

比方說, 以下的程式產生一個 Receive Task, 就是當一個 Port<int> 收到 int 訊息時就把它從 Console 輸出. (但是該 Task 還不會被執行, 因為我們只是產生了 Recive Task)

var myport = new Port<int>();
var mytask = Arbiter.Receive(true, myport, i => Console.WriteLine(i));

其中, 第一個參數如果是 true, 表示這個 Task 一直都要存在, 不會消失, 如果是 false, 則表示僅僅執行一次, 這個 Task 就消失.

Arbiter.Activate

使用某個 DispatcherQueue 來執行一系列的 Tasks.

比方說我們要產生一個 DispatcherQueue, 然後利用該 DispatcherQueue 來執行一個 Receive Task, 就可以寫成下面這樣的 code :

var taskQueue = new DispatcherQueue("myqueue", new Dispatcher());
var myport = new Port<int>();
Arbiter.Activate(taskQueue, Arbiter.Receive(true, myport, msg => Console.WriteLine(msg)));

如果接下來有程式執行 myport.Post(3) , 就會啟動執行 Receiver Task, 也就是在 Console 印出 3 .

Arbiter.Choice

有點像是 switch case , 是屬於 Task 的分支流程處理用的, 它會產生一個 Choice Task , 而只會處理眾多 Handlers 的其中之一 (或眾多 Tasks 的其中之一個 Handler).

比方說有個 PortSet 叫做 myport,  可以接受 int, string 這兩種訊息, 於是你可以用

Arbiter.Choice(myport, intmsg => Console.WriteLine(intmsg), stringmsg => Console.WriteLine(stringmsg));

來表示當 myport 收到 int 時, 或是收到 string 時要作的事 (但只有二選一), 你也可以寫成這樣:

Arbiter.Choice(
  Arbiter.Receive<int>(false, myport, intmsg => Cosnole.WriteLine(intmsg)),
  Arbiter.Receive<string>(false, myport, strmsg => Console.WriteLine(strmsg)));

因為 Arbiter.Choice 一旦處理完分支 Task 就把所有 Task 銷毀, 所以你不能傳永久存在的 Receive Task 給它歐.

Arbiter.FromHandler , Arbiter.FromIteratorHandler

將 delegate function 轉成 Task, 這樣可以提供排程 (DispatcherQueue) 使用.

很簡單, 想把某段程式碼變成 Task, 就這樣作:

var task = Arbiter.FromHandler( () => { Console.WriteLine("Hello World"); });

你也可以寫一個函式是回傳 IEnumerator<ITask> , 然後就是利用 FromIteratorHandler 來把該函式轉成 Task.

Arbiter.Interleave

以前, 你會用 lock, 或 ReaderWriterLock 來處理 multi-thread 的程式處理資料的問題, Arbiter.Interleave 就是用來解決這個問題.

你現在知道 ServiceBehavior.Concurrent, ServiceBehavior.Exclusive, ServiceBehavior.TearDown 是如何實現了, 基本上之前我們的 Start() 都會呼叫 base.Start(), base.Start() 當中就是利用 Arbiter.Interleave (產生一個 MainInterleave) 來把所有的 ServiceHandler 集合起來執行.

所以這個函式會產生一個 Task 集合 , 你要傳給它三種 Task 集合,
TearDownReceiverGroup 是屬於結束的時候呼叫的, Arbiter.Interleave 保證它不會跟其他任何 Task 同時執行, 但是它必須是一次性的 Receiver (one time only).
ExclusiveReceiverGroup 是屬於寫入資料時呼叫, Arbiter.Interleave 保證它不會跟其他任何 Task 同時執行.
ConcurrentReceiverGroup 是屬於讀取資料時呼叫, Arbiter.Interleave 讓這些可以同時被執行但不會跟 ExclusiveReceiverGroup, TearDownReceiverGroup 同時執行.
MSDN 當中的範例就像是這樣:

// activate an Interleave Arbiter to coordinate how the handlers of the service
// execute in relation to each other and to their own parallel activations 
Arbiter.Activate(_taskQueue,
    Arbiter.Interleave(
    new TeardownReceiverGroup(
    // one time, atomic teardown
        Arbiter.Receive<Stop>(false, _mainPort, StopHandler)
    ),
    new ExclusiveReceiverGroup(
    // Persisted Update handler, only runs if no other handler running
        Arbiter.Receive<UpdateState>(true, _mainPort, UpdateHandler)
    ),
    new ConcurrentReceiverGroup(
    // Persisted Get handler, runs in parallel with all other activations of itself
    // but never runs in parallel with Update or Stop
        Arbiter.Receive<GetState>(true, _mainPort, GetStateHandler)
    ))
);

 

而 CCR 實現這個 Arbiter.Interleave 不是靠傳統的 lock, 或是 ReaderWriterLock, 而是靠內部的 TaskQueue 來實作的, 它也不需要知道哪些資料是你所要保護的, 它只是負責避免哪些程式不可同時執行, 以及允許哪些程式可以同時執行.

Arbiter.JoinedReceive

假如你想要等候兩個 Port 都分別收到訊息時才要作處理 (執行 Handler) 的話, 這個就是你要的.

比方說下面的 code:

Arbiter.JoinedReceive<int, string>(false, myport, myport2, (intmsg,strmsg) => Console.WriteLine(intmsg+","+strmsg))

就是表示產生一個 JoinedReceive Task, 當 myport 收到 int 訊息, myport2 收到 string 訊息, (不管先後順序, 會一直等到兩個都收到訊息), 就在 Console 作輸出.

Arbiter.MultipleItemReceive

跟 Arbiter.JoinedReceive 不太一樣的地方在於, 它是看訊息的數量, 當一定數量的訊息送到一個或多個 Port 的時候, 就會啟動 Handler.

比方說下面這個 code 就是當累積五個 int 訊息送到 portInt 的時候, 就會啟動 Handler, 把這五個 int 輸出到螢幕上.

Arbiter.MultipleItemReceive(true, portInt, 5,
  intArray => Console.WriteLine(string.Join(",", new List<int>(intArray).ConvertAll<string>(i => i.ToString()).ToArray())));

 

一旦你了解了 Arbiter 的函式, 還有 DSSP 的類型, 相信你再去看 documentation 當中的 MsrsUserGuideExpress.chm 就會獲益良多囉.


題外話:
前天我才去天瓏書局翻了 Professional Microsoft Robotics Developer Studio (Wrox Programmer to Programmer) ,
發現裡面程式主要是 RDS 1.5 的, 難怪跟 RDS 2008 不太一樣...
雖然基本功都是相通的,  但是其實還是有點小變化, 所以還是讀 documentation 好了...