C# 8.0 搶先看 -- Async Stream (3) 逆練九陰真經

標題誇張了點,不過我在寫 Async Stream 範例的過程中突發奇想把它移植到一般的 .NET Framework 環境中使用。

本篇文章使用環境
開發環境 Visual Studio 2019 Preview 1 (16.0.0 Preview 1)
框架       .NET Framework 4.7.2
編譯器    C# 8.0 beta

有沒有可能把這個行為移植到傳統的 .NET Framework 環境中?還真的有可能,只不過需要一點點小小的改造。首先因為用到了 ValueTask,所以先使用 NuGet 加入 System.Threading.Tasks.Extensions 套件,我在測試時所使用的版本是 v4.6.0-preview.18751.3,NuGet 同時會幫你安裝 System.Runtime.CompilerServices.Unsafe 套件。

安裝完成後,要自己寫程式碼先補上 Async Streams 的三個主要 interfaces:

namespace System.Collections.Generic
{
    // ValueTask<T> lives in System.Threading.Tasks; import it here so this
    // snippet compiles on its own, the same way the other snippets in this
    // file place their usings inside the namespace.
    using System.Threading.Tasks;

    /// <summary>
    /// Exposes an enumerator that iterates asynchronously over values of type
    /// <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">The type of the values produced.</typeparam>
    public interface IAsyncEnumerable<out T>
    {
        /// <summary>Returns an enumerator that iterates asynchronously through the sequence.</summary>
        /// <returns>An <see cref="IAsyncEnumerator{T}"/> for the sequence.</returns>
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    /// <summary>
    /// Supports asynchronous iteration over a sequence and asynchronous disposal
    /// once iteration is finished.
    /// </summary>
    /// <typeparam name="T">The type of the values produced.</typeparam>
    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        /// <summary>Gets the element at the current position of the enumerator.</summary>
        T Current { get; }

        /// <summary>Advances the enumerator asynchronously to the next element.</summary>
        /// <returns>
        /// A <see cref="ValueTask{TResult}"/> whose result tells the caller whether
        /// another element is available.
        /// </returns>
        ValueTask<bool> MoveNextAsync();
    }
}

namespace System
{
    // ValueTask lives in System.Threading.Tasks; import it here so this snippet
    // compiles on its own, the same way the other snippets in this file place
    // their usings inside the namespace.
    using System.Threading.Tasks;

    /// <summary>
    /// Provides a mechanism for releasing resources asynchronously.
    /// </summary>
    public interface IAsyncDisposable
    {
        /// <summary>Releases the resources held by the implementer.</summary>
        /// <returns>A <see cref="ValueTask"/> representing the dispose operation.</returns>
        ValueTask DisposeAsync();
    }
}

為了 yield return,照著前一篇文章自行寫程式碼加上編譯器需要的兩個型別:

namespace System.Threading.Tasks
{
    using System.Runtime.CompilerServices;
    using System.Threading.Tasks.Sources;

    /// <summary>
    /// Forwarding wrapper around <see cref="ManualResetValueTaskSourceCore{TResult}"/>.
    /// Every member delegates directly to the wrapped core.
    /// </summary>
    /// <typeparam name="TResult">The type of the result produced on completion.</typeparam>
    internal struct ManualResetValueTaskSourceLogic<TResult>
    {
        // Mutable struct that holds all of the state; must not be readonly.
        private ManualResetValueTaskSourceCore<TResult> _core;

        /// <summary>
        /// Creates the wrapper. The <paramref name="parent"/> argument is accepted
        /// but not used; all fields are default-initialized.
        /// </summary>
        /// <param name="parent">Unused.</param>
        public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent)
            : this()
        {
        }

        /// <summary>Gets the current version of the wrapped core.</summary>
        public short Version
        {
            get { return _core.Version; }
        }

        /// <summary>Forwards to the core's <c>GetResult</c>.</summary>
        public TResult GetResult(short token)
        {
            return _core.GetResult(token);
        }

        /// <summary>Forwards to the core's <c>GetStatus</c>.</summary>
        public ValueTaskSourceStatus GetStatus(short token)
        {
            return _core.GetStatus(token);
        }

        /// <summary>Forwards to the core's <c>OnCompleted</c>.</summary>
        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            _core.OnCompleted(continuation, state, token, flags);
        }

        /// <summary>Forwards to the core's <c>Reset</c>.</summary>
        public void Reset()
        {
            _core.Reset();
        }

        /// <summary>Forwards to the core's <c>SetResult</c>.</summary>
        public void SetResult(TResult result)
        {
            _core.SetResult(result);
        }

        /// <summary>Forwards to the core's <c>SetException</c>.</summary>
        public void SetException(Exception error)
        {
            _core.SetException(error);
        }
    }
}

