將 Wolverine 接上 RabbitMQ - 使用 WolverineFx.RabbitMQ

  • 49
  • 0
  • C#
  • 2024-10-21

前一篇「使用 Wolverine 實作生產者-消費者模式(Producer-Consumer Pattern)」介紹了將原本使用 Channels + BackgroundService 的生產者-消費者模式改用 Wolverine  來實作。

就以現實的問題來看,如果服務是相當頻繁地被使用時,如果 MessageCreatedEventHandler 消費者的處理速度有所延遲,那麼 Channels 裡就會堆積大量的 MessageCreatedEvent 事件,一旦服務出現問題而重新啟動或是有版本更新而需要重新部署,那麼佇列在 Channels 裡的事件就會消失不見了,這可是不得了的事情。所以這篇就來用 WolverineFx.RabbitMQ,生產者將事件發送到 RabbitMQ 裡,然後消費者再去接收存放在 RabbitMQ 裡的訊息,如此的改變就是為了避免因為服務重新啟動而讓佇列在 Channels 裡的事件消失不見。

WolverineFx.RabbitMQ

 

準備 RabbitMQ

之前有寫了這麼一篇「快速建立開發環境的 RabbitMQ Cluster - 使用 bat 執行檔的方式」,重新整理了那篇所使用的一些檔案內容,然後將 RabbitMQ Management 版本改為使用 4.0.2

Dockerfile

FROM rabbitmq:4.0.2-management

COPY definitions.json /etc/rabbitmq/

rabbitmq_cluster_setup.sh

#!/bin/bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@node1
rabbitmqctl start_app

definitions.json

{
    "rabbit_version": "4.0.2",
    "rabbitmq_version": "4.0.2",
    "product_name": "RabbitMQ",
    "product_version": "4.0.2",
    "users": [
        {
            "name": "admin",
            "password_hash": "rrTrQZDcgif07x5jHy+8mlDRil4yv+CY/VJi8UUAFzGmpgfy",
            "hashing_algorithm": "rabbit_password_hashing_sha256",
            "tags": [
                "administrator"
            ],
            "limits": {}
        }
    ],
    "vhosts": [
        {
            "name": "/",
            "description": "Default virtual host",
            "tags": [],
            "metadata": {
                "description": "Default virtual host",
                "tags": []
            }
        }
    ],
    "permissions": [
        {
            "user": "admin",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        }
    ],
    "topic_permissions": [],
    "parameters": [],
    "global_parameters": [
        {
            "name": "internal_cluster_id",
            "value": "rabbitmq-cluster-id-6n6jOh8xpgnG9JpYoDNNnA"
        }
    ],
    "policies": [],
    "queues": [],
    "exchanges": [],
    "bindings": []
}

init_rabbitmq_cluster.bat

@echo off
chcp 65001 > nul

:: 檢查是否存在 rabbitmq:4.0.2-management 映像
docker images | findstr "^rabbitmq" | findstr "4.0.2-management" > nul
if %errorlevel% neq 0 (
    echo "rabbitmq:4.0.2-management 映像不存在,正在拉取..."
    docker pull rabbitmq:4.0.2-management
) else (
    echo "rabbitmq:4.0.2-management 映像已存在,不需要重新拉取。"
)

:: 檢查是否存在 my-custom-rabbitmq:4.0.2-management 映像
docker images | findstr "my-custom-rabbitmq:4.0.2-management" > nul
if %errorlevel% equ 0 (
    echo "my-custom-rabbitmq:4.0.2-management 映像已存在,不需要重新構建。"
) else (
    :: 建立 RabbitMQ Docker 映像
    echo "建立 RabbitMQ Docker 映像..."
    docker build -t my-custom-rabbitmq:4.0.2-management .
)

:: 啟動 Docker 容器 1
echo "啟動 Docker 容器 1"
docker run -d --restart=always --name rabbitmq1 --hostname node1 --log-opt max-size=10m --log-opt max-file=3 -v "%CD%\data1:/var/lib/rabbitmq:z" -p "4369:4369" -p "5671:5671" -p "5672:5672" -p "15671:15671" -p "15672:15672" -p "25672:25672" -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=mypassword -e RABBITMQ_ERLANG_COOKIE=mysecretcookie -e RABBITMQ_NODENAME=rabbit my-custom-rabbitmq:4.0.2-management

:: 啟動 Docker 容器 2
echo "啟動 Docker 容器 2"
docker run -d --restart=always --name rabbitmq2 --hostname node2 --log-opt max-size=10m --log-opt max-file=3 -v "%CD%\data2:/var/lib/rabbitmq:z" -p "5673:5672" -p "15673:15672" -p "25673:25672" -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=mypassword -e RABBITMQ_ERLANG_COOKIE=mysecretcookie -e RABBITMQ_NODENAME=rabbit --link rabbitmq1:node1 rabbitmq:4.0.2-management

