使用 Wolverine 實作生產者-消費者模式(Producer-Consumer Pattern)

  • 63
  • 0
  • C#
  • 2024-10-21

之前寫了一篇「使用 Channels 與 BackgroundService 實作生產者-消費者模式(Producer-Consumer Pattern)」是使用 Channels 與 BackgroundService 來實作,而這一次就來改用 Wolverine 這個套件來試試看。

有關 Wolverine 這個套件我也看了好久,一直想拿來試試看,會想要用這個套件,主要是它是一個輕量化的工具,適合用於整合訊息佇列、事件驅動設計和後台任務處理。

而且也因為適合用於整合訊息佇列、事件驅動設計,所以支援了許多第三方服務,例如:RabbitMQ, Kafaka, MQTT, AzureServiceBux, AmazonSqs 等,所以打算之後也繼續玩玩 Wolverine。

Wolverine

以下是 ChatGPT 所列出的優缺點和總結

Wolverine 是一個專為 .NET 生態系統設計的訊息處理與微服務架構,旨在簡化和強化應用程式內的訊息傳遞和後端處理流程。它是一個輕量的工具,適合用於整合訊息佇列、事件驅動設計和後台任務處理。以下是 Wolverine 的主要用途與優缺點。

用途

  • 訊息佇列與事件驅動:Wolverine 提供強大的訊息佇列功能,可以與 RabbitMQ、Azure Service Bus 等訊息中介系統整合,並且支援基於記憶體的訊息佇列,方便在開發環境進行測試。
  • 後台任務與工作者處理:透過 Wolverine,開發者可以輕鬆地定義背景任務、排程任務,並處理異步作業(例如檔案處理、通知推送)。
  • 內建的事件總線:Wolverine 具有內建的事件總線功能,適合用於事件驅動設計模式,開發者可以使用事件發布-訂閱模式來處理事件和命令。
  • 內建的中介軟體模式支援:它也可以用於實作 Command-Query Responsibility Segregation(CQRS)模式,讓命令與查詢的處理流程更簡單。

優點

  • 簡化訊息處理邏輯:Wolverine 提供一致且簡單的 API 介面,讓開發者不需撰寫過多樣板程式碼,即可輕鬆實作複雜的訊息處理邏輯。
  • 效能優化:Wolverine 強調高效能,對訊息處理進行了許多優化,如使用並行處理訊息的設計,能夠在高負載環境下保持較低延遲。
  • 內建支援多種訊息中介:它支援多種訊息中介軟體(例如 RabbitMQ、Azure Service Bus),開發者可以根據需求選擇最適合的佇列技術,並且能夠輕鬆切換。
  • 開發者體驗友好:擁有清晰的文件和範例(如 Wolverine 官方網站和教學資源),學習曲線較低,並且能與 ASP.NET Core 無縫整合。

缺點

  • 生態系統相對較新:相比於 MassTransit、NServiceBus 等傳統訊息總線方案,Wolverine 的生態系統和社群相對較新,第三方資源和支援可能較少。
  • 文件尚在發展中:儘管 Wolverine 擁有不錯的文件支援,但隨著功能的發展,部分進階應用的範例與指引可能不如一些成熟的訊息中介方案。
  • 缺少 GUI 管理工具:不像 RabbitMQ 等訊息中介本身有豐富的管理介面,Wolverine 更偏向程式邏輯上的處理,開發者需要依賴其他工具來監控佇列和訊息。

適用情境

  • 若你需要一個高效且簡單的訊息佇列解決方案,且不想使用過於重量級的框架,Wolverine 是一個很好的選擇。
  • 適合希望在 .NET Core 環境中實作 CQRS 或者事件驅動架構的團隊。
  • 如果你習慣於 RabbitMQ 並需要在新的專案中採用更現代的訊息處理方式,Wolverine 可以是值得探索的替代方案。


總結來說,Wolverine 是一個現代化、輕量且專注於效能的訊息處理框架,適合希望簡化訊息處理和後台任務的開發者使用。然而,由於其生態系統相對較新,選擇時應考量現有團隊的經驗與專案需求。

 

生產者實作

首先在專案裡安裝 WolverinFx

接著修改 MessageController.cs,這邊就不需要再使用 Channels 了,而是直接使用 Wolverin 所提供的 IMessageBus

using Microsoft.AspNetCore.Mvc;
using WebApplication1.Infrastructure.Events;
using WebApplication1.Infrastructure.Models;
using Wolverine;

namespace WebApplication1.Controllers;

