[Redis] 在 Asp.Net 使用 StackExchange.Redis 操作 Redis

簡單筆記在如何使用 StackExchange.Redis 操作Redis

基本使用

為了方便,直接使用docker執行一個redis

$docker run --name myRedis -d -p 6379:6379 redis

接著專案加入nuget package

dotnet add package StackExchange.Redis

ConnectionMultiplexer

接著透過 ConnectionMultiplexer.ConnectAsync 取得與redis溝通的物件,依照官方的建議,ConnectionMultiplexer 的所有操作都是Tread-Safe,可以保存並重用它,依照 ConnectAsync 的多載,有幾種取得方式:

單一主機的連線字串

ConnectionMultiplexer redis = await ConnectionMultiplexer.ConnectAsync("localhost,serviceName=Master");

多台主機的連線字串

若連結的redis是主從架構,則可以傳入兩個redis的連線位置,並且用","分隔

ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server1:6379,server2:6379");

ConfigurationOptions

可以建立 ConfigurationOptions,設定相關參數,傳入 ConnectAsync 來取得 ConnectionMultiplexer

var options = new ConfigurationOptions()
{
    EndPoints =
    {
        "localhost"
    }
    };
    var redis = await ConnectionMultiplexer.ConnectAsync(options);
}

從 String Parse為 ConfigurationOptions 加工處理

若連線設定是以string的保存的,例如從appsetting取得連線字串,取得後可以透過 ConfigurationOptions.Parse() 把 string 轉回 ConfigurationOptions,做細部的設定後再取得連線物件

string configString = GetRedisConfiguration();
var options = ConfigurationOptions.Parse(configString);
options.AllowAdmin = true;
var conn = ConnectionMultiplexer.Connect(options);

IDatabase

接著使用 GetDatabase() 來訪問對應的redis(若存在多個redis,可以透過第一個參數指定要訪問的redis)

IDatabase database = redis.GetDatabase();

透過 IDatabase 實例,就可以簡單地對 redis 進行操作

var key = "test-key";
var value = "test"

// set key-value
var result = await database.StringSetAsync(key, value);

// get value
var value = await database.StringGetAsync(key);

連線重試

若Redis連線失敗時,StackExchange.Redis 會自動嘗試重新連接,每兩次重試的間隔時間、策略可由 ConfigurationOptions 的 ReconnectRetryPolicy 參數決定,有線性及指數兩種方式可以設定,預設為 LinearRetry(5000)

var options = new ConfigurationOptions()
{
    EndPoints = { "localhost" },
    ReconnectRetryPolicy = new ExponentialRetry(5000),
};
            
var redis = await ConnectionMultiplexer.ConnectAsync(options);

使用Redis實現簡單的搶票機制

瞭解了基本使用方式,接著做個小功能試試看

情境

活動限制 100 個名額,有多個使用者會經過不同的server搶票,不允許超賣的情形

流程

活動開始前,在Redis塞了一個這次活動的數量(100),

搶票過程中,預期在每經過一個client後就遞減數量並且返回成功,當數量到達0時,後續的client都會拿到失敗

實作

Init

在開始之前先初始化Redis的資料,塞入活動用的允許數量

private static readonly RedisClient _client = new RedisClient();

private static async Task Init()
{
    await _client.SetString(_eventCountKey, 100);
    await _client.GetString(_eventCountKey);
}

Client Request

模擬有105個client request,每個client一個task讓同時多個人在搶票

private static readonly TicketService _service = new TicketService(_client);
private static readonly string _eventCountKey = "Event_Count";

private static async Task Run()
{
    var result = new ConcurrentStack<bool>();
    var tasks = new List<Task>();
    
    // 發出105個task
    for (var index = 0; index < 105; index++)
    {    
        var number = index;
        tasks.Add(Task.Run(async () =>
        {
            // 透過 TicketService 處理搶票的邏輯,返回bool
            result.Push(await _service.GetTicket(_eventCountKey));
            Console.WriteLine($"{number}");
        }));
    }
    await Task.WhenAll(tasks);

    // 驗證拿到成功的client request數量
    Console.WriteLine($"success count: {result.Count(r => r == true)}");
}

TicketService

public class TicketService
{
    private readonly RedisClient _client;

    public TicketService(RedisClient redisClient)
    {
        _client = redisClient;
    }

