前一篇文章跟大家介紹的 Azure Queue Storage 接收訊息只能用定時 Pull 的方式,對於訊息需要隨送隨收的場景,Azure Queue Storage 用起來就顯得拐腳,Azure 有另外一個 Message Queue 服務 - Azure Service Bus,它的功能就比 Azure Queue Storage 更完整,稍貴一些,但還是比自己架要省。
想了解 Azure Queue Storage 與 Azure Service Bus 差異的朋友,可以參考這篇官方文件 - 儲存體佇列和服務匯流排佇列 - 異同比較,裡面介紹得非常詳細。
建立服務匯流排(Service Bus)
在 Azure 後台的「所有服務
」當中,搜尋「服務匯流排
」。
點進去之後,點擊「+建立
」,輸入下列資訊來建立 Service Bus 服務:
- 資源群組
- 命名空間:命名空間我們可以想像成是 Service Bus 的名稱
- 位置
- 定價層:不同的定價層支援的功能不一樣,如果需要更進階的,像是主題(Topic)、交易(Transaction)、...等功能,那至少要選到標準定價層以上,相關更詳細的定價層比較可以參考服務匯流排定價,我們只是要單純送收訊息,選擇「
基本
」定價層即可。
建立佇列(Queue)
一個 Service Bus 服務底下可以有很多佇列(Queue),所以接下來我們就建立一個佇列用來儲存訊息,在剛剛建立的 Service Bus 後台之中,切換到「佇列
」管理頁面,點擊「+佇列
」,輸入「名稱
」之後,其他欄位暫時保持預設值,最後按「建立
」
上面紫星標註的欄位特別說明一下:
鎖定持續時間
:接收端在接收到訊息之後,必須回應「完成
」的訊號,代表訊息成功被接收,則訊息才會從佇列中被移除,而訊息在被移除前會進入鎖定的狀態,如果訊息沒有被移除,則鎖定持續時間過後,會重新進入佇列之中等待被接收。最大傳遞計數
:訊息在被從佇列移除之前,可以被接收的次數,超過這個次數,訊息會被丟到DeadLetter
佇列之中,這個 DeadLetter 佇列會佔用我們 Service Bus 的空間,可以將它想像成是資源回收筒,而且在裡面的訊息不會被自動刪除,必須由我們自己手動刪除,所以我們要特別注意 DeadLetter 之中的訊息量。訊息留存時間
:訊息在佇列中可存活的時間,超過存活時間的訊息會被從佇列中移除。
起手式
佇列建好之後,我們就可以開始來寫程式了,我打算用 C# 來撰寫範例程式,首先安裝 Azure.Messaging.ServiceBus
套件,然後到 Service Bus 的後台,切換到「共用存取原則
」,點擊「RootManageSharedAccessKey
」,預設有兩組連線字串,我們複製其中一組起來備用。
傳送一個訊息/傳送多個訊息/傳送排程訊息
傳送單一個訊息及多個訊息較為單純,就是把訊息準備好,然後丟出去就可以了,說明在程式的註解之中。
private static async Task SendMessage()
{
// 產生 Service Bus Client
var client = new ServiceBusClient(ConnectionString);
// 產生 Service Bus Sender
var sender = client.CreateSender(QueueName);
// 傳送訊息
await sender.SendMessageAsync(new ServiceBusMessage("my message"));
// 批次傳送訊息
await sender.SendMessagesAsync(
new List<ServiceBusMessage>
{
new ServiceBusMessage("my message1"),
new ServiceBusMessage("my message2")
});
// 傳送排程訊息
await sender.SendMessageAsync(new ServiceBusMessage("my scheduled message")
{
ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(30)
});
}
接收訊息/破壞性接收訊息
Azure Service Bus 接收訊息的方式乍看之下很像 Pull 的方式,其實不然,當我們呼叫 ReceiveMessageAsync()
來接收訊息時,佇列中沒有訊息的話,它會進行等待,預設會等待 60 秒,我們也可以傳入 maxWaitTime
參數來改變等待的時間,當等待時間過後,如果沒有訊息進來,我們需要再次呼叫 ReceiveMessageAsync() 方法重新等待訊息進來。
private static async Task ReceiveMessage()
{
// ...
// 產生 Service Bus Receiver
var receiver = client.CreateReceiver(QueueName);
while (true)
{
var messaage = await receiver.ReceiveMessageAsync();
if (messaage == null) continue;
// ...
await receiver.CompleteMessageAsync(messaage);
}
}
如果我們覺得這種跑迴圈的方式不踏實,可以改用 ReceiveMessagesAsync()
方法,它可是用非同步枚舉
的方式來接收訊息,但是要記得做好取消的機制。
private static void Main(string[] args)
{
var cts = new CancellationTokenSource();
ReceiveMessages(cts.Token);
while (Console.ReadLine() != "cancel") { }
cts.Cancel();
// ...
}
private static async Task ReceiveMessages(CancellationToken cancellationToken)
{
// ...
// 產生 Service Bus Receiver
var receiver = client.CreateReceiver(QueueName);
await foreach (var messaage in receiver.ReceiveMessagesAsync(cancellationToken))
{
// ...
await receiver.CompleteMessageAsync(messaage);
}
}
從上面兩段接收訊息的程式碼中,可以看到在收到訊息之後都有再呼叫一個 CompleteMessageAsync()
方式,如果覺得麻煩,我們可以將 ReceiveMode
改成 ServiceBusReceiveMode.ReceiveAndDelete
,訊息在收到的當下就會自動回應完成訊號,缺點就是當訊息處理的過程中有錯誤狀況發生時,這個訊息可能就無法再被重新處理。
private static async Task ReceiveAndDeleteMessage()
{
// ...
// 產生 Service Bus Receiver
var receiver = client.CreateReceiver(
QueueName,
new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });
while (true)
{
var messaage = await receiver.ReceiveMessageAsync();
if (messaage == null) continue;
// ...
}
}
訊息接收處理器
如果我們需要使用 Event-Driven
的方式來接收訊息,則可以改用 ServiceBusProcessor
。
private static void Main(string[] args)
{
var cts = new CancellationTokenSource();
MessageProcessor(cts.Token);
while (Console.ReadLine() != "cancel") { }
cts.Cancel();
// ...
}
private static Task MessageProcessor(CancellationToken cancellationToken)
{
// ...
// 建立 Service Bus Processor 選項
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = true, // 是否在 ProcessMessageAsync Handler 執行完成後,回應完成訊號。
MaxConcurrentCalls = 20 // 同時最多處理幾個訊息
};
// 產生 Service Bus Processor
var processor = client.CreateProcessor(QueueName, options);
// 註冊訊息接收事件
processor.ProcessMessageAsync += args =>
{
Console.WriteLine(args.Message.Body.ToString());
return Task.CompletedTask;
};
// 註冊訊息處理錯誤事件
processor.ProcessErrorAsync += args =>
{
Console.WriteLine(args.Exception);
return Task.CompletedTask;
};
return processor.StartProcessingAsync(cancellationToken);
}
其中 AutoCompleteMessages
這個屬性跟 ServiceBusReceiveMode.ReceiveAndDelete 的設定不一樣,ReceiveAndDelete 是收到訊息的當下就回應完成訊號,而 AutoCompleteMessages 則是等訊息接收事件被執行完成後,才會回應完成訊號,如果過程中有發生例外錯誤,則不會回應完成訊號。
定義「即時」
看到這邊我們可以知道 Azure Service Bus 在訊息接收的延遲性上,比 Azure Queue Storage 來得即時許多,這是從技術層面上去分析研究所了解到的即時,而我個人在評估一個系統的回應速度要做得多快,是遵照「有一種快叫 User 覺得快
」的原則,這是從需求層面上去定義即時,假設我們要做一個聊天室好了,使用者的聊天對象通常是「人」,所以大多不會預期訊息送出去之後,立即就會收到對方的回應,因此我們接收訊息的方式不即時,而是 2 ~ 3 秒 Polling 一次又何妨?所以說,我們在做技術選型時候,應該要保持冷靜,使用者的需求說不定一個簡單到不行的解決方案就可以滿足了,講到這邊又不禁讓我想起搜尋系統砍掉重練的故事了,有機會再跟大家分享。
以上,Azure Service Bus 基本的使用方式就分享給大家,希望對大家在使用 Azure 上相關 Message Queue 服務時有一點點幫助。
參考資料
- 服務匯流排寄不出的信件佇列的概觀
- 使用服務匯流排傳訊的效能改進最佳作法
- azure-sdk-for-net/sdk/servicebus/Azure.Messaging.ServiceBus/samples