namespace System.Runtime.CompilerServices
{
    /// <summary>
    /// Exposes a by-reference view of a stored value of type <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">The type of the stored value.</typeparam>
    internal interface IStrongBox<T>
    {
        /// <summary>Gets a reference to the stored value.</summary>
        ref T Value
        {
            get;
        }
    }
}

步驟還沒完呢,這時候會發現還缺少一個型別 ManualResetValueTaskSourceCore<TResult>,這個型別在 ManualResetValueTaskSourceLogic<TResult> 中擔任一個很重要的角色 -- 它才是真正執行邏輯的型別。很不幸的,這個型別並不存在於 .NET Framework 4.7.2,於是我去微軟的 GitHub 找到了這個型別在 .NET Core 裡的原始程式碼;問題來了,這份程式碼裡用了一些 .NET Core 才有的東西,此時我面臨了兩個抉擇:(1) 一層層追原始碼全部實作 (2) 試著改看看,讓原來呼叫 .NET Core 獨有的部分轉換成 .NET Framework 可以使用的形式。因為我只是覺得好玩,想要早點做完這個實驗,所以選了 (2),修改了一部分的程式碼來適應 .NET Framework 4.7.2。在此做一個免責宣告 -- 我不知道這樣搞會有甚麼奇妙的副作用,總之請各位以有趣的角度來看待這件事。

我改好的 ManualResetValueTaskSourceCore<TResult> 如下 (這裡還包含有另一個 ManualResetValueTaskSourceCoreShared class ):

namespace System.Threading.Tasks.Sources
{

    /// <summary>
    /// Core state and logic for a manually-resettable value-task source: it stores a
    /// result or an error, a single continuation, and a version number that is
    /// compared against the token passed to every call.
    /// Adapted for .NET Framework from
    /// https://github.com/dotnet/coreclr/blob/master/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs
    /// </summary>
    /// <typeparam name="TResult">The type of the result stored on successful completion.</typeparam>
    [StructLayout(LayoutKind.Auto)]
    public struct ManualResetValueTaskSourceCore<TResult>
    {
        
        // Callback registered through OnCompleted, or the shared sentinel when
        // SignalCompletion ran before any callback was registered.
        private Action<object> _continuation;      
        // State object handed to _continuation when it is invoked.
        private object _continuationState;      
        // Captured in OnCompleted when the FlowExecutionContext flag is set; otherwise null.
        private ExecutionContext _executionContext;       
        // A SynchronizationContext or TaskScheduler captured in OnCompleted when the
        // UseSchedulingContext flag is set; null means "no specific context".
        private object _capturedContext;        
        // Set to true by SignalCompletion; cleared by Reset.
        private bool _completed;      
        // Value stored by SetResult.
        private TResult _result;        
        // Exception captured by SetException; null when no error was stored.
        private ExceptionDispatchInfo _error;      
        // Incremented by Reset; tokens passed by callers must equal this value.
        private short _version;

        
        /// <summary>
        /// When true and no scheduling context was captured, the continuation is
        /// queued to the thread pool instead of being invoked inline on completion.
        /// </summary>
        public bool RunContinuationsAsynchronously { get; set; }
       
        /// <summary>
        /// Increments the version and clears the completion flag, result, error,
        /// captured contexts, continuation and continuation state.
        /// </summary>
        public void Reset()
        {           
            _version++;
            _completed = false;
            _result = default;
            _error = null;
            _executionContext = null;
            _capturedContext = null;
            _continuation = null;
            _continuationState = null;
        }

        
        /// <summary>Stores <paramref name="result"/> and signals completion.</summary>
        /// <param name="result">The result value to store.</param>
        /// <exception cref="InvalidOperationException">Completion was already signalled.</exception>
        public void SetResult(TResult result)
        {
            _result = result;
            SignalCompletion();
        }

        
        /// <summary>Captures <paramref name="error"/> and signals completion.</summary>
        /// <param name="error">The exception to store; rethrown later by <see cref="GetResult"/>.</param>
        /// <exception cref="InvalidOperationException">Completion was already signalled.</exception>
        public void SetException(Exception error)
        {
            _error = ExceptionDispatchInfo.Capture(error);
            SignalCompletion();
        }

        
        /// <summary>Gets the current version; it changes on every <see cref="Reset"/>.</summary>
        public short Version => _version;

       
        /// <summary>Reports the status of the operation.</summary>
        /// <param name="token">Must equal the current <see cref="Version"/>.</param>
        /// <returns>
        /// Pending while not completed; Succeeded when no error is stored; Canceled when
        /// the stored error is an <see cref="OperationCanceledException"/>; otherwise Faulted.
        /// </returns>
        /// <exception cref="InvalidOperationException"><paramref name="token"/> does not match the current version.</exception>
        public ValueTaskSourceStatus GetStatus(short token)
        {
            ValidateToken(token);
            return
                !_completed ? ValueTaskSourceStatus.Pending :
                _error == null ? ValueTaskSourceStatus.Succeeded :
                _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
                ValueTaskSourceStatus.Faulted;
        }