    public async Task<bool> GetTicket(string key)
    {
        // 只有在數量還有剩 且 透過Redis的Lock成功,才繼續搶票的動作
        // 這邊Lock的Timeout時間為100毫秒,純粹只是為了測試
        if (await TicketCount(key) > 0 && await _client.Lock(key, TimeSpan.FromMilliseconds(100)))
        {
            try
            {
                // 遞減數量,會返回剩餘的數量,剩餘數量小於0代表超賣了,會返回失敗
                var lastCount = await _client.StringDecrement(key);

                return lastCount >= 0;
            }
            finally
            {
                // 完成後要把Lock釋放
                await _client.LockRelease(key);
            }
        }

        return false;
    }

    private async Task<int> TicketCount(string key)
    {
        return (int)await _client.GetString(key);
    }
}

RedisClient

RedisClient 負責跟Redis的溝通,包含Get、Set等的所有操作

public class RedisClient
{
    // 可重用,所以在ctor建立後就放到filed上
    private readonly ConnectionMultiplexer _connection;

    public RedisClient()
    {
        var options = new ConfigurationOptions()
        {
            EndPoints = {"localhost"}
        };
        _connection = ConnectionMultiplexer.Connect(options);
    }

    public async Task<bool> Lock(string key, TimeSpan expiry)
    {
        // Lock失敗就等200毫秒,再重試,最多10次
        var lockKey = $"Lock_{key}";
        var number = 0;
        do
        {
            try
            {
                var database = _connection.GetDatabase();
                if (await database.LockTakeAsync(lockKey, Environment.MachineName, expiry))
                {
                    return true;
                }
            }
            catch (Exception)
            {
                await Task.Delay(200);
                number++;
            }
        } while (number < 10);

        return false;
    }

    public async Task SetString(string key, RedisValue value)
    {
        var database = _connection.GetDatabase();
        await database.StringSetAsync(key, value);
    }

    public async Task<RedisValue> GetString(string key)
    {
        var database = _connection.GetDatabase();

        return await database.StringGetAsync(key);
    }

    public async Task<bool> LockRelease(string key)
    {
        var lockKey = $"Lock_{key}";
        var database = _connection.GetDatabase();

        return await database.LockReleaseAsync(lockKey, Environment.MachineName);
    }

    public async Task<long> StringDecrement(string key)
    {
        var database = _connection.GetDatabase();

        return await database.StringDecrementAsync(key);
    }
}

執行

上面實作完成後,接著執行看看,結果看起來跟預期一樣

但是這個做法的缺點會在每個人都需要等待期間搶票的流程結束,才能放下一個人進來,若中間的邏輯複雜、執行時間長,就容易造成Client不斷失敗或是需要不停的等待。

接著調整一下作法,調整為使用排隊的方式,每個搶票成功的Client僅僅是排入隊伍而已,再由其他的背景服務處理各個Client的購票邏輯,減緩上面的做法會造成的問題。

使用 Redis 的 Publish/Subscribe 優化搶票機制

比照前面的作法,增加了但對於搶票成功的Client,僅將他 Publish 到專屬的Chnnel,而另起一個背景服務逐步消化Chnnel的訊息

背景服務

private static async Task Subscribe()
{
    await _service.Subscribe();
}

TicketService

public async Task<bool> GetTicket(int client)
{
    ...
    
    try
    {
        var lastCount = await _client.StringDecrement(_key);
        if (lastCount < 0)
        {
            return false;
        }
        // 前後都相同,僅差在這邊多了一個 Publish 的動作,而上面僅檢查剩餘數量
        await _client.Publish(_channelKey, client.ToString());

        return true;
    }
    
    ...
}

// Subscribe 會逐筆處理每一個Chnnel的訊息,在這邊就可以放心處理商業邏輯,而不會讓Client因為執行過久造成的等待
public async Task Subscribe()
{
    await _client.Subscribe(_channelKey, value =>
    {
        _users.Add(value);
        SendEmail("恭喜搶到!");
        Console.WriteLine($"user count: {_users.Count}");
    });
}

執行


StackExchange.Redis Docs

SampleCode

Redis聽過了很久,也自以為瞭解了,直到面試時才被狠狠地洗了臉。透過實作簡單的功能,加強對Redis的瞭解。