Message Queue (訊息佇列系統)

大宗常用的Message Queue (訊息佇列系統) 比較。
 


目前有碰到的是RabbitMQ(AMQP),因近期被問到MQTT,所以紀錄一下各個優缺點。



MQTT、RabbitMQ、Kafka比較

特性MQTTRabbitMQ(AMQP)Kafka
主要定位IoT 訊息協議企業訊息佇列分散式事件流平台
傳輸模式Publish / SubscribeQueue + Pub/SubEvent Streaming
吞吐量低~中非常高
延遲非常低低~中
資料持久化通常不長期保存可持久化長時間保存
可靠性QoS 0 / 1 / 2高可靠 (ACK)高可靠 (Replication)
系統複雜度
常見用途IoT裝置、感測器任務隊列、微服務大數據、事件流

MQTT 

MQTT 是一個輕量、低耗能的訊息發布/訂閱協定,適合 IoT 或行動裝置使用。它通過 Broker 將訊息可靠分發給多個訂閱者,優點是低帶寬、高效能、可跨平台。

MQTT Client (Publisher)
       │ MQTT 協定發送訊息
      ▼
┌───────────────────────────────────┐
│ RabbitMQ Broker (MQTT Plugin)                                    │
│ Exchange → Queue 路由訊息                                         │
└───────────────────────────────────┘
       │ AMQP 或 MQTT 協定取出訊息
      ▼
