[料理佳餚] 在 RabbitMQ 排程一個延後傳遞的訊息

我們對於 Message Queue 的既定印象就是先進先出,先發送的訊息就先傳遞出去,這兩天收到一個需求是希望通知能夠在指定的時間傳遞出去,以往這類型的需求我們會跳過 Message Queue,改用定期去看資料庫或是某個資料夾,如果有資料或檔案內容的指定時間符合當下的時間,我們就發送通知,但我現在想讓 Message Queue 處理延後傳遞的訊息。

我用的 Mesaage Queue 系統是 RabbitMQ,版本是 3.6.11,安裝在 Windows Server 2016 作業系統上,RabbitMQ 有一個 Plugin 叫 rabbitmq_delayed_message_exchange,我們可以利用這個 Plugin 來發送延後傳遞的訊息。

安裝

我們從 Community Plugins — RabbitMQ 找到 rabbitmq_delayed_message_exchange 並且把它下載回來,要注意的是 RabbitMQ 的版本至少要 3.6 以上,下載之後就解壓縮到 RabbitMQ 安裝目錄下的 plugins 資料夾

打開命令提示字元,切換到 RabbitMQ 安裝目錄底下的 sbin 資料夾,執行下列指令來啟動 rabbitmq_delayed_message_exchange Plugin。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

用管理介面來綁定 Queue 到 x-delayed-message Exchange

如果我們有啟動 rabbitmq_management Plugin,那麼我們可以選擇從管理介面來新增 x-delayed-message Exchange,並且將 Queue 綁定到 Exchange 上。

新增 x-delayed-message Exchange

Exchange 的 Name 就自訂一個自己喜歡的名稱,Type 選擇 x-delayed-message,需要新增一個 Argument,Key 為 x-delayed-type,Value 為 direct

新增 Queue

我們新增一個 Queue,用來從 x-delayed-message Exchange 接收延遲傳遞的訊息。

綁定 Queue 到 x-delayed-message Exchange

From exchange 欄位輸入剛剛新增的 x-delayed-message Exchange 名稱,自訂一個 Routing Key

用程式碼來綁定 Queue 到 x-delayed-message Exchange

除了用 RabbitMQ 的管理介面之外,我們還可以用程式碼來綁定 Queue 到 x-delayed-message Exchange。

// 新增 x-delayed-message Exchange
var exchangeName = "delayed-message";

var args = new Dictionary<string, object> { ["x-delayed-type"] = "direct" };

channel.ExchangeDeclare(exchangeName, "x-delayed-message", true, false, args);

// 新增 Queue
var queueName = "delayed";

channel.QueueDeclare(queueName, true, false, false, null);

// 綁定 Queue 到 x-delayed-message Exchange
var routingKey = "delayed-notification";

channel.QueueBind(queueName, exchangeName, routingKey, null);

發送延後傳遞的訊息

Queue 綁定好之後,我們在發送的訊息中要加入一個 Header,Key 是 x-delay,Value 是毫秒。

var delay = new Dictionary<string, object> { ["x-delay"] = 5000 };

channel.BasicPublish(
    exchangeName,
    routingKey,
    new BasicProperties { Headers = delay },
    Encoding.UTF8.GetBytes($"Hello, Send: {DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}"));

我分別測試了 5 秒、1 小時、2 小時,訊息皆在預期的時間傳遞過來。

發送特定時間傳遞的訊息

我們把指定的時間減掉當前時間的總毫秒數,拿來當 x-delay 的參數值,就可以做到在特定時間才把訊息傳遞出去,根據文件的說明,最大毫秒數為 (2^32)-1 毫秒。

var delayMilliseconds = Convert.ToInt32(new DateTime(2019, 5, 16, 17, 55, 0).Subtract(DateTime.Now).TotalMilliseconds);

var delay = new Dictionary<string, object> { ["x-delay"] = delayMilliseconds };

channel.BasicPublish(
    exchangeName,
    routingKey,
    new BasicProperties { Headers = delay },
    Encoding.UTF8.GetBytes($"Hello, Send: {DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}"));

另外就是相信各位應該有注意到,那個送達的時間有些許的落差,也就是說不是百分之百地精準,由於這個主要是靠 Countdown 來做到在特定時間傳遞訊息,所以 RabbitMQ Server 的 Performance,以及 Client 端與 Server 端時間的不同步,都會造成時間差,因此如果對時間敏感的需求,建議不要使用 RabbitMQ 的 rabbitmq_delayed_message_exchange Plugin 來處理延後傳遞的訊息。

參考資料