MQTT 學習心得

發現用部落格來做紀錄蠻方便的

最近需要處理同時間收取大量資料,由於不需要即時回傳處理結果,再加上之前有用過windows message queue service, RabbitMQ 的經驗,所以直覺認為 message queue 是很好的方向,

在企業內部,不管有沒有跨平台的需求,個人很推 RabbitMQ 這個 Broker,但如果不只是光要在企業內部使用,應該要考慮類似 Azure 上的EventHub 這類的Message Queue 

這次因為有同事在Survey IoT,希望使用MQTT的協定,因此搭個順風車,試用了一下MQTT 相關的Broker 跟操作

 

關於MQTT

基本概念,可以看一下連結 http://www.cnblogs.com/caca/p/mqtt.html 的文章 (有點技術又不會太技術)

或者參考 HiveMQ 的教學 http://www.hivemq.com/mqtt-essentials/ ,這間公司的MQTT Broker (HiveMQ)是要錢的,所以文件寫得比較好也是應該的

 

要學習MQTT 首先要找MQTT 的Broker 

RabbitMQ 雖然有MQTT的Plugin, 不過 RabbitMQ 主打的是滿足 AMQP的協定

而且從官方文件看來,RabbitMQ 上的MQTT broker plugin 效能似乎不是那麼令人滿意

http://www.scalagent.com/IMG/pdf/Benchmark_MQTT_servers-v1-1.pdf

所以也順著同事的建議,也找了mosquitto 來當MQTT的Broker (不過還是試著在RabbitMQ 上,enable mqtt 的plugin,小小的測試一下,我還是覺得RabbitMQ 很好用)

 

現在軟體都要能夠安裝在多種平台,mosquitto 也不例外

mosquitto 如果要安裝在Linux 上,Google 一下,可找到不同distribution Linux 的安裝介紹

而自己想測試的環境是raspberry pi 跟windows ,所以就專門找raspberry pi & windows 平台

如果要裝在raspberry pi 上,可以參考 http://www.switchdoc.com/2016/02/tutorial-installing-and-testing-mosquitto-mqtt-on-raspberry-pi/  (google 文章上排比較上面的 repo.list 名稱有錯誤的情況,應該是舊的吧)

裝windows 也不難,但有稍稍撞了一下牆,從 http://mosquitto.org/download/ 官方下載win32安裝檔

安裝執行完後,記得看一下 c:\program files(x86)\mosquitto 下的readme_windows.txt  (其實在安裝的第一個畫面有,但Readme寫得比較清楚)

還要去 http://slproweb.com/products/Win32OpenSSL.html  (readme 上有寫要 32位元版,我下載成64位元版,就錯了)

跟 ftp://sourceware.org/pub/pthreads-win32 下載某個檔案 

主要是把三個dll , copy 到  c:\program files(x86)\mosquitto  目錄下, 然後就可以啟動mosquitto.exe 了 (如果要變成Service 要先重開機)

後來用不同topic 在網路上也找到這篇不錯的參考文章 https://sivatechworld.wordpress.com/2015/06/11/step-by-step-installing-and-configuring-mosquitto-with-windows-7/


 

Happy Coding Time~~

若要連接 MQTT, 在.NET ,nuget 最 popular 的library 是 M2Mqtt.Net

https://www.nuget.org/packages/M2Mqtt/

不管連上 RabbitMQ 的mqtt broker plugin 還是 mosquitto broker 都是一樣的,因為協定都是MQTT

發送端的Code 大致可以寫成 (下面是一個console project)

static void Main(string[] args)
{     
     var id = args[0];
     var topic = args[1];

     MqttClient client = new MqttClient("your hostname or ip");

     //若有輸入user, password, 接收端也要一樣的user, password
     client.Connect(id, "user", "password");

     //若要知道有沒有發送出去可以增加監聽發送後的Event
     //client.MqttMsgPublished += new MqttClient.MqttMsgPublishedEventHandler(client_MqttMsgPublished);


     Stopwatch sw = new Stopwatch();
     sw.Start();
     for (int i=0; i<10000, i++)
     {
          //publish 最後一的parameter 是retain, 會決定是否保留著最後一個message 
          ushort t = client.Publish(topic, Encoding.UTF8.GetBytes("XXXXXtestmessage, " 
          +Guid.NewGuid().ToString() + DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss") + ", times: " + i),
                    MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE,true);
     }
     sw.Stop();
     Console.WriteLine("花費時間..... {0}", sw.Elapsed);
     Console.ReadLine();
     client.Disconnect();
}


private static void client_MqttMsgPublished(object sender, MqttMsgPublishedEventArgs e)
        {
            Console.WriteLine("Message Published");
            Console.WriteLine(e.IsPublished);
            Console.WriteLine(e.MessageId);
        }

而接收端的Code 則是 (下面則是另一個console project)

static void Main(string[] args)
{
    var id = args[0];
    var topic = args[1];

    MqttClient client = new MqttClient("your hostname or ip");

    //如果有出現相同的id 會把前一個id 給踢掉
    //要與發送端相同
    client.Connect(id, "user", "password");

    //topic 可以使用 topic1/#, topic2/+ 去接收相似的topic ,有點類似 sql like 的功能但限制較多
    string[] topic = { topic };

    byte[] qoslevels = { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE };

    client.Subscribe(topic, qoslevels);

    client.MqttMsgPublishReceived += new MqttClient.MqttMsgPublishEventHandler(client_PublishArrived);

    Console.ReadLine();
    client.Disconnect();
}

private static void client_PublishArrived(object sender, MqttMsgPublishEventArgs e)
{
    Console.WriteLine("Message Received");
    Console.WriteLine(e.Topic);
    Console.WriteLine(Encoding.UTF8.GetString(e.Message));
}

 

上面的程式碼,接收端可以輸入不同參數 使用不同的id 去listen 同一個topic (起不同的Console process)

換言之可以完成 publish / subscribe 一對多 的message pattern 

 

目前測試起來

效能方面,AMQP 之類的message queue protocol 當然沒得比

上面Code的傳輸內容,一萬筆資料, 在 raspberry pi 上用mosquitto 來當broker的情況下,不用一秒鐘,約0.8 秒 ,

換成是 rabbitMQ 當MQTT broker 同樣在 raspberry pi 上,大概要二秒多,而且 CPU 幾乎是滿的,跟官方文件差不多

而在Windows 7 (這邊用的測試機是 i3 的cpu) 上的mosquitto 在沒有調整的狀況下,大概是一秒半左右(不過我也不知道有什麼要調整的),CPU 則沒用到多少

 

心得:

MQTT 跟之前接觸到的Message Queue 真的不太一樣

如果沒有接收單位接收走,發送端的發出的訊息就會不見了,或許這是IoT sensor 傳送資料要考量的特性,因為資料太多

接下來就是再把要傳遞的資料,用JSON.net 做 json 序列化反序列化,大概是一個蠻好的接收、傳送,快速產生大量資料的解決方案

另外就是最好可以 完成一套 可以掛載(使用 DI )不同 message queue protocol (mqtt, amqp, stomp) 用來滿足不同message pattern 的 library 了