[食譜好菜] 比 Azure Queue Storage 功能更完整的 Message Queue 服務 - Azure Service Bus

前一篇文章跟大家介紹的 Azure Queue Storage 接收訊息只能用定時 Pull 的方式,對於訊息需要隨送隨收的場景,Azure Queue Storage 用起來就顯得拐腳,Azure 有另外一個 Message Queue 服務 - Azure Service Bus,它的功能就比 Azure Queue Storage 更完整,稍貴一些,但還是比自己架要省。

想了解 Azure Queue Storage 與 Azure Service Bus 差異的朋友,可以參考這篇官方文件 - 儲存體佇列和服務匯流排佇列 - 異同比較,裡面介紹得非常詳細。

建立服務匯流排(Service Bus)

在 Azure 後台的「所有服務」當中,搜尋「服務匯流排」。

點進去之後,點擊「+建立」,輸入下列資訊來建立 Service Bus 服務:

  1. 資源群組
  2. 命名空間:命名空間我們可以想像成是 Service Bus 的名稱
  3. 位置
  4. 定價層:不同的定價層支援的功能不一樣,如果需要更進階的,像是主題(Topic)、交易(Transaction)、...等功能,那至少要選到標準定價層以上,相關更詳細的定價層比較可以參考服務匯流排定價,我們只是要單純送收訊息,選擇「基本」定價層即可。

建立佇列(Queue)

一個 Service Bus 服務底下可以有很多佇列(Queue),所以接下來我們就建立一個佇列用來儲存訊息,在剛剛建立的 Service Bus 後台之中,切換到「佇列」管理頁面,點擊「+佇列」,輸入「名稱」之後,其他欄位暫時保持預設值,最後按「建立

上面紫星標註的欄位特別說明一下:

  • 鎖定持續時間:接收端在接收到訊息之後,必須回應「完成」的訊號,代表訊息成功被接收,則訊息才會從佇列中被移除,而訊息在被移除前會進入鎖定的狀態,如果訊息沒有被移除,則鎖定持續時間過後,會重新進入佇列之中等待被接收。
  • 最大傳遞計數:訊息在被從佇列移除之前,可以被接收的次數,超過這個次數,訊息會被丟到 DeadLetter 佇列之中,這個 DeadLetter 佇列會佔用我們 Service Bus 的空間,可以將它想像成是資源回收筒,而且在裡面的訊息不會被自動刪除,必須由我們自己手動刪除,所以我們要特別注意 DeadLetter 之中的訊息量。
  • 訊息留存時間:訊息在佇列中可存活的時間,超過存活時間的訊息會被從佇列中移除。

起手式

佇列建好之後,我們就可以開始來寫程式了,我打算用 C# 來撰寫範例程式,首先安裝 Azure.Messaging.ServiceBus 套件,然後到 Service Bus 的後台,切換到「共用存取原則」,點擊「RootManageSharedAccessKey」,預設有兩組連線字串,我們複製其中一組起來備用。

傳送一個訊息/傳送多個訊息/傳送排程訊息

傳送單一個訊息及多個訊息較為單純,就是把訊息準備好,然後丟出去就可以了,說明在程式的註解之中。

private static async Task SendMessage()
{
    // 產生 Service Bus Client
    var client = new ServiceBusClient(ConnectionString);

    // 產生 Service Bus Sender
    var sender = client.CreateSender(QueueName);

    // 傳送訊息
    await sender.SendMessageAsync(new ServiceBusMessage("my message"));

    // 批次傳送訊息
    await sender.SendMessagesAsync(
        new List<ServiceBusMessage>
        {
            new ServiceBusMessage("my message1"),
            new ServiceBusMessage("my message2")
        });

    // 傳送排程訊息
    await sender.SendMessageAsync(new ServiceBusMessage("my scheduled message")
                                  {
                                      ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(30)
                                  });
}

接收訊息/破壞性接收訊息

Azure Service Bus 接收訊息的方式乍看之下很像 Pull 的方式,其實不然,當我們呼叫 ReceiveMessageAsync() 來接收訊息時,佇列中沒有訊息的話,它會進行等待,預設會等待 60 秒,我們也可以傳入 maxWaitTime 參數來改變等待的時間,當等待時間過後,如果沒有訊息進來,我們需要再次呼叫 ReceiveMessageAsync() 方法重新等待訊息進來。