:: 啟動 Docker 容器 3
echo "啟動 Docker 容器 3"
docker run -d --restart=always --name rabbitmq3 --hostname node3 --log-opt max-size=10m --log-opt max-file=3 -v "%CD%\data3:/var/lib/rabbitmq:z" -p "5674:5672" -p "15674:15672" -p "25674:25672" -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=mypassword -e RABBITMQ_ERLANG_COOKIE=mysecretcookie -e RABBITMQ_NODENAME=rabbit --link rabbitmq1:node1 --link rabbitmq2:node2 rabbitmq:4.0.2-management

echo "所有 Docker 容器已建立"

:: 等待 RabbitMQ 服務啟動完成
echo "等待 RabbitMQ 服務啟動..."
timeout /t 30 /nobreak

:: 建立 RabbitMQ Cluster
echo "Starting to build rabbitmq cluster."

:: 在 rabbitmq2 容器内執行脚本
docker cp rabbitmq_cluster_setup.sh rabbitmq2:/rabbitmq_cluster_setup.sh
docker exec rabbitmq2 /bin/bash -c '/rabbitmq_cluster_setup.sh'

:: 在 rabbitmq3 容器内執行脚本
docker cp rabbitmq_cluster_setup.sh rabbitmq3:/rabbitmq_cluster_setup.sh
docker exec rabbitmq3 /bin/bash -c '/rabbitmq_cluster_setup.sh'

echo "完成 RabbitMQ Cluster"

:: check cluster status
echo "Check cluster status:"
docker exec rabbitmq1 /bin/bash -c "rabbitmqctl cluster_status"
docker exec rabbitmq2 /bin/bash -c "rabbitmqctl cluster_status"
docker exec rabbitmq3 /bin/bash -c "rabbitmqctl cluster_status"

:: 等待使用者按下任意鍵以關閉視窗
pause

在 Windows 環境下使用 CMD (命令提示字元)  去執行 init_rabbitmq_cluster.bat  這個檔案,就可以建立好 RabbitMQ Cluster,管理者的登入帳號和密碼都在上面。

 

Program.cs 設定 Wolverine

在完成 RabbitMQ Cluster 的架設後,先建立名稱為「wolverine」的 Virtual Host,並且增加使用者帳號「wolverine_user」和密碼,然後在 Program.cs 裡的 builder.Host.UseWolverine() 裡去增加 RabbitMQ 的 Connection 以及 PublishMessage 和 ListenToRabbitQueue 的設定

// 使用 Wolverine 註冊訊息處理
builder.Host.UseWolverine(opts =>
{
    // 設定 RabbitMQ 連線資訊,包含 VirtualHost
    opts.UseRabbitMq(connectionFactory =>
        {
            connectionFactory.VirtualHost = "wolverine";
            connectionFactory.UserName = "wolverine_user";
            connectionFactory.Password = "1q2w3e4r5t_";

            connectionFactory.EndpointResolverFactory = _ => new DefaultEndpointResolver([
                new AmqpTcpEndpoint("127.0.0.1", 5672),
                new AmqpTcpEndpoint("127.0.0.1", 5673),
                new AmqpTcpEndpoint("127.0.0.1", 5674)
            ]);
        })
        .AutoProvision();
    
    // 發送 MessageCreatedEvent 到指定的 exchange 和 queue
    opts.PublishMessage<MessageCreatedEvent>()
        .ToRabbitExchange(
            exchangeName: "message_created_exchange",
            configure: exchange =>
            {
                exchange.ExchangeType = ExchangeType.Direct;
                exchange.BindQueue(queueName: "message_created_queue", bindingKey: "message.created");
            });

    // 接收來自 RabbitMQ 的 MessageCreatedEvent
    opts.ListenToRabbitQueue(queueName: "message_created_queue", configure: queue =>
    {
        queue.TimeToLive(1024.Minutes());
    });
});

RabbitMQ 連線設定:

  • 使用 UseRabbitMq 配置 ConnectionFactory,設置了 VirtualHost、UserName 和 Password。
  • 使用 EndpointResolverFactory 來配置多個 RabbitMQ 節點,使得在集群中如果有節點不可用,仍然能夠自動切換到其他節點。

