使用 Channels 與 BackgroundService 實作生產者-消費者模式(Producer-Consumer Pattern)

  • 572
  • 0
  • C#
  • 2024-10-16

過去的工作專案為了要加快 Request 的回應速度,所以就想方設法地去做了很多的調整…

例如一個訊息建立後要做很多的事情,像是要許多相關資料 Table 的資料異動、透過 RabbitMQ 去發送 message 通知其他 Client、快取資料的更新等等等

這麼多的處理如果都是在一個 Request 裡全部處理完成後才回傳 Response 結果,因為每個處理都會花費一些時間,全部累積起來就相當可觀,於是就會想到是不是要往平行處理或多執行緒的方式來解決,但是在繁忙的網站服務去使用這些解決方案又有很多風險,程式沒有寫好的話就會出現嚴重錯誤。

所以就想到用發送事件的方式,當訊息建立完成後,就發送一個事件到一個佇列裡,然後有一個 BackgroundService 去專門接收訊息建立完成事件,當收到事件後就開始一連串的處理,這麼一來 Action 方法的回應時間就能夠加快一些。

生產者-消費者模式(Producer-Consumer Pattern)

  • 定義:這是一種通過緩衝區(buffer)在生產者(Producer)和消費者(Consumer)之間解耦的設計模式。生產者生成資料並將其放入緩衝區,消費者從緩衝區中取出資料進行處理。生產者和消費者之間並不直接交互,它們各自只關注自己的工作。
  • 使用情境:當生產資料和消費資料的速度不一致時,此模式可以讓生產者和消費者彼此不互相阻塞,通過緩衝區平衡兩者的速度差異。
  • 實現方式:C# 中可以使用 BlockingCollection<T> 或 Channel<T> 來實現這種模式。

生產者-消費者模式適合用於內部任務排程和工作佇列。

 

Channels (通道)

相關連結

 

實作案例

ChannelServiceCollectionExtensions

將 Channel、Writer、Reader 都單獨註冊成服務,並且設置了 SingleReader = true,這樣可以優化讀取性能,適合只有一個消費者進行資料處理的場景。

在設置裡使用了Channel.CreateUnbounded<T> 表示沒有固定的緩衝大小,可以持續寫入資料而不會被阻塞(除非記憶體不足)。

使用情境:適合於對生產速度和消費速度差異較大的場景,因為它不會因為緩衝區滿了而阻塞寫入者。但這也表示生產者生成的速度如果遠高於消費者的處理速度,可能會導致記憶體使用過多。

using System.Threading.Channels;
using WebApplication1.Infrastructure.Events;

namespace WebApplication1.Infrastructure.ServiceCollectionExtensions;

/// <summary>
/// class ChannelServiceCollectionExtensions
/// </summary>
public static class ChannelServiceCollectionExtensions
{
    /// <summary>
    /// Add the Channels
    /// </summary>
    /// <param name="services">The services</param>
    /// <returns></returns>
    public static IServiceCollection AddChannels(this IServiceCollection services)
    {
        services.AddSingleton(Channel.CreateUnbounded<MessageCreatedEvent>(new UnboundedChannelOptions { SingleReader = true }));
        services.AddSingleton(x => x.GetRequiredService<Channel<MessageCreatedEvent>>().Writer);
        services.AddSingleton(x => x.GetRequiredService<Channel<MessageCreatedEvent>>().Reader);

        return services;
    }
}

UnboundedChannelOptions

因為使用到 UnboundedChannelOptions 這個類別,所以特別說明。

UnboundedChannelOptions 是用來設定 Channel.CreateUnbounded<T> 方法時的選項。這些選項會影響 Channel 的行為,包括如何處理資料的寫入和讀取。有幾個重要的屬性可以設定:

SingleWriter(bool)

  • 用途:指示是否保證只有一個寫入者(Producer)。
  • 預設值:false(允許多個寫入者)
  • 說明:如果設置為 true,則 Channel 會假設只有一個生產者進行寫入操作,這樣可以進行一些內部優化,以提高效能。當你確定只有一個寫入者時,可以將其設為 true。