private static async Task ReceiveMessage()
{
    // ...

    // 產生 Service Bus Receiver
    var receiver = client.CreateReceiver(QueueName);

    while (true)
    {
        var messaage = await receiver.ReceiveMessageAsync();

        if (messaage == null) continue;

        // ...

        await receiver.CompleteMessageAsync(messaage);
    }
}

如果我們覺得這種跑迴圈的方式不踏實,可以改用 ReceiveMessagesAsync() 方法,它可是用非同步枚舉的方式來接收訊息,但是要記得做好取消的機制。

private static void Main(string[] args)
{
    var cts = new CancellationTokenSource();

    ReceiveMessages(cts.Token);

    while (Console.ReadLine() != "cancel") { }

    cts.Cancel();
    
    // ...
}

private static async Task ReceiveMessages(CancellationToken cancellationToken)
{
    // ...

    // 產生 Service Bus Receiver
    var receiver = client.CreateReceiver(QueueName);

    await foreach (var messaage in receiver.ReceiveMessagesAsync(cancellationToken))
    {
        // ...

        await receiver.CompleteMessageAsync(messaage);
    }
}

從上面兩段接收訊息的程式碼中,可以看到在收到訊息之後都有再呼叫一個 CompleteMessageAsync() 方式,如果覺得麻煩,我們可以將 ReceiveMode 改成 ServiceBusReceiveMode.ReceiveAndDelete,訊息在收到的當下就會自動回應完成訊號,缺點就是當訊息處理的過程中有錯誤狀況發生時,這個訊息可能就無法再被重新處理。

private static async Task ReceiveAndDeleteMessage()
{
    // ...

    // 產生 Service Bus Receiver
    var receiver = client.CreateReceiver(
        QueueName,
        new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

    while (true)
    {
        var messaage = await receiver.ReceiveMessageAsync();

        if (messaage == null) continue;

        // ...
    }
}

訊息接收處理器

如果我們需要使用 Event-Driven 的方式來接收訊息,則可以改用 ServiceBusProcessor

private static void Main(string[] args)
{
    var cts = new CancellationTokenSource();

    MessageProcessor(cts.Token);

    while (Console.ReadLine() != "cancel") { }

    cts.Cancel();

    // ...
}

private static Task MessageProcessor(CancellationToken cancellationToken)
{
    // ...

    // 建立 Service Bus Processor 選項
    var options = new ServiceBusProcessorOptions
                  {
                      AutoCompleteMessages = true,  // 是否在 ProcessMessageAsync Handler 執行完成後,回應完成訊號。
                      MaxConcurrentCalls = 20       // 同時最多處理幾個訊息
                  };

    // 產生 Service Bus Processor
    var processor = client.CreateProcessor(QueueName, options);

    // 註冊訊息接收事件
    processor.ProcessMessageAsync += args =>
        {
            Console.WriteLine(args.Message.Body.ToString());
            
            return Task.CompletedTask;
        };

    // 註冊訊息處理錯誤事件
    processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine(args.Exception);

            return Task.CompletedTask;
        };

    return processor.StartProcessingAsync(cancellationToken);
}

其中 AutoCompleteMessages 這個屬性跟 ServiceBusReceiveMode.ReceiveAndDelete 的設定不一樣,ReceiveAndDelete 是收到訊息的當下就回應完成訊號,而 AutoCompleteMessages 則是等訊息接收事件被執行完成後,才會回應完成訊號,如果過程中有發生例外錯誤,則不會回應完成訊號。

定義「即時」

看到這邊我們可以知道 Azure Service Bus 在訊息接收的延遲性上,比 Azure Queue Storage 來得即時許多,這是從技術層面上去分析研究所了解到的即時,而我個人在評估一個系統的回應速度要做得多快,是遵照「有一種快叫 User 覺得快」的原則,這是從需求層面上去定義即時,假設我們要做一個聊天室好了,使用者的聊天對象通常是「人」,所以大多不會預期訊息送出去之後,立即就會收到對方的回應,因此我們接收訊息的方式不即時,而是 2 ~ 3 秒 Polling 一次又何妨?所以說,我們在做技術選型時候,應該要保持冷靜,使用者的需求說不定一個簡單到不行的解決方案就可以滿足了,講到這邊又不禁讓我想起搜尋系統砍掉重練的故事了,有機會再跟大家分享。

以上,Azure Service Bus 基本的使用方式就分享給大家,希望對大家在使用 Azure 上相關 Message Queue 服務時有一點點幫助。

參考資料

相關資源

C# 指南
ASP.NET 教學
ASP.NET MVC 指引
Azure SQL Database 教學
SQL Server 教學
Xamarin.Forms 教學