簡單筆記在如何使用 StackExchange.Redis 操作Redis
基本使用
為了方便,直接使用docker執行一個redis
$docker run --name myRedis -d -p 6379:6379 redis
接著專案加入nuget package
dotnet add package StackExchange.Redis
ConnectionMultiplexer
接著透過 ConnectionMultiplexer.ConnectAsync 取得與redis溝通的物件,依照官方的建議,ConnectionMultiplexer 的所有操作都是Tread-Safe,可以保存並重用它,依照 ConnectAsync 的多載,有幾種取得方式:
單一主機的連線字串
ConnectionMultiplexer redis = await ConnectionMultiplexer.ConnectAsync("localhost,serviceName=Master");
多台主機的連線字串
若連結的redis是主從架構,則可以傳入兩個redis的連線位置,並且用","分隔
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server1:6379,server2:6379");
ConfigurationOptions
可以建立 ConfigurationOptions,設定相關參數,傳入 ConnectAsync 來取得 ConnectionMultiplexer
var options = new ConfigurationOptions()
{
EndPoints =
{
"localhost"
}
};
var redis = await ConnectionMultiplexer.ConnectAsync(options);
}
從 String Parse為 ConfigurationOptions 加工處理
若連線設定是以string的保存的,例如從appsetting取得連線字串,取得後可以透過 ConfigurationOptions.Parse() 把 string 轉回 ConfigurationOptions,做細部的設定後再取得連線物件
string configString = GetRedisConfiguration();
var options = ConfigurationOptions.Parse(configString);
options.AllowAdmin = true;
var conn = ConnectionMultiplexer.Connect(options);
IDatabase
接著使用 GetDatabase() 來訪問對應的redis(若存在多個redis,可以透過第一個參數指定要訪問的redis)
IDatabase database = redis.GetDatabase();
透過 IDatabase 實例,就可以簡單地對 redis 進行操作
var key = "test-key";
var value = "test"
var result = await database.StringSetAsync(key, value);
var value = await database.StringGetAsync(key);
連線重試
若Redis連線失敗時,StackExchange.Redis 會自動嘗試重新連接,每兩次重試的間隔時間、策略可由 ConfigurationOptions 的 ReconnectRetryPolicy 參數決定,有線性及指數兩種方式可以設定,預設為 LinearRetry(5000)
var options = new ConfigurationOptions()
{
EndPoints = { "localhost" },
ReconnectRetryPolicy = new ExponentialRetry(5000),
};
var redis = await ConnectionMultiplexer.ConnectAsync(options);
使用Redis實現簡單的搶票機制
瞭解了基本使用方式,接著做個小功能試試看
情境
活動限制 100 個名額,有多個使用者會經過不同的server搶票,不允許超賣的情形
流程
活動開始前,在Redis塞了一個這次活動的數量(100),
搶票過程中,預期在每經過一個client後就遞減數量並且返回成功,當數量到達0時,後續的client都會拿到失敗
實作
Init
在開始之前先初始化Redis的資料,塞入活動用的允許數量
private static readonly RedisClient _client = new RedisClient();
private static async Task Init()
{
await _client.SetString(_eventCountKey, 100);
await _client.GetString(_eventCountKey);
}
Client Request
模擬有105個client request,每個client一個task讓同時多個人在搶票
private static readonly TicketService _service = new TicketService(_client);
private static readonly string _eventCountKey = "Event_Count";
private static async Task Run()
{
var result = new ConcurrentStack<bool>();
var tasks = new List<Task>();
for (var index = 0; index < 105; index++)
{
var number = index;
tasks.Add(Task.Run(async () =>
{
result.Push(await _service.GetTicket(_eventCountKey));
Console.WriteLine($"{number}");
}));
}
await Task.WhenAll(tasks);
Console.WriteLine($"success count: {result.Count(r => r == true)}");
}
TicketService
public class TicketService
{
private readonly RedisClient _client;
public TicketService(RedisClient redisClient)
{
_client = redisClient;
}
public async Task<bool> GetTicket(string key)
{
if (await TicketCount(key) > 0 && await _client.Lock(key, TimeSpan.FromMilliseconds(100)))
{
try
{
var lastCount = await _client.StringDecrement(key);
return lastCount >= 0;
}
finally
{
await _client.LockRelease(key);
}
}
return false;
}
private async Task<int> TicketCount(string key)
{
return (int)await _client.GetString(key);
}
}
RedisClient
RedisClient 負責跟Redis的溝通,包含Get、Set等的所有操作
public class RedisClient
{
private readonly ConnectionMultiplexer _connection;
public RedisClient()
{
var options = new ConfigurationOptions()
{
EndPoints = {"localhost"}
};
_connection = ConnectionMultiplexer.Connect(options);
}
public async Task<bool> Lock(string key, TimeSpan expiry)
{
var lockKey = $"Lock_{key}";
var number = 0;
do
{
try
{
var database = _connection.GetDatabase();
if (await database.LockTakeAsync(lockKey, Environment.MachineName, expiry))
{
return true;
}
}
catch (Exception)
{
await Task.Delay(200);
number++;
}
} while (number < 10);
return false;
}
public async Task SetString(string key, RedisValue value)
{
var database = _connection.GetDatabase();
await database.StringSetAsync(key, value);
}
public async Task<RedisValue> GetString(string key)
{
var database = _connection.GetDatabase();
return await database.StringGetAsync(key);
}
public async Task<bool> LockRelease(string key)
{
var lockKey = $"Lock_{key}";
var database = _connection.GetDatabase();
return await database.LockReleaseAsync(lockKey, Environment.MachineName);
}
public async Task<long> StringDecrement(string key)
{
var database = _connection.GetDatabase();
return await database.StringDecrementAsync(key);
}
}
執行
上面實作完成後,接著執行看看,結果看起來跟預期一樣
但是這個做法的缺點會在每個人都需要等待期間搶票的流程結束,才能放下一個人進來,若中間的邏輯複雜、執行時間長,就容易造成Client不斷失敗或是需要不停的等待。
接著調整一下作法,調整為使用排隊的方式,每個搶票成功的Client僅僅是排入隊伍而已,再由其他的背景服務處理各個Client的購票邏輯,減緩上面的做法會造成的問題。
使用 Redis 的 Publish/Subscribe 優化搶票機制
比照前面的作法,增加了但對於搶票成功的Client,僅將他 Publish 到專屬的Chnnel,而另起一個背景服務逐步消化Chnnel的訊息
背景服務
private static async Task Subscribe()
{
await _service.Subscribe();
}
TicketService
public async Task<bool> GetTicket(int client)
{
...
try
{
var lastCount = await _client.StringDecrement(_key);
if (lastCount < 0)
{
return false;
}
await _client.Publish(_channelKey, client.ToString());
return true;
}
...
}
public async Task Subscribe()
{
await _client.Subscribe(_channelKey, value =>
{
_users.Add(value);
SendEmail("恭喜搶到!");
Console.WriteLine($"user count: {_users.Count}");
});
}
執行
StackExchange.Redis Docs
SampleCode
Redis聽過了很久,也自以為瞭解了,直到面試時才被狠狠地洗了臉。透過實作簡單的功能,加強對Redis的瞭解。
基本使用
為了方便,直接使用docker執行一個redis
$docker run --name myRedis -d -p 6379:6379 redis
接著專案加入nuget package
ConnectionMultiplexer
接著透過 ConnectionMultiplexer.ConnectAsync 取得與redis溝通的物件,依照官方的建議,ConnectionMultiplexer 的所有操作都是Tread-Safe,可以保存並重用它,依照 ConnectAsync 的多載,有幾種取得方式:
單一主機的連線字串
ConnectionMultiplexer redis = await ConnectionMultiplexer.ConnectAsync("localhost,serviceName=Master");
多台主機的連線字串
若連結的redis是主從架構,則可以傳入兩個redis的連線位置,並且用","分隔
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server1:6379,server2:6379");
ConfigurationOptions
可以建立 ConfigurationOptions,設定相關參數,傳入 ConnectAsync 來取得 ConnectionMultiplexer
var options = new ConfigurationOptions() { EndPoints = { "localhost" } }; var redis = await ConnectionMultiplexer.ConnectAsync(options); }
從 String Parse為 ConfigurationOptions 加工處理
若連線設定是以string的保存的,例如從appsetting取得連線字串,取得後可以透過 ConfigurationOptions.Parse() 把 string 轉回 ConfigurationOptions,做細部的設定後再取得連線物件
string configString = GetRedisConfiguration(); var options = ConfigurationOptions.Parse(configString); options.AllowAdmin = true; var conn = ConnectionMultiplexer.Connect(options);
IDatabase
接著使用 GetDatabase() 來訪問對應的redis(若存在多個redis,可以透過第一個參數指定要訪問的redis)
IDatabase database = redis.GetDatabase();
透過 IDatabase 實例,就可以簡單地對 redis 進行操作
var key = "test-key"; var value = "test" // set key-value var result = await database.StringSetAsync(key, value); // get value var value = await database.StringGetAsync(key);
連線重試
若Redis連線失敗時,StackExchange.Redis 會自動嘗試重新連接,每兩次重試的間隔時間、策略可由 ConfigurationOptions 的 ReconnectRetryPolicy 參數決定,有線性及指數兩種方式可以設定,預設為 LinearRetry(5000)
var options = new ConfigurationOptions() { EndPoints = { "localhost" }, ReconnectRetryPolicy = new ExponentialRetry(5000), }; var redis = await ConnectionMultiplexer.ConnectAsync(options);
使用Redis實現簡單的搶票機制
瞭解了基本使用方式,接著做個小功能試試看
情境
活動限制 100 個名額,有多個使用者會經過不同的server搶票,不允許超賣的情形
流程
活動開始前,在Redis塞了一個這次活動的數量(100),
搶票過程中,預期在每經過一個client後就遞減數量並且返回成功,當數量到達0時,後續的client都會拿到失敗
實作
Init
在開始之前先初始化Redis的資料,塞入活動用的允許數量
private static readonly RedisClient _client = new RedisClient(); private static async Task Init() { await _client.SetString(_eventCountKey, 100); await _client.GetString(_eventCountKey); }
Client Request
模擬有105個client request,每個client一個task讓同時多個人在搶票
private static readonly TicketService _service = new TicketService(_client); private static readonly string _eventCountKey = "Event_Count"; private static async Task Run() { var result = new ConcurrentStack<bool>(); var tasks = new List<Task>(); // 發出105個task for (var index = 0; index < 105; index++) { var number = index; tasks.Add(Task.Run(async () => { // 透過 TicketService 處理搶票的邏輯,返回bool result.Push(await _service.GetTicket(_eventCountKey)); Console.WriteLine($"{number}"); })); } await Task.WhenAll(tasks); // 驗證拿到成功的client request數量 Console.WriteLine($"success count: {result.Count(r => r == true)}"); }
TicketService
public class TicketService { private readonly RedisClient _client; public TicketService(RedisClient redisClient) { _client = redisClient; } public async Task<bool> GetTicket(string key) { // 只有在數量還有剩 且 透過Redis的Lock成功,才繼續搶票的動作 // 這邊Lock的Timeout時間為100毫秒,純粹只是為了測試 if (await TicketCount(key) > 0 && await _client.Lock(key, TimeSpan.FromMilliseconds(100))) { try { // 遞減數量,會返回剩餘的數量,剩餘數量小於0代表超賣了,會返回失敗 var lastCount = await _client.StringDecrement(key); return lastCount >= 0; } finally { // 完成後要把Lock釋放 await _client.LockRelease(key); } } return false; } private async Task<int> TicketCount(string key) { return (int)await _client.GetString(key); } }
RedisClient
RedisClient 負責跟Redis的溝通,包含Get、Set等的所有操作
public class RedisClient { // 可重用,所以在ctor建立後就放到filed上 private readonly ConnectionMultiplexer _connection; public RedisClient() { var options = new ConfigurationOptions() { EndPoints = {"localhost"} }; _connection = ConnectionMultiplexer.Connect(options); } public async Task<bool> Lock(string key, TimeSpan expiry) { // Lock失敗就等200毫秒,再重試,最多10次 var lockKey = $"Lock_{key}"; var number = 0; do { try { var database = _connection.GetDatabase(); if (await database.LockTakeAsync(lockKey, Environment.MachineName, expiry)) { return true; } } catch (Exception) { await Task.Delay(200); number++; } } while (number < 10); return false; } public async Task SetString(string key, RedisValue value) { var database = _connection.GetDatabase(); await database.StringSetAsync(key, value); } public async Task<RedisValue> GetString(string key) { var database = _connection.GetDatabase(); return await database.StringGetAsync(key); } public async Task<bool> LockRelease(string key) { var lockKey = $"Lock_{key}"; var database = _connection.GetDatabase(); return await database.LockReleaseAsync(lockKey, Environment.MachineName); } public async Task<long> StringDecrement(string key) { var database = _connection.GetDatabase(); return await database.StringDecrementAsync(key); } }
執行
上面實作完成後,接著執行看看,結果看起來跟預期一樣
但是這個做法的缺點會在每個人都需要等待期間搶票的流程結束,才能放下一個人進來,若中間的邏輯複雜、執行時間長,就容易造成Client不斷失敗或是需要不停的等待。
接著調整一下作法,調整為使用排隊的方式,每個搶票成功的Client僅僅是排入隊伍而已,再由其他的背景服務處理各個Client的購票邏輯,減緩上面的做法會造成的問題。
使用 Redis 的 Publish/Subscribe 優化搶票機制
比照前面的作法,增加了但對於搶票成功的Client,僅將他 Publish 到專屬的Chnnel,而另起一個背景服務逐步消化Chnnel的訊息
背景服務
private static async Task Subscribe() { await _service.Subscribe(); }
TicketService
public async Task<bool> GetTicket(int client) { ... try { var lastCount = await _client.StringDecrement(_key); if (lastCount < 0) { return false; } // 前後都相同,僅差在這邊多了一個 Publish 的動作,而上面僅檢查剩餘數量 await _client.Publish(_channelKey, client.ToString()); return true; } ... } // Subscribe 會逐筆處理每一個Chnnel的訊息,在這邊就可以放心處理商業邏輯,而不會讓Client因為執行過久造成的等待 public async Task Subscribe() { await _client.Subscribe(_channelKey, value => { _users.Add(value); SendEmail("恭喜搶到!"); Console.WriteLine($"user count: {_users.Count}"); }); }
執行
StackExchange.Redis Docs
SampleCode
Redis聽過了很久,也自以為瞭解了,直到面試時才被狠狠地洗了臉。透過實作簡單的功能,加強對Redis的瞭解。