SingleReader(bool)

  • 用途:指示是否保證只有一個讀取者(Consumer)。
  • 預設值:false(允許多個讀取者)
  • 說明:如果設置為 true,則 Channel 會假設只有一個消費者進行讀取操作。這可以在內部優化某些併發控制,以提升讀取效率。如果你確定只有一個後台服務處理這些訊息,則應將其設為 true。

AllowSynchronousContinuations(bool)

  • 用途:允許同步執行的延續操作(continuations)。
  • 預設值:false
  • 說明:如果設為 true,當某些操作(如 WriteAsync 或 ReadAsync)完成後,後續的回調操作可能會在同一個執行緒中執行。這可以在某些情況下降低上下文切換的成本,但也可能導致執行緒被佔用過久。因此,僅在有需要時才設置為 true。

 

MessageController (生產者)

在 MessageController 注入 ChannelWriter 以寫入事件, 然後 CreateAsync 方法負責接收 API 請求並將 MessageCreatedEvent 寫入 Channel。

使用 TryWrite 確保當 Channel 滿了或不可寫入時能夠快速回應 503 錯誤,避免阻塞 API 的請求,並對 Channel 寫入失敗的情況進行處理。

TryWrite 成功時直接返回 OK,失敗時返回 503 狀態碼。

using System.Threading.Channels;
using Microsoft.AspNetCore.Mvc;
using WebApplication1.Infrastructure.Events;
using WebApplication1.Infrastructure.Models;

namespace WebApplication1.Controllers;

/// <summary>
/// class MessageController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
    private readonly ILogger<MessageController> _logger;

    private readonly TimeProvider _timeProvider;

    private readonly ChannelWriter<MessageCreatedEvent> _messageCreatedEventChannelWriter;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageController"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The timeProvider</param>
    /// <param name="messageCreatedEventChannelWriter">The messageCreatedEventChannelWriter</param>
    public MessageController(ILogger<MessageController> logger,
                             TimeProvider timeProvider,
                             ChannelWriter<MessageCreatedEvent> messageCreatedEventChannelWriter)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._messageCreatedEventChannelWriter = messageCreatedEventChannelWriter;
    }

    /// <summary>
    /// 建立 Message 資料
    /// </summary>
    /// <param name="parameter">parameter</param>
    /// <returns></returns>
    [HttpPost("create")]
    public async Task<IActionResult> CreateAsync([FromBody] MessageCreateParameter parameter)
    {
        this._logger.LogInformation("{CurrentTime} Message Created - Content: {Content}",
                                    $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff}",
                                    parameter.Content);

        var messageEvent = new MessageCreatedEvent(Guid.NewGuid(), parameter.Content);

        try
        {
            // 先嘗試使用 TryWrite 將事件寫入 Channel
            if (this._messageCreatedEventChannelWriter.TryWrite(messageEvent))
            {
                // 如果 TryWrite 成功,直接返回 OK,無需再次執行 WriteAsync
                return this.Ok(new { Message = "Event produced successfully" });
            }

            // 如果 TryWrite 失敗,記錄警告並返回 503 狀態碼
            this._logger.LogWarning("Channel is full or completed, unable to write message event.");
            return this.StatusCode(503, new { Message = "Service Unavailable. Try again later." });
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "Failed to write message event to the channel.");
            return this.StatusCode(500, new { Message = "Failed to produce event" });
        }
    }
}

MessageCreatedEvent 類別

namespace WebApplication1.Infrastructure.Events;

/// <summary>
/// class MessageCreatedEvent
/// </summary>
public class MessageCreatedEvent
{
    /// <summary>
    /// Initializes a new instance of the <see cref="MessageCreatedEvent"/> class
    /// </summary>
    /// <param name="id">The id</param>
    /// <param name="content">The content</param>
    public MessageCreatedEvent(Guid id, string content)
    {
        this.Id = id;
        this.Content = content;
    }
    
    /// <summary>
    /// Id
    /// </summary>
    public Guid Id { get; set; }