        /// <summary>Returns the stored result, or rethrows the stored error.</summary>
        /// <param name="token">Must equal the current <see cref="Version"/>.</param>
        /// <returns>The value stored by <see cref="SetResult"/>.</returns>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="token"/> does not match the current version, or the operation has not completed.
        /// </exception>
        public TResult GetResult(short token)
        {
            ValidateToken(token);
            if (!_completed)
            {
                ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
            }

            // Rethrows the captured exception, if any, via ExceptionDispatchInfo.
            _error?.Throw();
            return _result;
        }

       
        /// <summary>
        /// Registers the continuation to run when the operation completes. If completion
        /// was already signalled, the continuation is scheduled immediately instead.
        /// </summary>
        /// <param name="continuation">The callback to invoke.</param>
        /// <param name="state">The state object passed to <paramref name="continuation"/>.</param>
        /// <param name="token">Must equal the current <see cref="Version"/>.</param>
        /// <param name="flags">Selects whether to capture the execution context and/or the scheduling context.</param>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="token"/> does not match the current version, or a different
        /// continuation was already registered.
        /// </exception>
        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }
            ValidateToken(token);

            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                _executionContext = ExecutionContext.Capture();
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                // Only a derived SynchronizationContext is captured; a missing context or
                // the exact base type falls through to the TaskScheduler check.
                SynchronizationContext sc = SynchronizationContext.Current;
                if (sc != null && sc.GetType() != typeof(SynchronizationContext))
                {
                    _capturedContext = sc;
                }
                else
                {
                    // Only a non-default scheduler is captured.
                    TaskScheduler ts = TaskScheduler.Current;
                    if (ts != TaskScheduler.Default)
                    {
                        _capturedContext = ts;
                    }
                }
            }

           

            // Publish the continuation only if the slot is still empty. The state is
            // written before the compare-exchange that publishes the continuation.
            object oldContinuation = _continuation;
            if (oldContinuation == null)
            {
                _continuationState = state;
                oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
            }

            if (oldContinuation != null)
            {               
                // The slot was already occupied. The only legal occupant is the sentinel
                // that SignalCompletion stores; anything else is a second registration.
                if (!ReferenceEquals(oldContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
                {
                    ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
                }

                // Completion already happened: schedule the caller's continuation now,
                // on whichever context was captured above.
                switch (_capturedContext)
                {
                    case null:
                        if (_executionContext != null)
                        {
                            ThreadPool.QueueUserWorkItem(new WaitCallback(continuation), state);
                        }
                        else
                        {
                            ThreadPool.UnsafeQueueUserWorkItem(new WaitCallback(continuation), state);
                        }
                        break;

                    case SynchronizationContext sc:
                        // Callback and state travel together as a tuple through Post.
                        sc.Post(s =>
                        {
                            var tuple = (Tuple<Action<object>, object>)s;
                            tuple.Item1(tuple.Item2);
                        }, Tuple.Create(continuation, state));
                        break;

                    case TaskScheduler ts:
                        Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                        break;
                }
            }
        }

       
        /// <summary>Throws <see cref="InvalidOperationException"/> when <paramref name="token"/> differs from the current version.</summary>
        private void ValidateToken(short token)
        {
            if (token != _version)
            {
                ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
            }
        }
        
        /// <summary>
        /// Marks the operation as completed and, if a continuation is already
        /// registered, invokes it. Throws <see cref="InvalidOperationException"/>
        /// when completion was already signalled.
        /// </summary>
        private void SignalCompletion()
        {
            if (_completed)
            {
                ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
            }
            _completed = true;

            // If no continuation is registered yet, try to store the sentinel so that a
            // later OnCompleted call can see that completion already happened. A non-null
            // result from the compare-exchange means a continuation was registered in the
            // meantime, so it is invoked here.
            if (_continuation != null || Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel, null) != null)
            {
                if (_executionContext != null)
                {
                    // Passing `this` as the object state boxes the struct, so the lambda
                    // calls InvokeContinuation on that boxed copy.
                    ExecutionContext.Run(
                        _executionContext,
                        (s) => ((ManualResetValueTaskSourceCore<TResult>)s).InvokeContinuation(),
                       this);
                }
                else
                {
                    InvokeContinuation();
                }
            }
        }

       
        /// <summary>
        /// Runs the registered continuation on the captured context: inline or on the
        /// thread pool when none was captured, via Post for a SynchronizationContext,
        /// or as a task for a TaskScheduler.
        /// </summary>
        private void InvokeContinuation()
        {
            switch (_capturedContext)
            {
                case null:
                    if (RunContinuationsAsynchronously)
                    {
                        if (_executionContext != null)
                        {
                            ThreadPool.QueueUserWorkItem(new WaitCallback(_continuation), _continuationState);
                        }
                        else
                        {
                            ThreadPool.UnsafeQueueUserWorkItem(new WaitCallback(_continuation), _continuationState);
                        }
                    }
                    else
                    {
                        // No captured context and asynchronous execution not requested:
                        // run the continuation inline on the completing thread.
                        _continuation(_continuationState);
                    }
                    break;

                case SynchronizationContext sc:
                    // Callback and state travel together as a tuple through Post.
                    sc.Post(s =>
                    {
                        var state = (Tuple<Action<object>, object>)s;
                        state.Item1(state.Item2);
                    }, Tuple.Create(_continuation, _continuationState));
                    break;

                case TaskScheduler ts:
                    Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                    break;
            }
        }
    }

    /// <summary>
    /// Non-generic helpers shared by every instantiation of the value-task source
    /// core: a throw helper and the sentinel delegate that marks "already completed".
    /// </summary>
    internal static class ManualResetValueTaskSourceCoreShared
    {
        /// <summary>
        /// Marker delegate compared by reference to detect that completion was
        /// signalled before a continuation was registered. It is not meant to be called.
        /// </summary>
        internal static readonly Action<object> s_sentinel = new Action<object>(CompletionSentinel);

        /// <summary>Throws a new <see cref="InvalidOperationException"/>.</summary>
        internal static void ThrowInvalidOperationException()
        {
            throw new InvalidOperationException();
        }

        // Target of s_sentinel: fails the debug assertion, then throws.
        private static void CompletionSentinel(object _)
        {
            Debug.Fail("The sentinel delegate should never be invoked.");
            ThrowInvalidOperationException();
        }
    }
}