Consumer (C# 或其他語言)

 QoS(Quality of Service)

MQTT QoS(Quality of Service)

QoS 等級特性傳送方式適合場景
QoS 0At most once(最多一次)訊息最多送一次,可能丟失Publisher 發訊息後不等待確認,Broker 收到就發給 Subscriber,不會重試

- 監控溫度、傳感器訊號

- 丟掉一筆不影響整體

QoS 1At least once(至少一次)訊息至少送一次,可能重複Publisher 發訊息後等待 Broker 回覆 ACK,若沒收到 ACK,會重傳

- 訊息重要性中等,例如電表抄表數據、訂單通知

- 保證訊息不丟,但可能重複

QoS 2Exactly once(剛好一次)訊息只送一次,最可靠Publisher 與 Broker 之間用四步握手確認,確保訊息不丟也不重複

- 金融交易、關鍵控制訊息

- 保證訊息剛好到達一次,但速度較慢

MQTT協定只有Topic概念,「MQTT Client 發訊息到 RabbitMQ,預設都經過 amq.topic Topic Exchange,MQTT Topic 自動映射成 RoutingKey,Queue 透過 RoutingKey 收訊息。」

MQTT Publisher (Client)
  │ 發布訊息到 Topic
  │ 例如: ai/power/assignment
 ▼
┌───────────────────────────────────────┐
│ RabbitMQ Broker (MQTT Plugin)                                               │
│                                                                                                    │
│ 1️⃣ 收到 MQTT Topic                                                                 │
│ 2️⃣ 映射成 Exchange + RoutingKey                                          │
│     Exchange   = amq.topic (預設)                                              │
│     RoutingKey = ai.power.assignment                                       │
└───────────────────────────────────────┘
  │ Exchange 根據 RoutingKey 分流
 ▼
Queue 1  (綁定 RoutingKey = ai.power.assignment)
Queue 2  (綁定 RoutingKey = ai.power.*)
  │
 ▼
Subscriber 1 (MQTT Client 或 AMQP Client)
Subscriber 2 (MQTT Client 或 AMQP Client)
  │
 ▼
處理訊息 / 執行任務

RabbitMQ
 

RabbitMQ 預設使用 AMQP 協定來傳輸訊息,AMQP 是一個「訊息佇列通訊協定」,定義:Producer / Broker(指RabbitMQ 整個服務) / Consumer 要怎麼交換訊息。

RabbitMQ 也可以透過 plugin 支援 MQTT 或 STOMP 等協定。


實作RabbitMQ 中有三個核心概念:

1. Exchange(Topic Exchange):訊息的入口與分流中心
2. Routing Key:訊息的分類標籤
3. QueueBind:Queue 的訂閱規則


Producer

│ exchange = "ai.topic.exchange"
│ routingKey = "ai.power_assignment"

RabbitMQ Broker


Topic Exchange(不存資料,只負責分流)

│ 根據 routing key + queue binding rules 比對

Queue


Consumer(從 Queue 消費)

Producer 發送訊息時會指定 exchange 與 routing key,Topic Exchange 會根據 routing key 與queue binding rule 比對,只有符合規則的 Queue 才會收到訊息,Consumer 則從 Queue 中消費訊息。
 

C# Publisher

using RabbitMQ.Client;
using System.Text;
public class RabbitMqPublisher
{
   private readonly IConnection _connection;
   private readonly IModel _channel;
   private readonly string _exchangeName = "ai.topic.exchange";
   public RabbitMqPublisher()
   {
       var factory = new ConnectionFactory()
       {
           HostName = "localhost"
       };
       _connection = factory.CreateConnection();
       _channel = _connection.CreateModel();
       // 建立 Topic Exchange(如果已存在也不影響)
       _channel.ExchangeDeclare(
           exchange: _exchangeName,
           type: ExchangeType.Topic,
           durable: true);
   }
   
   //需傳入對應的routingKey規則
   public void Publish(string message, string routingKey)
   {
       var body = Encoding.UTF8.GetBytes(message);
       _channel.BasicPublish(
           exchange: _exchangeName,
           routingKey: routingKey,
           basicProperties: null,
           body: body);
       Console.WriteLine($"已發送: {message}, routingKey: {routingKey}");
   }
}
 

C# Subscriber(Consumer)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
public class SimpleRabbitMqConsumer
{
   private readonly IConnection _connection;
   private readonly IModel _channel;
   private readonly string _exchangeName = "ai.topic.exchange";
   private readonly string[] _routingKeys =
   {
       "ai.power_assignment",
       "ai.status_result"
   };
   public SimpleRabbitMqConsumer()
   {
       var factory = new ConnectionFactory()
       {
           HostName = "localhost"
       };
       _connection = factory.CreateConnection();
       _channel = _connection.CreateModel();
       
       // 1. 建立 Topic Exchange
       _channel.ExchangeDeclare(
           exchange: _exchangeName,
           type: ExchangeType.Topic,
           durable: true);
           
       // 2. 建立 Queue(RabbitMQ 自動產生名稱)
       var queueName = _channel.QueueDeclare().QueueName;
       
       // 3. 綁定 routing keys(訂閱)
       foreach (var routingKey in _routingKeys)
       {
           _channel.QueueBind(queueName, _exchangeName, routingKey);
       }
       
       // 4. 建立 Consumer
       var consumer = new AsyncEventingBasicConsumer(_channel);
       consumer.Received += async (model, ea) =>
       {
           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
           var routingKey = ea.RoutingKey;
           
           //可以根據routingKey 取得取得對應Queue作相對應的事
           
           Console.WriteLine($"收到訊息: {message}");
           Console.WriteLine($"routingKey: {routingKey}");
           await Task.CompletedTask;
       };
       
       // 5. 開始消費 Queue
       _channel.BasicConsume(
           queue: queueName,
           autoAck: true,
           consumer: consumer);
       Console.WriteLine("RabbitMQ Consumer 已啟動...");
   }
}

上述是Exchange Topic範例,其實RabbitMQ Exchange 類型匹配規則,有分3種:

Exchange Type匹配規則口訣
Direct只看 routingKey 完全匹配「精準投」
Fanout只看 Exchange,不看 routingKey「廣播,全部收」
Topic需匹配 Exchange + routingKey(支援 * 單層/ # 多層)
Queue Bind Patterns:
1) ai.power.* → 匹配三層 routingKey,最後一層任意
2) ai.power.# → 匹配三層或更多層 routingKey,最後多層任意
「分層分類收」
  • Direct → 一對一,訊息送到 routingKey 對應的 Queue
  • Fanout → 一對多,訊息廣播給所有綁定的 Queue
  • Topic → 分層匹配,exchange 及 routingKey 皆要對,Queue 可以用萬用字元訂閱一批訊息

Kafka

  • Event Streaming 的意思是:用Kafka即時傳輸與處理系統事件的資料流架構,Kafka 就像「資料版的高速公路」,事件就是在高速公路上不斷流動的車。
  • 資料副本(Replication) 的意思是:
    同一份資料會被複製到多台伺服器上保存,避免單一伺服器故障造成資料遺失。