最近因為工作需要,需要偷偷即時讀取別人 MonogDB 資料到 MSSQL
所以主管就問我 MongoDB 有沒有 Trigger 之類的可以用
用來監聽 Collection 的異動來做一些想做的事情
於是翻了一下官方文件跟Goolge,發現雖然沒有Trigger,但有類似的機制可以達成想要的效果
主要是透過 Capped Collections 跟 Tailable Cursors 來達成
建立測試的DB
名字先叫 IoTService (這個 IoT 不是你想的 IoT, 只是測試用的別隨便亂想)
打開VS隨便新增一個自己習慣專案
...
(略)
安裝C# and .NET MongoDB Driver
參考文件: https://docs.mongodb.com/ecosystem/drivers/csharp/
打開Nuget搜尋MongoDB,這裡我直接選最新穩定版 v2.4.4
Capped collections 程式碼複製貼上
範例會using下列三個
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Bson.Serialization.Attributes;
因為我是開MVC專案,直接寫在 Home/Index 裡測試,不要介意
protected static IMongoClient _client;
protected static IMongoDatabase _database;
public ActionResult Index()
{
_client = new MongoClient();
_database = _client.GetDatabase("IoTService");
_database.CreateCollection(
"SystemSet",
new CreateCollectionOptions() {
Capped = true,
MaxSize = 5242880,
MaxDocuments = 5000 });
return View();
}
可以看到上面這段 Code 中,Create Collection 時,把 Capped 設成 true
這是因為後續再用 Cursor 來 Tail 資料的時候,必須要設置成 Capped Collection 才能使用
設定成 Capped Collection 有什麼影響可以參考下面官方文件 Overview 的說明
因此可以看到後面 CollectionOptions 除了新增了 Capped 以外,也必須要建立 MaxSize
一旦達到了 MaxSize 的限制,就會把最舊的資料刪除
接著下一段就是 透過 Tailable Cursor 獲取新寫入的 document
Tailable Cursor 程式碼複製貼上
參考官網範例: http://mongodb.github.io/mongo-csharp-driver/2.2/examples/tailable_cursor/
private static void TailCollection(IMongoCollection<BsonDocument> collection)
{
// Set lastInsertDate to the smallest value possible
BsonValue lastInsertDate = BsonMinKey.Value;
var options = new FindOptions<BsonDocument>
{
// Our cursor is a tailable cursor and informs the server to await
CursorType = CursorType.TailableAwait
};
// Initially, we don't have a filter. An empty BsonDocument matches everything.
BsonDocument filter = new BsonDocument();
// NOTE: This loops forever. It would be prudent to provide some form of
// an escape condition based on your needs; e.g. the user presses a key.
while (true)
{
// Start the cursor and wait for the initial response
using (var cursor = collection.FindSync(filter, options))
{
foreach (var document in cursor.ToEnumerable())
{
// Set the last value we saw
lastInsertDate = document["insertDate"];
// Write the document to the console.
Console.WriteLine(document.ToString());
}
}
// The tailable cursor died so loop through and restart it
// Now, we want documents that are strictly greater than the last value we saw
filter = new BsonDocument("$gt", new BsonDocument("insertDate", lastInsertDate));
}
}
因為這邊 > 要抓的最後 InsertData 我實在不知道在哪 ... (才剛碰MongoDB半天)
所以我改成了預設 Key(_Id) ,理論上 _Id 是絕對不會重覆的應該更沒有問題吧 (心虛
private static void TailCollection(IMongoCollection<BsonDocument> collection)
{
// Set lastInsertDate to the smallest value possible
BsonValue lastInsertId = BsonMinKey.Value;
var options = new FindOptions<BsonDocument>
{
// Our cursor is a tailable cursor and informs the server to await
CursorType = CursorType.TailableAwait
};
// Initially, we don't have a filter. An empty BsonDocument matches everything.
BsonDocument filter = new BsonDocument();
// NOTE: This loops forever. It would be prudent to provide some form of
// an escape condition based on your needs; e.g. the user presses a key.
while (true)
{
// Start the cursor and wait for the initial response
using (var cursor = collection.FindSync(filter, options))
{
foreach (var document in cursor.ToEnumerable())
{
// Set the last value we saw
lastInsertId = document["_id"];
// Write the document to the console.
Debug.WriteLine(document.ToString());
}
}
// The tailable cursor died so loop through and restart it
// Now, we want documents that are strictly greater than the last value we saw
filter = new BsonDocument("gt", new BsonDocument("_id", lastInsertId));
}
}
接著回到 Home/Index 裡面加入要 Tail 的 Collection 如下
public ActionResult Index()
{
_client = new MongoClient();
_database = _client.GetDatabase("IoTService");
_database.CreateCollection(
"SystemSet",
new CreateCollectionOptions() {
Capped = true,
MaxSize = 5242880,
MaxDocuments = 5000 });
IMongoCollection<BsonDocument> TailCollectionTatget = _database.GetCollection<BsonDocument>("SystemSet");
TailCollection(TailCollectionTatget);
return View();
}
最後按F5執行
把中斷點下在TailCollection function 的 foreach (var document in cursor.ToEnumerable()) 裡面
可以看到只要有寫入資料的時候,就會進入到中斷點裡
延伸問題
1.若是已存在 Collection 要轉成 Capped Collection,相關指令是 convertToCapped,但官方有強調會產生 lock
2.PHP, Ruby, Python, and Perl examples of using tailable cursors.
參考資源
The MongoDB 3.4 Manual Doc https://docs.mongodb.com/manual/
How to listen for changes to a MongoDB collection? https://stackoverflow.com/questions/9691316/how-to-listen-for-changes-to-a-mongodb-collection