然後複製前一篇文章的例子:
 

 public class AsyncEnumerableProcess
    {
        /// <summary>
        /// Streams the lines of the file at <paramref name="path"/> one at a time,
        /// waiting 100 ms before yielding each line.
        /// </summary>
        /// <param name="path">Path of the text file to read.</param>
        /// <returns>An asynchronous sequence containing the lines of the file.</returns>
        public static async IAsyncEnumerable<string> ReadLineAsync(string path)
        {
            var lineSource = new AsyncEnumerator(path);
            try
            {
                for (; ; )
                {
                    bool hasLine = await lineSource.MoveNextAsync();
                    if (!hasLine)
                    {
                        break;
                    }

                    // Artificial delay so the streaming behaviour is visible.
                    await Task.Delay(100);
                    yield return lineSource.Current;
                }
            }
            finally
            {
                // Disposes the enumerator on every exit path from the try block.
                await lineSource.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Asynchronous enumerator over the lines of a text file.
    /// </summary>
    public class AsyncEnumerator : IAsyncEnumerator<string>
    {
        private readonly StreamReader _reader;

        // Guards against disposing the reader more than once.
        private bool _disposed;

        /// <summary>
        /// Gets the line read by the most recent <see cref="MoveNextAsync"/> call,
        /// or null once the end of the file has been reached.
        /// </summary>
        public string Current { get; private set; }

        /// <summary>Opens the file at <paramref name="path"/> for reading as text.</summary>
        /// <param name="path">Path of the file to read.</param>
        public AsyncEnumerator(string path)
        {
            _reader = File.OpenText(path);
        }

        /// <summary>Reads the next line into <see cref="Current"/>.</summary>
        /// <returns>True when a line was read; false at the end of the file.</returns>
        public async ValueTask<bool> MoveNextAsync()
        {
            Current = await _reader.ReadLineAsync();
            return Current != null;
        }

        /// <summary>
        /// Closes the underlying reader. Calling it more than once is a no-op.
        /// </summary>
        /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
        public ValueTask DisposeAsync()
        {
            // Disposing a StreamReader is cheap, so do it inline rather than paying
            // for a thread-pool hop through Task.Run. The class has no finalizer, so
            // GC.SuppressFinalize is not needed either.
            if (!_disposed)
            {
                _reader.Dispose();
                _disposed = true;
            }

            // default(ValueTask) represents a successfully completed operation.
            return default;
        }
    }
    // Console entry point for the sample.
    class Program
    {
        /// <summary>
        /// Prints every line of "SourceFile.txt" to the console as it is streamed,
        /// then waits for a line of input before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static async Task Main(string[] args)
        {
            const string sourcePath = "SourceFile.txt";
            var lines = AsyncEnumerableProcess.ReadLineAsync(sourcePath);

            await foreach (var line in lines)
            {
                Console.WriteLine(line);
            }

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }

嗯,真的會動,沒想到這樣就可以在 .NET Framework 4.7.2 用 Async Stream 了。