大宗常用的Message Queue (訊息佇列系統) 比較。
目前有碰到的是RabbitMQ(AMQP),因近期被問到MQTT,所以紀錄一下各個優缺點。
MQTT、RabbitMQ、Kafka比較
| 特性 | MQTT | RabbitMQ(AMQP) | Kafka |
|---|---|---|---|
| 主要定位 | IoT 訊息協議 | 企業訊息佇列 | 分散式事件流平台 |
| 傳輸模式 | Publish / Subscribe | Queue + Pub/Sub | Event Streaming |
| 吞吐量 | 低~中 | 中 | 非常高 |
| 延遲 | 非常低 | 低 | 低~中 |
| 資料持久化 | 通常不長期保存 | 可持久化 | 長時間保存 |
| 可靠性 | QoS 0 / 1 / 2 | 高可靠 (ACK) | 高可靠 (Replication) |
| 系統複雜度 | 低 | 中 | 高 |
| 常見用途 | IoT裝置、感測器 | 任務隊列、微服務 | 大數據、事件流 |
MQTT
MQTT 是一個輕量、低耗能的訊息發布/訂閱協定,適合 IoT 或行動裝置使用。它通過 Broker 將訊息可靠分發給多個訂閱者,優點是低帶寬、高效能、可跨平台。
MQTT Client (Publisher)
│ MQTT 協定發送訊息
▼
┌───────────────────────────────────┐
│ RabbitMQ Broker (MQTT Plugin) │
│ Exchange → Queue 路由訊息 │
└───────────────────────────────────┘
│ AMQP 或 MQTT 協定取出訊息
▼
Consumer (C# 或其他語言)
QoS(Quality of Service)
MQTT QoS(Quality of Service)
| QoS 等級 | 特性 | 傳送方式 | 適合場景 |
|---|---|---|---|
| QoS 0At most once(最多一次) | 訊息最多送一次,可能丟失 | Publisher 發訊息後不等待確認,Broker 收到就發給 Subscriber,不會重試 | - 監控溫度、傳感器訊號 - 丟掉一筆不影響整體 |
| QoS 1At least once(至少一次) | 訊息至少送一次,可能重複 | Publisher 發訊息後等待 Broker 回覆 ACK,若沒收到 ACK,會重傳 | - 訊息重要性中等,例如電表抄表數據、訂單通知 - 保證訊息不丟,但可能重複 |
| QoS 2Exactly once(剛好一次) | 訊息只送一次,最可靠 | Publisher 與 Broker 之間用四步握手確認,確保訊息不丟也不重複 | - 金融交易、關鍵控制訊息 - 保證訊息剛好到達一次,但速度較慢 |
MQTT協定只有Topic概念,「MQTT Client 發訊息到 RabbitMQ,預設都經過 amq.topic Topic Exchange,MQTT Topic 自動映射成 RoutingKey,Queue 透過 RoutingKey 收訊息。」
MQTT Publisher (Client)
│ 發布訊息到 Topic
│ 例如: ai/power/assignment
▼
┌───────────────────────────────────────┐
│ RabbitMQ Broker (MQTT Plugin) │
│ │
│ 1️⃣ 收到 MQTT Topic │
│ 2️⃣ 映射成 Exchange + RoutingKey │
│ Exchange = amq.topic (預設) │
│ RoutingKey = ai.power.assignment │
└───────────────────────────────────────┘
│ Exchange 根據 RoutingKey 分流
▼
Queue 1 (綁定 RoutingKey = ai.power.assignment)
Queue 2 (綁定 RoutingKey = ai.power.*)
│
▼
Subscriber 1 (MQTT Client 或 AMQP Client)
Subscriber 2 (MQTT Client 或 AMQP Client)
│
▼
處理訊息 / 執行任務
RabbitMQ
RabbitMQ 預設使用 AMQP 協定來傳輸訊息,AMQP 是一個「訊息佇列通訊協定」,定義:Producer / Broker(指RabbitMQ 整個服務) / Consumer 要怎麼交換訊息。
RabbitMQ 也可以透過 plugin 支援 MQTT 或 STOMP 等協定。
實作RabbitMQ 中有三個核心概念:
1. Exchange(Topic Exchange):訊息的入口與分流中心
2. Routing Key:訊息的分類標籤
3. QueueBind:Queue 的訂閱規則
Producer
│
│ exchange = "ai.topic.exchange"
│ routingKey = "ai.power_assignment"
▼
RabbitMQ Broker
│
▼
Topic Exchange(不存資料,只負責分流)
│
│ 根據 routing key + queue binding rules 比對
▼
Queue
│
▼
Consumer(從 Queue 消費)
Producer 發送訊息時會指定 exchange 與 routing key,Topic Exchange 會根據 routing key 與queue binding rule 比對,只有符合規則的 Queue 才會收到訊息,Consumer 則從 Queue 中消費訊息。
C# Publisher
using RabbitMQ.Client;
using System.Text;
public class RabbitMqPublisher
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _exchangeName = "ai.topic.exchange";
public RabbitMqPublisher()
{
var factory = new ConnectionFactory()
{
HostName = "localhost"
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// 建立 Topic Exchange(如果已存在也不影響)
_channel.ExchangeDeclare(
exchange: _exchangeName,
type: ExchangeType.Topic,
durable: true);
}
//需傳入對應的routingKey規則
public void Publish(string message, string routingKey)
{
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(
exchange: _exchangeName,
routingKey: routingKey,
basicProperties: null,
body: body);
Console.WriteLine($"已發送: {message}, routingKey: {routingKey}");
}
}
C# Subscriber(Consumer)
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
public class SimpleRabbitMqConsumer
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _exchangeName = "ai.topic.exchange";
private readonly string[] _routingKeys =
{
"ai.power_assignment",
"ai.status_result"
};
public SimpleRabbitMqConsumer()
{
var factory = new ConnectionFactory()
{
HostName = "localhost"
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// 1. 建立 Topic Exchange
_channel.ExchangeDeclare(
exchange: _exchangeName,
type: ExchangeType.Topic,
durable: true);
// 2. 建立 Queue(RabbitMQ 自動產生名稱)
var queueName = _channel.QueueDeclare().QueueName;
// 3. 綁定 routing keys(訂閱)
foreach (var routingKey in _routingKeys)
{
_channel.QueueBind(queueName, _exchangeName, routingKey);
}
// 4. 建立 Consumer
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
//可以根據routingKey 取得取得對應Queue作相對應的事
Console.WriteLine($"收到訊息: {message}");
Console.WriteLine($"routingKey: {routingKey}");
await Task.CompletedTask;
};
// 5. 開始消費 Queue
_channel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine("RabbitMQ Consumer 已啟動...");
}
}上述是Exchange Topic範例,其實RabbitMQ Exchange 類型匹配規則,有分3種:
| Exchange Type | 匹配規則 | 口訣 |
|---|---|---|
| Direct | 只看 routingKey 完全匹配 | 「精準投」 |
| Fanout | 只看 Exchange,不看 routingKey | 「廣播,全部收」 |
| Topic | 需匹配 Exchange + routingKey(支援 * 單層/ # 多層)Queue Bind Patterns: 1) ai.power.* → 匹配三層 routingKey,最後一層任意 2) ai.power.# → 匹配三層或更多層 routingKey,最後多層任意 | 「分層分類收」 |
- Direct → 一對一,訊息送到 routingKey 對應的 Queue
- Fanout → 一對多,訊息廣播給所有綁定的 Queue
- Topic → 分層匹配,exchange 及 routingKey 皆要對,Queue 可以用萬用字元訂閱一批訊息
Kafka
- Event Streaming 的意思是:用Kafka即時傳輸與處理系統事件的資料流架構,Kafka 就像「資料版的高速公路」,事件就是在高速公路上不斷流動的車。
- 資料副本(Replication) 的意思是:
同一份資料會被複製到多台伺服器上保存,避免單一伺服器故障造成資料遺失。