[IoT] Azure IoT整合應用二:建立串流分析工作,接收從IoT Hub的訊息並進行處理

在上一篇文章[Azure] Azure IoT整合應用一:建立Azure上的IoT Hub並發送訊息至IoT Hub中
說明了如何將訊息送進Azure IoT Hub,本篇文章會說明如何建立一個串流分析,處理進入IoT Hub的事件
並將訊息寫入至儲存體以及資料庫之中

由於Azure IoT Hub本身負責接收訊息,所以後續需要一個串流分析進行資訊的接手並作處理

建立的方式很簡單,不用寫程式就可以完成服務的建立了

1.首先先在Azure服務中,建立一個儲存體,類型請選擇[傳統],若是選擇[資源管理員],串流分析在進行輸出時會找不到這個儲存體

儲存體的建立,請不要選擇Premium-LRS,這個等級只有支援 Page Blob,而串流分析工作需要使用到 Block Blob

2.儲存體建立完成後,請建立一個[串流分析工作],並指定一個名稱

3.建立完成後,我們先回到IoT Hub中,在[傳訊]的設定中,最下方的[取用者群組],加上一個群組名稱,在這裡先加上一個[streamanalytics]的群組

IoT Hub中的每一個取用者群組可以允許五個來源服務進行資料的讀取,若是有超過五個以上,建議針對取用者群組進行設定較佳

4.接著,把IoT Hub的一些資訊先記下來,分別是[IoT Hub名稱],[共用存取原則名稱],[共用存取原則金鑰],要取得的方式,在IoT Hub的頁面中,點選[共用存取原則]就可以看到,在這裡,建議使用service的原則就可以

5.點開串流分析工作,並點選[輸入],在輸入的地方,我們要在串流分析裡,把IoT Hub當作是訊息的輸入項目,當一開始建好串流分析後,輸入與輸出基本上都是空白的

6.在輸入的設定畫面中,[來源]欄位請選擇[IoT 中心],別名的部份先輸入[FromIoT],至於其他欄位則是剛剛先從IoT Hub中記錄下來的資訊,輸入完之後就按下儲存

7.設定完輸入之後,我們到剛剛建立的儲存體中,要取得這個儲存體的一些資訊,以便作為輸出資訊用,進入儲存體之後,先到Blob中建立一個名為[logs]的容器,存取類型只要不是私人,Blob或是容器都可以

8.接著我們必須取得這個儲存體的一些資訊,分別是[儲存體帳戶名稱]以及[存取金鑰]

9.回到串流分析,剛剛建立的是從IoT Hub的輸入,接下來要建立一個輸出,將訊息輸出並儲存到儲存體中,點選串流分析工作的輸出方塊再點選[加入],在[接受]的欄位,請選擇[Blob 儲存體],並將給予一個輸出別名[ToStorage],而儲存體帳戶,金鑰以及容器的名稱,就在上一個步驟已經取得,依序填入就可以。路徑模式目前有支援到將存入的記錄用日期與小時的資料夾進行存放,若是要啟用的話,只要將關鍵字{date}與{time}加在路徑中就可以,所以路徑也可以輸入為"Test{date}/{time}"

10.完成建立後,剛剛已經建立好輸入與輸出,接著就要建立中間分析的查詢語法,透過這個查詢語法可以進行資訊的分析,並傳至[輸出]之中,建立查詢的方式,回到串流分析工作中,點選[查詢]的方塊,在這裡的語法是很接近SQL,但是還是有一些差異的語法,Into的後面,必須填上[輸出]的項目別名,FROM的後面則是加上[輸入]的項目別名,在這裡,若是不打算過濾任何資料,就不加上任何的WHERE語法,完成編輯就按下儲存

11.輸入、查詢與輸出都設定完了,接著就可以直接點選串流分析工作的[開始],這樣一來,只要有訊息傳入至IoT Hub中,串流分析工作就會將傳上來的訊息再輸出一份到儲存體之中了

可以使用上一篇所提及的模擬器,傳送訊息至IoT Hub進行測試

從儲存體的資料夾中,可以看到訊息真的被輸出並分資料夾存好了,若是下載下來,就會是分行的JSON字串格式的資料

12.接著我們要在串流分析工作中,加上過濾資料的動作,並將過濾後的資料寫入至資料庫之中,由於是寫入至資料庫中,所以點選串流分析工作的[輸出]加入一個新的輸出,[接受]的欄位請選擇[SQL 資料庫],其他的欄位則依照資料庫的資訊分別輸入,這裡就不再贅述,輸出別名一樣會用在查詢功能設定中,所以要記得別名是什麼

要設定[輸入]、[輸出]、[查詢]動作時,必須先將串流分析工作停止才行

13.接著回到串流分析的查詢中,在原本的查詢內容後面,再加上相同的內容,差別只是在於Into的後面,變成了資料庫輸出的別名,並在後面加上了WHERE條件

寫入資料庫的SELECT請不要用*(星號),從IoT Hub來的資訊會除了原本上傳的JSON內容外,還多了四個欄位,分別是[EventEnqueuedUtcTime]、[PartitionId]、[EventProcessedUtcTime]與[IoT Hub的名稱]
用不到的話請不要加入這些欄位的名稱,也不要用*(星號)

上面的語法,除了將從IoT Hub傳來的訊息寫入到資料庫中之外,還過濾了Type='Alert'的資料,並多寫入了IoT Hub傳入的三個欄位,編輯完之後按下儲存,並點選[開始],這樣串流分析工作寫入資料庫的設定就完成了

從模擬器送出Type='Alert'的內容

從資料表中也可以看到輸入的結果了

串流分析工作所能處理的資料來源[輸入]與資料寫入[輸出]種類非常的多,對於大量使用雲端運算的企業來說非常的實用,在大量資料的處理與分析效能也很好,有興趣的人可以試試看

本篇文章所用到的模擬器程式已放在GitHub上,網址如下
https://github.com/madukapai/maduka-Azure-IoT

串流分析工作的SQL語法可以參考這個網頁
Query examples for common Stream Analytics usage patterns

其他的參考資料
Get started using Azure Stream Analytics: Real-time fraud detection
Stream Analytics Query Language Reference