/// <summary>
/// class MessageController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
    private readonly ILogger<MessageController> _logger;

    private readonly TimeProvider _timeProvider;
    
    private readonly IMessageBus _messageBus;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageController"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    /// <param name="messageBus">The message bus</param>
    public MessageController(ILogger<MessageController> logger, TimeProvider timeProvider, IMessageBus messageBus)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._messageBus = messageBus;
    }
    
    /// <summary>
    /// 建立 Message 資料
    /// </summary>
    /// <param name="parameter">parameter</param>
    /// <returns></returns>
    [HttpPost("create")]
    public async Task<IActionResult> CreateAsync([FromBody] MessageCreateParameter parameter)
    {
        this._logger.LogInformation("{CurrentTime} Message Created - Content: {Content}",
                                    $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff}",
                                    parameter.Content);

        var messageEvent = new MessageCreatedEvent(Guid.NewGuid(), parameter.Content);

        try
        {
            // 使用 Wolverine 發送事件
            await this._messageBus.PublishAsync(messageEvent);
            return this.Ok(new { Message = "Event produced successfully" });
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "Failed to publish message event.");
            return this.StatusCode(500, new { Message = "Failed to produce event" });
        }
    }    
}

MessageCreatedEvent.cs

namespace WebApplication1.Infrastructure.Events;

/// <summary>
/// record MessageCreatedEvent
/// </summary>
public record MessageCreatedEvent
{
    /// <summary>
    /// Initializes a new instance of the <see cref="MessageCreatedEvent"/> class
    /// </summary>
    /// <param name="id">The id</param>
    /// <param name="content">The content</param>
    public MessageCreatedEvent(Guid id, string content)
    {
        this.Id = id;
        this.Content = content;
    }

    /// <summary>
    /// Id
    /// </summary>
    public Guid Id { get; set; }

    /// <summary>
    /// Content
    /// </summary>
    public string Content { get; set; }
}

這裡使用 Wolverine 的 IMesssageBus 取代 ChannelWriter,並透過 PublishAsync 方法將 MessageCreatedEvent  傳送到 MessageBus 裡。

 

消費者實作

這邊就不需要使用 BackgroundService + ChannelReader 來接收 MessageCreatedEvent 事件,因為 Wolverine 會去處理訂閱和訊息的處理邏輯,所以開發者只需要去定義事件處理的 Handler 就可以。

以下就是 MessageCreatedEventHandler 類別的實作

using System.Text.Json;
using WebApplication1.Infrastructure.Events;

namespace WebApplication1.Infrastructure.EventHandlers;

/// <summary>
/// class MessageCreatedEventHandler
/// </summary>
/// <remarks>
/// 透過 Wolverine 的 MessageBus 接收處理 MessageCreatedEvent 事件
/// </remarks>
public class MessageCreatedEventHandler
{
    private readonly ILogger<MessageCreatedEventHandler> _logger;

    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageCreatedEventHandler"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    public MessageCreatedEventHandler(ILogger<MessageCreatedEventHandler> logger, TimeProvider timeProvider)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
    }

    /// <summary>
    /// 處理 MessageCreatedEvent 的方法
    /// </summary>
    /// <param name="messageCreatedEvent">The messageCreatedEvent</param>
    public async Task HandleAsync(MessageCreatedEvent messageCreatedEvent)
    {
        try
        {
            var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
                                        "Got a MessageCreatedEvent");

            this._logger.LogInformation("{Message} - {MessageCreatedEvent}",
                                        message,
                                        JsonSerializer.Serialize(messageCreatedEvent));

            // 可以在此處進行其他的異步操作

            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "An error occurred while processing the event.");
        }
    }
}

Wolverine 會自動找到並執行 HandleAsync 方法,前提是要符合事件的參數簽名,即 Task HandleAsync(MessageCreatedEvent)。你可以改用其他名稱,只要方法參數匹配到,Wolverine 就能識別這是對應的事件處理 Handler 類別。

除了生產者與消費者的實作外,不要忘記還要到 Program.cs 裡去註冊使用 Wolverine,透過註冊之後,Wolverine 會去掃描或註冊事件處理 Handler。

上面的註冊設定會去掃描 Assembly 裡有使用到 IMessageBus 的事件類別以及接收處理事件類別的處理器,或者也可以直接指定掃描 Assembly

對了,有關事件類別處理器 (Handler) 裡接收事件類別的方法名稱,是有限制的,可以使用以下的名稱

有關 Message Handler Discovery  的內容,可以查看 Wolverine 的文件

Message Handler Discovery | Wolverine

 

執行並觀察

以下是服務剛啟動時的 conolse log 內容,這時候還沒有執行 controller  所以還沒有發送任何訊息

然後執行 api/message/create.POST 方法,一共執行了三次,以下是發送與接收的 log 紀錄

第一次執行時的發送到接收處理,這過程似乎長了點,不過後續的發送、接收處理的間隔時間都相當短

 