MessageCreatedEvent 的發送:

  • 設定了 PublishMessage<MessageCreatedEvent>(),將事件發送到一個名為 "message_created_exchange" 的 exchange,並配置它為 Direct 類型。
  • 使用 exchange.BindQueue 綁定了 "message_created_queue" 並設置了 bindingKey 為 "message.created",這樣確保了該事件能夠正確路由到對應的 queue。

接收 MessageCreatedEvent:

  • 使用 ListenToRabbitQueue 設定了監聽 "message_created_queue"。
  • 使用 TimeToLive(5.Minutes()) 設置消息的存活時間,讓未處理的消息在 queue 中的壽命為 1024 分鐘。

 

發送者 - MessageController

基本上跟前一篇的內容是一樣的,沒有做任何的變化

using Microsoft.AspNetCore.Mvc;
using WebApplication1.Infrastructure.Events;
using WebApplication1.Infrastructure.Models;
using Wolverine;

namespace WebApplication1.Controllers;

/// <summary>
/// class MessageController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
    private readonly ILogger<MessageController> _logger;

    private readonly TimeProvider _timeProvider;

    private readonly IMessageBus _messageBus;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageController"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    /// <param name="messageBus">The message bus</param>
    public MessageController(ILogger<MessageController> logger, TimeProvider timeProvider, IMessageBus messageBus)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._messageBus = messageBus;
    }

    /// <summary>
    /// 建立 Message 資料
    /// </summary>
    /// <param name="parameter">parameter</param>
    /// <returns></returns>
    [HttpPost("create")]
    [Produces("application/json", "text/json")]
    [ProducesResponseType(200, Type = typeof(ResponseMessageOutputModel))]
    [ProducesResponseType(500, Type = typeof(ResponseMessageOutputModel))]
    public async Task<IActionResult> CreateAsync([FromBody] MessageCreateParameter parameter)
    {
        this._logger.LogInformation("{CurrentTime} Message Created - Content: {Content}",
                                    $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff}",
                                    parameter.Content);

        var messageEvent = new MessageCreatedEvent(Guid.NewGuid(), parameter.Content);

        try
        {
            // 使用 Wolverine 發送事件
            await this._messageBus.PublishAsync(messageEvent);
            return this.Ok(new ResponseMessageOutputModel { Message = "Event produced successfully" });
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "Failed to publish message event.");
            return this.StatusCode(500, new ResponseMessageOutputModel { Message = "Failed to produce event" });
        }
    }
}

 

消費者 - MessageCreatedEventHandler

這也是和前一篇的內容是一樣的,沒有做任何的改變

using System.Text.Json;
using WebApplication1.Infrastructure.Events;

namespace WebApplication1.Infrastructure.EventHandlers;

/// <summary>
/// class MessageCreatedEventHandler
/// </summary>
/// <remarks>
/// 透過 Wolverine 的 MessageBus 接收處理 MessageCreatedEvent 事件
/// </remarks>
public class MessageCreatedEventHandler
{
    private readonly ILogger<MessageCreatedEventHandler> _logger;

    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageCreatedEventHandler"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    public MessageCreatedEventHandler(ILogger<MessageCreatedEventHandler> logger, TimeProvider timeProvider)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
    }

    /// <summary>
    /// 接收並處理 MessageCreatedEvent 事件
    /// </summary>
    /// <param name="messageCreatedEvent">The messageCreatedEvent</param>
    public async Task HandleAsync(MessageCreatedEvent messageCreatedEvent)
    {
        try
        {
            var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
                                        "Got a MessageCreatedEvent");

            this._logger.LogInformation("{Message} - {MessageCreatedEvent}",
                                        message,
                                        JsonSerializer.Serialize(messageCreatedEvent));

            // 可以在此處進行其他的異步操作

            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "An error occurred while processing the event.");
        }
    }
}

 

啟動服務並觀察

服務啟動後,在 console log 裡就可以看到相關的 RabbitMQ 設定處理的 log 紀錄

可以到 RabbitMQ Mamagement 的 Connection 裡看到有 Client 端的連線,因為我們的服務有接收端要收 message,所以就會有連線

然後透過 Swagger 或 Scalar  或使用 Postman 去執行 /message/create.POST,觀察 conolse log 就可以看到有發送並且 MessageCreatedEventHandler 有接收並處理的 log 紀錄

也可以觀察 RabbitMQ 的 Queues and Streams 內容,確定是有將 message 發送到 RabbitMQ 裡並且是有被 Consumer 接收處理的

 

上面就是改用 WolverineFx.RabbitMQ 將 MessageCreatedEvent 發送到 RabbitMQ,並且讓 MessageCreatedEventHander 從 RabbitMQ 接收 MessageCreatedEvent 並做處理。

以上