    /// <summary>
    /// Content
    /// </summary>
    public string Content { get; set; }
}

 

MessageCreatedEventConsumerBackgroundService (消費者)

這個 BackgroundService 類別負責持續監聽 Channel,使用 ChannelReader 讀取生產者寫入的事件。 透過 ReadAllAsync 方法逐一處理 MessageCreatedEvent 事件,實現非同步的消息處理,當事件到達時即時進行處理並記錄日誌,確保了消息的消費過程不會中斷。

using System.Text.Json;
using System.Threading.Channels;
using WebApplication1.Infrastructure.Events;

namespace WebApplication1.Infrastructure.BackgroundServices;

/// <summary>
/// class MessageCreatedEventConsumerBackgroundService
/// </summary>
public class MessageCreatedEventConsumerBackgroundService : BackgroundService
{
    private readonly ILogger<MessageCreatedEventConsumerBackgroundService> _logger;
    
    private readonly TimeProvider _timeProvider;
    
    private readonly ChannelReader<MessageCreatedEvent> _channelReader;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageCreatedEventConsumerBackgroundService"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The time provider</param>
    /// <param name="channelReader">The channel reader</param>
    public MessageCreatedEventConsumerBackgroundService(ILogger<MessageCreatedEventConsumerBackgroundService> logger,
                                                        TimeProvider timeProvider,
                                                        ChannelReader<MessageCreatedEvent> channelReader)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._channelReader = channelReader;
    }

    /// <summary>
    /// Executes the stopping token
    /// </summary>
    /// <param name="stoppingToken">The stopping token</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this._logger.LogInformation(
            "{DateTimeNow} {TypeName} is starting.",
            $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
            this.GetType().Name);

        // 從 ChannelReader 讀取事件
        await foreach (var messageCreatedEvent in this._channelReader.ReadAllAsync(stoppingToken))
        {
            try
            {
                // 處理讀取到的事件
                var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
                                            "Got a MessageCreatedEvent");
            
                this._logger.LogInformation("{Message} - {MessageCreatedEvent}", 
                                            message, 
                                            JsonSerializer.Serialize(messageCreatedEvent));
            }
            catch (Exception ex)
            {
                this._logger.LogError(ex, "An error occurred while processing the event.");
            }
        }
    }

    /// <summary>
    /// Stops the cancellation token
    /// </summary>
    /// <param name="cancellationToken">The cancellation token</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        this._logger.LogInformation(
            "{DateTimeNow} {TypeName} is stopping.",
            $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
            this.GetType().Name);
        
        await base.StopAsync(cancellationToken);
    }
}

最後要記得在 Program.cs 裡去註冊服務

 

觀察執行狀況

執行服務並觀察 Console Log  的內容

 

最後

在這個應用中,我們利用 C# 中的 Channels  在 ASP.NET Core Web API 裡實作了一個生產者-消費者模式。

P.S.  以下是使用 ChatGPT  幫我做個總結

這個基於 Channels 的生產者-消費者模式實現,利用了 ASP.NET Core 的依賴注入、非同步操作,以及 Channel 提供的高效線程安全通信能力:

  • 高效通信:Channel 提供了高效且線程安全的生產者-消費者模式,可以在多線程環境下輕鬆傳遞消息而不需要顧及複雜的鎖定機制。
  • 擴展性強:將 Channel 的註冊與使用封裝到擴展方法和 BackgroundService 中,使得此模式可以方便地複用和擴展到其他類型的事件處理。
  • 簡化異步處理:使用 Channel 和 BackgroundService 的組合,使得消息處理的邏輯簡單且易於理解,並且能夠高效地處理大量請求。

這樣的設計不僅適合需要高效處理事件的場景,如佇列處理、數據流等,也能輕鬆適應多種不同的應用需求。對於希望在 ASP.NET Core 中構建高效、可擴展的應用程序的開發者而言,Channel 提供了一個非常有力的工具。

 

純粹是在寫興趣的,用寫程式、寫文章來抒解工作壓力