使用.NET Core console專案測試RabbitMQ之Plugin MQTT

一開始會先安裝Erlang,RabbitMQ及Plugin MQTT

再撰寫.NET Core console專案來測試訂閱與發佈訊息

首先安裝RabbitMQ前要先安裝Erlang:

參考文章:https://www.gushiciku.cn/pl/gzsJ/zh-tw

安裝Erlang

下載Erlong:https://www.erlang.org/downloads

安裝完後設定環境變數:設定→進階系統設定→環境變數,加入系統變數如下圖:

變數:ERLANG_HOME

變數值:C:\Program Files\Erlang OTP

 

 

 

 

 

 

 

 

 

 

 

 

 

在path加入%ERLANG_HOME%\bin

 

 

 

 

 

 

 

 

 

 

 

 

 

開啟cmd再輸入erl確認erlang是否安裝成功

 

 

 

 

 

 

安裝RabbitMQ

下載網址:https://www.rabbitmq.com/download.html

 

 

 

 

 

安裝完後開啟安裝資料夾輸入cmd如下圖:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

安裝外掛Management UI

rabbitmq-plugins enable rabbitmq_management

 

 

 

 

 

 

參考文章:https://www.rabbitmq.com/management.html

 

 

 

 

 

 

以系統管理員身份開啟cmd輸入net start rabbitmq再重新啟動

開啟瀏覽器輸入網址:http://localhost:15672/,即可看到如下圖:

 

 

 

 

 

 

開啟外掛MQTT Plugin

rabbitmq-plugins enable rabbitmq_mqtt

參考文章:https://www.rabbitmq.com/mqtt.html

.NET Core console 專案

建立新的Console專案,與以Nuget下載套件:MQTTnet,在解決方案新增兩個新專案撰寫如下:

第一個新專案

Publisher.cs

using MQTTnet;
using MQTTnet.Client;

namespace MQTTPublisher
{
    internal class Publisher
    {
        static async Task Main(string[] args)
        {
            var mqttFactory = new MqttFactory();
            var client = mqttFactory.CreateMqttClient();

            try
            {
                var options = new MqttClientOptionsBuilder()
                    .WithClientId(Guid.NewGuid().ToString())
                    .WithTcpServer("localhost", 1883)
                    .WithCleanSession()
                    .Build();

                await client.ConnectAsync(options);

                Console.WriteLine("按任何鍵publish message");
                Console.ReadLine();

                await PublishMessageAsync(client);


            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                //await client.DisconnectAsync();
            }
        }

        /// <summary>
        /// 非同步發佈訊息
        /// </summary>
        /// <param name="client"></param>
        /// <returns></returns>
        private static async Task PublishMessageAsync(IMqttClient client)
        {
            string messagePayload = "Hello";

            string messagePayload1 = messagePayload + "1";
            var msg1 = new MqttApplicationMessageBuilder()
                .WithTopic("home/workStation1")
                .WithPayload(messagePayload1)
                .Build();

            string messagePayload2 = messagePayload + "2";
            var msg2 = new MqttApplicationMessageBuilder()
                .WithTopic("home/workStation2")
                .WithPayload(messagePayload2)
                .Build();

            if (client.IsConnected)
            {
                await client.PublishAsync(msg1);
                await client.PublishAsync(msg2);

                Console.WriteLine($"published Message-{messagePayload1}");
                Console.WriteLine($"published Message-{messagePayload2}");
            }
        }
    }
}

messagePayload2 可以註解掉,這段是開第二個publisher之Topic,但這次測試只有一個publisher之Topic

第二個新專案

Subscriber.cs

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Server;
using System.Text;

namespace MQTTSubscriber
{
    internal class Subscriber
    {
        static async Task Main(string[] args)
        {
            try
            {
                var mqttFactory = new MqttFactory();
                IMqttClient client = mqttFactory.CreateMqttClient();
                var options = new MqttClientOptionsBuilder()
                    .WithClientId(Guid.NewGuid().ToString())
                    .WithTcpServer("localhost", 1883)
                    .WithCleanSession()
                    .Build();

                client.ConnectedAsync += _mqttClient_ConnectedAsync;
                client.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;

                var topicFilter = new MqttTopicFilterBuilder()
                   .WithTopic("home/workStation1")
                   .Build();

                await client.ConnectAsync(options);
                await client.SubscribeAsync(topicFilter);

                Console.ReadLine();

                await client.DisconnectAsync();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public static async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine("subscriber1連到broker成功!");
            await Task.CompletedTask;
        }

        public static async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            var topic = arg.ApplicationMessage.Topic;
            var payloadText = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);

            Console.WriteLine($"subscriber1 Received: Topic:{topic}, Payload:{payloadText}");
            await Task.CompletedTask;
        }
    }
}

接著設定啟動專案

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

設定訂閱者和發佈者專案為啟動如下圖:

 

 

 

 

 

 

 

 

開始Debug觀察看看,若有問題要請教或需要完整的原始碼可以留言給我。