Rx.Net 練習

  • 170
  • 0

練習

Rx可以拿來處理串流特性的資料,

以這樣的型式來撰寫程式感覺起來蠻簡潔,可讀性不錯,

想要以這樣的樣式來寫一個處理的模型,

這個模型要達成的目標是,

  • 當資料進來後可以立即處理進來的資料
  • 如果資料處理中的時候,先把後來進來的資料暫存起來
  • 當處理程序完成後,可以接下去處理待處理的資料,並且是一次取得所有待處理的資料

測試用的序列就利用 Observable.Interval() 來產生

資料處理程式大概會像這樣

var source = Observable.Interval(TimeSpan.FromMilliseconds(100));

source.Subscribe(n => { });

這樣的程序是每個資料都進行處理,

不會符合想要達到的目標,

查了一下Rx的用法,發現window似乎符合我想要的處理方式,

window有很多多載,其中有一個多載需要提供windowClosingSelector,

研究了一下,這個是一個信號發射器,用來提供給Rx來切分串流用,

加上window後程式如下

var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
var closer = new Subject<Unit>();

source
    .Window(() => closer)
    .Subscribe(window => {});

接下來,只要closer給予信號,Subscribe的callback就會執行,

只要對已經切分的window進行處理,應該可以達到一次處理所有資料的需求,

切分的信號需要達成前一個工作已經完成才發送的需求,

如果用一個TaskCompletionSource,應該可以達到,

var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
var closer = new Subject<Unit>();
var prevTask = Task.CompletedTask;

source
    .Window(() => closer)
    .Select(window =>
    {
        var tcs = new TaskCompletionSource<object>();
        var currentTask = tcs.Task;

        return (window, tcs);
    })
    .Subscribe(tp => {
        tp.window
            .ToArray()
            .Subscribe(async n => {
                ...
                tp.tcs.SetResult(true);
            });
    });

以這樣的程式試著跑一下,發現不會動,

找了一下原因,如果我希望拿到全部的資料,那意謂著這個window已經complete,

而window的complete,代表的是closer有發送信號來切分串流,

所以需要寫一個發送信號的程序,這個發送信號需要兩個定義

  • 當串流中有一個以上的資料
  • 上一個工作已經完成
var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
var closer = new Subject<Unit>();
var prevTask = Task.CompletedTask;

source
    .Window(() => closer)
    .Select(window =>
    {
        var tcs = new TaskCompletionSource<object>();
        var currentTask = tcs.Task;

        window.FirstAsync()
            .Subscribe(async _ =>
            {
                await prevTask.ConfigureAwait(false);

                prevTask = currentTask;
                closer.OnNext(Unit.Default);
            });
        return (window, tcs);
    })
    .Subscribe(tp => {
        tp.window
            .ToArray()
            .Subscribe(async n => {
                ...
                tp.tcs.SetResult(true);
            });
    });

 整個功能看起來都已經完成,接下來把這些程式封裝起來

public static IDisposable Processing<T>(this IObservable<T> source, Func<T[], Task> callback)
{
    var closer = new Subject<Unit>();
    var prevTask = Task.CompletedTask;

    return source
        .Window(() => closer)
        .Select(window =>
        {
            var tcs = new TaskCompletionSource<object>();
            var currentTask = tcs.Task;

            window.FirstAsync()
                .Subscribe(async _ =>
                {
                    await prevTask.ConfigureAwait(false);

                    prevTask = currentTask;
                    closer.OnNext(Unit.Default);
                });
            return (window, tcs);
        })
        .Subscribe(tp =>
        {
            tp.window
                .ToArray()
                .Subscribe(async n =>
                {
                    await callback(n).ConfigureAwait(false);
                    tp.tcs.SetResult(true);
                });
        });
}

這樣使用就會變成

source.Processing(async n =>
{
    ...
});

 

完成的程式碼 https://github.com/phoenix-chen-2016/RxSequenceProcessing