[Redis]用redis來完成訂閱與發佈的功能
前言
其實現在pub sub有很多方式可以實現,如果沒有什麼特殊並且簡單的需求,其實我們可以使用redis就能很簡單的完成了訂閱發佈的機制,不需要再額外引用很多第三方的module來把專案搞複雜,任何事情如果想得太遠,其實對於軟體開發來說就是一個浪費,而這篇則要來筆記如何用redis來完成此類需求。
開始動手實做吧
在這邊為了模擬所以我是用linqpad來假裝是主控台去pub,而另一個專案則是web api去做sub,這樣來達成不同專案的溝通,我們只要在同一個ip的區段,透過redis就能很輕易的達成這樣的需求,首先看一下主控台的部份,就三行程式碼,我們就完成了pub的部份了
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");
ISubscriber sub = redis.GetSubscriber();
sub.Publish("receive", "001");
接著因為我們sub的部份是web api,可以直接在global去註冊sub的機制,也因為我們的connection還有sub最好都是去重覆使用,所以建立一支類別也就是RedisHelper來方便使用吧。
public class RedisHelper
{
private static ConnectionMultiplexer redis;
public static ConnectionMultiplexer redisConnection {
get
{
if (redis != null) return redis;
redis = ConnectionMultiplexer.Connect("localhost");
return redis;
}
}
}
接著就是在global的Application_Start去sub,這樣子就完成了pub sub的機制
var redis = RedisHelper.redisConnection;
ISubscriber sub = redis.GetSubscriber();
sub.Subscribe("receive", (channel, message) =>
{
Debug.WriteLine(channel, message);
});
看起來好像很簡單,不過redis是有可能漏訊息的,如果漏訊息的話怎麼處理呢?首先我們要區分一下,有些內容漏了訊息就算了,反正後來的資料有更新就算了,但如果從頭到尾的訊息,都完全不能漏的話,那redis的pub sub就有危險性了,好在redis本身就是一個為了快取而生的機制,所以我們可以在pub的時候,順便把pub的資訊存到redis,在sub的時候去把快取的資訊清掉,所以我們修改一下原本pub的程式碼,改成如下
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");
IDatabase db = redis.GetDatabase();
ISubscriber sub = redis.GetSubscriber();
db.SetAdd("receive","001");
sub.Publish("receive", "001");
接著就是sub的程式碼了
var redis = RedisHelper.redisConnection;
ISubscriber sub = redis.GetSubscriber();
IDatabase db = redis.GetDatabase(0);
string subReceive = "receive";
sub.Subscribe(subReceive, (channel, message) =>
{
var result = db.SetScan(subReceive, "*");
result.ToList().ForEach(x =>
{
//一旦處理完,就把此筆給清掉,所以正常來說receive只會有一筆資料,就是最新的一筆,有兩筆以上就是之前沒sub到的
db.SetRemove(subReceive, x);
});
Debug.WriteLine(channel, message);
});
這樣子就能預防到漏掉訊息的狀況了,但這得注意一下這是針對一台主機的狀況,如果我們會有多台機器的話,就沒辦法這樣子做了,就得選擇更好的方案來實做pub sub的機制了,而在pub sub的部份還有一個可以設定收順序是否要同步,預設是同步的,如果我們想改成非同步的話,只要把connection的PreserveAsyncOrder屬性改成false就好了
redis.PreserveAsyncOrder = false;