同場加映:使用 BackgroundService  做為生產者,並使用 PeroidicTimer 計時發送事件

每 60  秒鐘建立 IssueCreatedEvent 事件,並透過 Wolverine 的 MessageBus 發送。這邊的重點在於取得 IMessageBus,因為是 BackgroundService,所以要在類別建構式依賴注入 IServiceScopeFactory , 然後在 HandleAsync 方法裡使用 serviceScopeFactory 建立 scope 並取得 seviceProvider,然後再使用 serviceProvider 取得 IMessageBus

using WebApplication1.Infrastructure.Events;
using Wolverine;

namespace WebApplication1.Infrastructure.BackgroundServices;

/// <summary>
/// class IssueCreateBackgroundService
/// </summary>
/// <remarks>
/// 建立 IssueCreatedEvent 事件,並透過 Wolverine 的 MessageBus 發送
/// </remarks>
public class IssueCreateBackgroundService : BackgroundService
{
    /// <summary>
    /// 執行間隔時間 60 sec
    /// </summary>
    private static readonly TimeSpan IntervalTime = TimeSpan.FromSeconds(60);

    private readonly ILogger<IssueCreateBackgroundService> _logger;

    private readonly TimeProvider _timeProvider;

    private readonly IServiceScopeFactory _serviceScopeFactory;

    /// <summary>
    /// Initializes a new instance of the <see cref="IssueCreateBackgroundService"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    /// <param name="serviceScopeFactory">The service scope factory</param>
    public IssueCreateBackgroundService(ILogger<IssueCreateBackgroundService> logger,
                                        TimeProvider timeProvider,
                                        IServiceScopeFactory serviceScopeFactory)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._serviceScopeFactory = serviceScopeFactory;
    }

    /// <summary>
    /// Executes the stopping token
    /// </summary>
    /// <param name="stoppingToken">The stopping token</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var scope = this._serviceScopeFactory.CreateScope();

        var serviceProvider = scope.ServiceProvider;

        var messageBus = serviceProvider.GetRequiredService<IMessageBus>();

        this._logger.LogInformation(
            "{DateTimeNow} {TypeName} is starting.",
            $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
            this.GetType().Name);

        // 使用 PeriodicTimer
        using PeriodicTimer periodicTimer = new(IntervalTime);

        while (await periodicTimer.WaitForNextTickAsync(stoppingToken) && stoppingToken.IsCancellationRequested is false)
        {
            await messageBus.SendAsync(new IssueCreatedEvent
            {
                OriginatorId = Guid.NewGuid(),
                Title = $"Issue Created at {this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
                Description = $"Issue Created at {this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}"
            });

            this._logger.LogInformation(
                "{DateTimeNow} {TypeName} Processing.",
                $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
                this.GetType().Name);
        }
    }

    /// <summary>
    /// Stops the cancellation token
    /// </summary>
    /// <param name="cancellationToken">The cancellation token</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        this._logger.LogInformation(
            "{DateTimeNow} {TypeName} is stopping.",
            $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
            this.GetType().Name);
        await base.StopAsync(cancellationToken);
    }
}

建立 IssueCreatedEventHandler 類別,透過 Wolverine 的 MessageBus 接收處理 IssueCreatedEvent 事件

using System.Text.Json;
using WebApplication1.Infrastructure.Events;

namespace WebApplication1.Infrastructure.EventHandlers;

/// <summary>
/// class IssueCreatedEventHandler
/// </summary>
/// <remarks>
/// 透過 Wolverine 的 MessageBus 接收處理 IssueCreatedEvent 事件
/// </remarks>
public class IssueCreatedEventHandler
{
    private readonly ILogger<IssueCreatedEventHandler> _logger;

    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Initializes a new instance of the <see cref="IssueCreatedEventHandler"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    public IssueCreatedEventHandler(ILogger<IssueCreatedEventHandler> logger,
                                    TimeProvider timeProvider)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
    }

    /// <summary>
    /// 接收並處理 IssueCreatedEvent 事件
    /// </summary>
    /// <param name="issueCreatedEvent">The issueCreatedEvent</param>
    public async Task HandleAsync(IssueCreatedEvent issueCreatedEvent)
    {
        try
        {
            var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
                                        "Got a IssueCreatedEvent");

            this._logger.LogInformation("{Message} - {IssueCreatedEvent}",
                                        message,
                                        JsonSerializer.Serialize(issueCreatedEvent));

            // 可以在此處進行其他的異步操作

            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "An error occurred while processing the event.");
        }
    }
}

最後別忘了要註冊 BackgroundService

執行並觀察

 

以上就是分別在兩種應用情境裡使用 Wolverine 實作生產者-消費者模式(Producer-Consumer Pattern)的簡單練習。