Listening for Collection Updates/Changes with the MongoDB C# .NET Driver

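Recently, for work, I needed to quietly pull data from someone else's MongoDB into MSSQL in near real time.

So my manager asked whether MongoDB has anything like a trigger

that could listen for changes on a collection and let us react to them.

I dug through the official docs and Google, and it turns out that although there is no trigger, a similar mechanism gets the job done.

It mainly relies on Capped Collections and Tailable Cursors.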

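Create a test DB

Call it IoTService for now (this is not the IoT you are thinking of, it is just a test name, so don't read anything into it).

This is a third-party MongoDB GUI admin tool, used here only because it is easier to show. It is definitely not because I was too lazy to type the commands.

Open VS and create whatever project type you are comfortable with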

...

(omitted)

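Install the C# and .NET MongoDB Driver

Reference: https://docs.mongodb.com/ecosystem/drivers/csharp/

Open NuGet and search for MongoDB. I simply picked the latest stable version, v2.4.4.

Capped collections: copy and paste the code

The sample needs the following three using directives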

using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Bson.Serialization.Attributes;

Since I created an MVC project, I wrote the test directly in Home/Index. Please don't mind that.

protected static IMongoClient _client;
protected static IMongoDatabase _database;

public ActionResult Index()
{
      _client = new MongoClient();

      _database = _client.GetDatabase("IoTService");

      _database.CreateCollection(
           "SystemSet", 
            new CreateCollectionOptions() {
            Capped = true,
            MaxSize = 5242880,
            MaxDocuments = 5000 });

      return View();
}

As the code above shows, Capped is set to true when the collection is created.

This is because a cursor can only tail a collection that is a Capped Collection.

For what making a collection capped implies, see the Overview from the official documentation below:

Capped collections are fixed-size collections that support high-throughput operations that insert and retrieve documents based on insertion order. Capped collections work in a way similar to circular buffers: once a collection fills its allocated space, it makes room for new documents by overwriting the oldest documents in the collection.

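That is why CreateCollectionOptions sets not only Capped but also MaxSize, which is required for a capped collection and is given in bytes (5242880 bytes is 5 MiB). MaxDocuments is an optional extra cap on the number of documents.

Once the MaxSize limit is reached, the oldest documents are removed to make room.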
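
One practical wrinkle: CreateCollection fails if the collection already exists, so calling it on every request to Index will throw the second time. Below is a minimal sketch of a guard, assuming the same IoTService database and SystemSet collection as above. The class and method names (CappedCollectionSetup, EnsureCappedCollection) are made up for this example, and the sketch does not check whether an existing collection is actually capped.

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

public static class CappedCollectionSetup
{
    // Hypothetical helper: creates the capped collection only when it is missing.
    public static void EnsureCappedCollection(IMongoDatabase database, string name)
    {
        // Ask the server only for collections whose name matches exactly.
        var listOptions = new ListCollectionsOptions
        {
            Filter = new BsonDocument("name", name)
        };

        bool exists;
        using (var cursor = database.ListCollections(listOptions))
        {
            exists = cursor.ToList().Count > 0;
        }

        if (exists)
        {
            return; // Nothing to do; the collection is already there.
        }

        database.CreateCollection(name, new CreateCollectionOptions
        {
            Capped = true,
            MaxSize = 5 * 1024 * 1024, // size cap in bytes (5 MiB, same as 5242880)
            MaxDocuments = 5000        // optional cap on the number of documents
        });
    }
}
```

In Home/Index this would replace the direct CreateCollection call, for example EnsureCappedCollection(_database, "SystemSet").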

The next part uses a Tailable Cursor to pick up newly written documents.

Tailable Cursor: copy and paste the code

Based on the official sample: http://mongodb.github.io/mongo-csharp-driver/2.2/examples/tailable_cursor/

private static void TailCollection(IMongoCollection<BsonDocument> collection)
{
    // Set lastInsertDate to the smallest value possible
    BsonValue lastInsertDate = BsonMinKey.Value;
    
    var options = new FindOptions<BsonDocument> 
    { 
        // Our cursor is a tailable cursor and informs the server to await
        CursorType = CursorType.TailableAwait
    };
    
    // Initially, we don't have a filter. An empty BsonDocument matches everything.
    BsonDocument filter = new BsonDocument();
    
    // NOTE: This loops forever. It would be prudent to provide some form of 
    // an escape condition based on your needs; e.g. the user presses a key.
    while (true)
    {
        // Start the cursor and wait for the initial response
        using (var cursor = collection.FindSync(filter, options))
        {
            foreach (var document in cursor.ToEnumerable())
            {
                // Set the last value we saw 
                lastInsertDate = document["insertDate"];
                
                // Write the document to the console.
                Console.WriteLine(document.ToString());
            }
        }

        // The tailable cursor died so loop through and restart it
        // Now, we want documents that are strictly greater than the last value we saw,
        // i.e. { insertDate: { $gt: lastInsertDate } } (field name first, then the operator)
        filter = new BsonDocument("insertDate", new BsonDocument("$gt", lastInsertDate));
    }
}

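I honestly could not work out where the insertDate field used by the $gt filter was supposed to come from (I had only been using MongoDB for half a day). It has to be a field that your own documents contain.

So I switched to the default key, _id. In theory _id is never duplicated, so it should be even safer (he said, nervously). Note that resuming with $gt on _id relies on the default ObjectId values increasing over time, which holds roughly but is not a strict guarantee. I also swapped Console.WriteLine for Debug.WriteLine, which needs using System.Diagnostics.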

        private static void TailCollection(IMongoCollection<BsonDocument> collection)
        {
            // Set lastInsertId to the smallest value possible
            BsonValue lastInsertId = BsonMinKey.Value;

            var options = new FindOptions<BsonDocument>
            {
                // Our cursor is a tailable cursor and informs the server to await
                CursorType = CursorType.TailableAwait
            };

            // Initially, we don't have a filter. An empty BsonDocument matches everything.
            BsonDocument filter = new BsonDocument();

            // NOTE: This loops forever. It would be prudent to provide some form of 
            // an escape condition based on your needs; e.g. the user presses a key.
            while (true)
            {
                // Start the cursor and wait for the initial response
                using (var cursor = collection.FindSync(filter, options))
                {
                    foreach (var document in cursor.ToEnumerable())
                    {
                        // Set the last value we saw 
                        lastInsertId = document["_id"];

                        // Write the document to the console.
                        Debug.WriteLine(document.ToString());
                    }
                }

                // The tailable cursor died so loop through and restart it
                // Now, we want documents whose _id is strictly greater than the last one we saw,
                // i.e. { _id: { $gt: lastInsertId } } (the operator needs its "$" prefix)
                filter = new BsonDocument("_id", new BsonDocument("$gt", lastInsertId));
            }
        }

Next, go back to Home/Index and pass in the collection to tail, as follows

public ActionResult Index()
{
      _client = new MongoClient();

      _database = _client.GetDatabase("IoTService");

      _database.CreateCollection(
           "SystemSet", 
            new CreateCollectionOptions() {
            Capped = true,
            MaxSize = 5242880,
            MaxDocuments = 5000 });

      IMongoCollection<BsonDocument> tailCollectionTarget = _database.GetCollection<BsonDocument>("SystemSet");

      // Note: TailCollection loops forever, so the request blocks here while tailing
      TailCollection(tailCollectionTarget);

      return View();
}
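
Because TailCollection loops forever, the call above never returns and the view is never rendered. That is fine for stepping through in the debugger, but here is a rough sketch of one way around it: run the tail on a background task and stop it with a CancellationToken. TailWorker, Start, and Stop are hypothetical names, and the one-second pause is my own assumption to avoid a tight loop when the cursor dies immediately (for example on an empty collection). Background work started inside an ASP.NET request can be killed when the app pool recycles, so treat this as a test harness rather than a production design.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class TailWorker
{
    private static readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // Starts tailing on a thread-pool thread and returns immediately.
    public static Task Start(IMongoCollection<BsonDocument> collection)
    {
        return Task.Run(() => Tail(collection, _cts.Token));
    }

    // Signals the tail loop to stop.
    public static void Stop()
    {
        _cts.Cancel();
    }

    private static void Tail(IMongoCollection<BsonDocument> collection, CancellationToken token)
    {
        var options = new FindOptions<BsonDocument> { CursorType = CursorType.TailableAwait };
        var filter = new BsonDocument(); // empty filter: match everything at first

        try
        {
            while (!token.IsCancellationRequested)
            {
                using (var cursor = collection.FindSync(filter, options, token))
                {
                    foreach (var document in cursor.ToEnumerable(token))
                    {
                        // Remember where we are so a restarted cursor resumes after this _id.
                        filter = new BsonDocument("_id", new BsonDocument("$gt", document["_id"]));
                        Debug.WriteLine(document.ToString());
                    }
                }

                // The cursor died; pause briefly (or until cancelled) before reopening it.
                token.WaitHandle.WaitOne(TimeSpan.FromSeconds(1));
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Stop() cancels a pending read.
        }
    }
}
```

With this in place, Index could call TailWorker.Start(tailCollectionTarget) and then return View() as usual.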

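Finally, press F5 to run.

Set a breakpoint inside the foreach (var document in cursor.ToEnumerable()) loop in the TailCollection function.

You will see the breakpoint hit whenever a document is written to the collection.

If the Capped Collection already contains data, the sample code starts with an empty filter, so it goes straight into foreach (var document in cursor.ToEnumerable()) and walks through the existing documents first.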
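
To trigger the breakpoint without the GUI tool, a small console program can write a document into the same collection. This is only a sketch under assumptions: MongoDB is running locally on the default port, and the field names name and insertDate are made up for the test (insertDate happens to match the field the official sample filters on).

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

public static class TailSmokeTest
{
    public static void Main()
    {
        // With no arguments, MongoClient connects to mongodb://localhost:27017.
        var client = new MongoClient();
        var collection = client
            .GetDatabase("IoTService")
            .GetCollection<BsonDocument>("SystemSet");

        // Hypothetical test document; the driver fills in _id before sending it.
        var document = new BsonDocument
        {
            { "name", "test-setting" },
            { "insertDate", DateTime.UtcNow }
        };

        collection.InsertOne(document);

        // Print the generated _id so it can be matched against the tail output.
        Console.WriteLine(document["_id"]);
    }
}
```

Run it while the MVC project is paused in TailCollection and the new document should show up in the foreach loop.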

Further questions

1. To turn an existing collection into a Capped Collection, the relevant command is convertToCapped, but the official docs stress that it takes a lock:

This command obtains a global write lock and will block other operations until it has completed.

2. PHP, Ruby, Python, and Perl examples of using tailable cursors.
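
For item 1, the convertToCapped command can also be sent from the C# driver. The following is a minimal sketch, not something I have run against production data. The class and method names are made up, and the size is whatever byte limit you choose. Keep the global write lock quoted above in mind before running it on a busy server.

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

public static class ConvertToCappedExample
{
    // Hypothetical helper: converts an existing collection to a capped one.
    public static BsonDocument Convert(IMongoDatabase database, string collectionName, long sizeInBytes)
    {
        // Equivalent to the shell command:
        //   db.runCommand({ convertToCapped: "<name>", size: <bytes> })
        var command = new BsonDocument
        {
            { "convertToCapped", collectionName },
            { "size", sizeInBytes }
        };

        // Returns the raw server reply, e.g. a document containing "ok".
        return database.RunCommand<BsonDocument>(new BsonDocumentCommand<BsonDocument>(command));
    }
}
```

For example, Convert(_database, "SystemSet", 5242880) would ask for the same 5 MiB limit used earlier.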

 

References

The MongoDB 3.4 Manual Doc https://docs.mongodb.com/manual/

How to listen for changes to a MongoDB collection? https://stackoverflow.com/questions/9691316/how-to-listen-for-changes-to-a-mongodb-collection