練習
Rx可以拿來處理串流特性的資料,
以這樣的型式來撰寫程式感覺起來蠻簡潔,可讀性不錯,
想要以這樣的樣式來寫一個處理的模型,
這個模型要達成的目標是,
- 當資料進來後可以立即處理進來的資料
- 如果資料處理中的時候,先把後來進來的資料暫存起來
- 當處理程序完成後,可以接下去處理待處理的資料,並且是一次取得所有待處理的資料
測試用的序列就利用 Observable.Interval() 來產生
資料處理程式大概會像這樣
var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
source.Subscribe(n => { });
這樣的程序是每個資料都進行處理,
不會符合想要達到的目標,
查了一下Rx的用法,發現window似乎符合我想要的處理方式,
window有很多多載,其中有一個多載需要提供windowClosingSelector,
研究了一下,這個是一個信號發射器,用來提供給Rx來切分串流用,
加上window後程式如下
var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
var closer = new Subject<Unit>();
source
.Window(() => closer)
.Subscribe(window => {});
接下來,只要closer給予信號,Subscribe的callback就會執行,
只要對已經切分的window進行處理,應該可以達到一次處理所有資料的需求,
切分的信號需要達成前一個工作已經完成才發送的需求,
如果用一個TaskCompletionSource,應該可以達到,
var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
var closer = new Subject<Unit>();
var prevTask = Task.CompletedTask;
source
.Window(() => closer)
.Select(window =>
{
var tcs = new TaskCompletionSource<object>();
var currentTask = tcs.Task;
return (window, tcs);
})
.Subscribe(tp => {
tp.window
.ToArray()
.Subscribe(async n => {
...
tp.tcs.SetResult(true);
});
});
以這樣的程式試著跑一下,發現不會動,
找了一下原因,如果我希望拿到全部的資料,那意謂著這個window已經complete,
而window的complete,代表的是closer有發送信號來切分串流,
所以需要寫一個發送信號的程序,這個發送信號需要兩個定義
- 當串流中有一個以上的資料
- 上一個工作已經完成
var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
var closer = new Subject<Unit>();
var prevTask = Task.CompletedTask;
source
.Window(() => closer)
.Select(window =>
{
var tcs = new TaskCompletionSource<object>();
var currentTask = tcs.Task;
window.FirstAsync()
.Subscribe(async _ =>
{
await prevTask.ConfigureAwait(false);
prevTask = currentTask;
closer.OnNext(Unit.Default);
});
return (window, tcs);
})
.Subscribe(tp => {
tp.window
.ToArray()
.Subscribe(async n => {
...
tp.tcs.SetResult(true);
});
});
整個功能看起來都已經完成,接下來把這些程式封裝起來
public static IDisposable Processing<T>(this IObservable<T> source, Func<T[], Task> callback)
{
var closer = new Subject<Unit>();
var prevTask = Task.CompletedTask;
return source
.Window(() => closer)
.Select(window =>
{
var tcs = new TaskCompletionSource<object>();
var currentTask = tcs.Task;
window.FirstAsync()
.Subscribe(async _ =>
{
await prevTask.ConfigureAwait(false);
prevTask = currentTask;
closer.OnNext(Unit.Default);
});
return (window, tcs);
})
.Subscribe(tp =>
{
tp.window
.ToArray()
.Subscribe(async n =>
{
await callback(n).ConfigureAwait(false);
tp.tcs.SetResult(true);
});
});
}
這樣使用就會變成
source.Processing(async n =>
{
...
});
完成的程式碼 https://github.com/phoenix-chen-2016/RxSequenceProcessing