[C#] Simple Publisher and Subscriber

  • C#
  • 2018-09-17

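.NET 4.0 introduced two interfaces, IObservable<T> (the publisher) and IObserver<T> (the subscriber), which together give a standard template for implementing the publish-subscribe pattern. The pattern is a good fit for distributed, push-based notifications, and each component has a clearly defined responsibility. This post records how to use the two interfaces.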

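In real-world applications we often need to watch for changes in data or state and then run some business logic in response. The events and delegates built into .NET address the same need. For example, the TextChanged event fires when the text in a TextBox changes, and the logic the developer wants to run afterwards usually lives in a handler such as an OnTextChanged method.

In this post I follow the MSDN sample code and use the IObservable<T> and IObserver<T> interfaces to build a simple message push and subscription service.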
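For comparison, the event/delegate approach mentioned above might look like the following minimal sketch. TextSource is a hypothetical stand-in for a TextBox (it is not a WinForms or WPF type), and EventDemo exists only for illustration; neither comes from the MSDN sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a TextBox: raises TextChanged when Text gets a new value.
public class TextSource
{
    private string _text = string.Empty;

    public event EventHandler TextChanged;

    public string Text
    {
        get { return _text; }
        set
        {
            if (_text == value) return;   // unchanged text raises no event
            _text = value;
            OnTextChanged(EventArgs.Empty);
        }
    }

    protected virtual void OnTextChanged(EventArgs e)
    {
        var handler = TextChanged;        // local copy guards against concurrent unsubscription
        if (handler != null) handler(this, e);
    }
}

public static class EventDemo
{
    // Returns the lines the handler recorded.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new TextSource();
        EventHandler onChanged = (sender, e) => log.Add("Text is now: " + ((TextSource)sender).Text);

        source.TextChanged += onChanged;  // subscribe
        source.Text = "hello";            // handler runs
        source.Text = "hello";            // same value, handler does not run
        source.TextChanged -= onChanged;  // unsubscribe
        source.Text = "world";            // no handler attached any more
        return log;
    }
}
```

With events, the subscriber unsubscribes with `-=` and must keep a reference to the same delegate. With IObservable<T>, Subscribe returns an IDisposable that does the unsubscribing instead.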

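Publisher responsibilities

Lets subscribers subscribe to and unsubscribe from a topic

Pushes messages to a topic and notifies every subscriber (messages can be pushed to different topics)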

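using System;
using System.Collections.Generic;

public class Publisher<T> : IObservable<T>
{
    // Observers currently subscribed to this publisher.
    private readonly List<IObserver<T>> _observers;
    private readonly string _name;

    public Publisher(string name)
    {
        _observers = new List<IObserver<T>>();
        this._name = name;
    }

    // Registers the observer (at most once) and returns a handle
    // whose Dispose() removes it again.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!_observers.Contains(observer))
            _observers.Add(observer);
        return new UnSubscribe(this._name, _observers, observer);
    }

    // Pushes obj to every subscribed observer. Iterating over a snapshot
    // lets an observer unsubscribe from inside OnNext without breaking the loop.
    protected void Notification(T obj)
    {
        foreach (var observer in _observers.ToArray())
            observer.OnNext(obj);
    }

    // Subscription handle returned by Subscribe.
    private class UnSubscribe : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;
        private readonly string _name;

        public UnSubscribe(string name, List<IObserver<T>> observers, IObserver<T> observer)
        {
            this._observers = observers;
            this._observer = observer;
            this._name = name;
            Console.WriteLine($"{_name} Subscribed");
        }

        // Removes the observer from the publisher's list; safe to call more than once.
        public void Dispose()
        {
            if (_observer != null && _observers.Contains(_observer))
            {
                _observers.Remove(_observer);
                Console.WriteLine($"{_name} UnSubscribed");
            }
        }
    }
}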

 

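Subscriber responsibilities

Receives messages from different topics (either filtered by partition or as a broadcast),

then runs the corresponding business logic (here I simply display the message)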

using System;

public class Subscriber : IObserver<ManagerPublisher>
{
    private readonly string _name;
    // Partition to listen to; a value <= 0 means "receive everything" (broadcast).
    private readonly int _partition;

    public Subscriber(string name, int partition)
    {
        this._name = name;
        this._partition = partition;
    }

    void IObserver<ManagerPublisher>.OnCompleted()
    {
        Console.WriteLine("Push Completed");
    }

    void IObserver<ManagerPublisher>.OnError(Exception error)
    {
        Console.WriteLine($"Error:{error.Message}");
    }

    // Called by the publisher for every new message.
    // ManagerPublisher is expected to expose _name and NewMessage.
    void IObserver<ManagerPublisher>.OnNext(ManagerPublisher value)
    {
        if (ReferenceEquals(value, null))
            return;
        try
        {
            var message = value.NewMessage;
            // Show the message if we are a broadcast subscriber or the partition matches.
            if (this._partition <= 0 || message.Partition == this._partition)
                Console.WriteLine(
                    $"Publisher:{value._name}, Subscriber:{this._name}, Topic:{message.Topic}, Partition:{message.Partition}, Value:{message.Value}, Timestamp:{message.Timestamp}");
        }
        catch (Exception e)
        {
            Console.WriteLine($"Error onNext:{e.Message}");
            throw;
        }
    }
}
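Subscriber implements OnCompleted and OnError, but the Publisher<T> shown earlier only ever calls OnNext. Below is a minimal sketch of how a publisher could drive the other two callbacks. CompletingPublisher<T>, Publish, Fail, LoggingObserver and CompletionDemo are names I made up for illustration; EndTransmission borrows its name from the MSDN sample. Both OnError and OnCompleted are treated as terminal, so the observer list is cleared afterwards.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher that can also signal errors and completion.
public class CompletingPublisher<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!_observers.Contains(observer)) _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    public void Publish(T value)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    // Reports a failure; no further notifications are sent to these observers.
    public void Fail(Exception error)
    {
        foreach (var o in _observers.ToArray()) o.OnError(error);
        _observers.Clear();
    }

    // Signals that no more data will arrive, then drops all observers.
    public void EndTransmission()
    {
        foreach (var o in _observers.ToArray()) o.OnCompleted();
        _observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Hypothetical observer that records every callback it receives.
public class LoggingObserver : IObserver<string>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(string value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class CompletionDemo
{
    public static List<string> Run()
    {
        var publisher = new CompletingPublisher<string>();
        var observer = new LoggingObserver();
        publisher.Subscribe(observer);
        publisher.Publish("a");
        publisher.EndTransmission();
        publisher.Publish("b");   // nobody is subscribed any more, so this is not recorded
        return observer.Log;
    }
}
```

Whether clearing the list on completion is the right choice depends on the application; it keeps the sketch simple and prevents callbacks after a terminal notification.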

 

Message Model

using System;

// Payload pushed from a publisher to its subscribers.
public class Message
{
    public string Topic { get; set; }
    public string Value { get; set; }
    public int Partition { get; set; }
    public DateTime Timestamp { get; set; }
}
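ManagerPublisher is used by both Subscriber and the client code, but its source is not listed in this post. The sketch below is my assumption of its shape, inferred only from how it is used: a constructor taking a name, a readable `_name`, and a `NewMessage` property whose setter notifies the subscribers. To keep the snippet compilable on its own it carries condensed copies of Message and Publisher<T> (console output dropped); RecordingObserver and ManagerPublisherDemo are hypothetical helpers.

```csharp
using System;
using System.Collections.Generic;

public class Message
{
    public string Topic { get; set; }
    public string Value { get; set; }
    public int Partition { get; set; }
    public DateTime Timestamp { get; set; }
}

// Condensed copy of Publisher<T> from this post.
public class Publisher<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private readonly string _name;

    public Publisher(string name) { _name = name; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!_observers.Contains(observer)) _observers.Add(observer);
        return new UnSubscribe(_observers, observer);
    }

    protected void Notification(T obj)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(obj);
    }

    private class UnSubscribe : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;
        public UnSubscribe(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }
        public void Dispose() { _observers.Remove(_observer); }
    }
}

// ASSUMED shape: the publisher passes itself to observers, who read NewMessage.
public class ManagerPublisher : Publisher<ManagerPublisher>
{
    // Public so that Subscriber can read value._name, as the post's code does.
    public readonly string _name;
    private Message _newMessage;

    public ManagerPublisher(string name) : base(name) { _name = name; }

    public Message NewMessage
    {
        get { return _newMessage; }
        set
        {
            _newMessage = value;
            Notification(this);   // push to every subscriber on assignment
        }
    }
}

// Hypothetical observer used only to show the notification flow.
public class RecordingObserver : IObserver<ManagerPublisher>
{
    public readonly List<string> Received = new List<string>();
    public void OnNext(ManagerPublisher value) { Received.Add(value._name + ":" + value.NewMessage.Value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ManagerPublisherDemo
{
    public static List<string> Run()
    {
        var publisher = new ManagerPublisher("Logger");
        var observer = new RecordingObserver();
        var subscription = publisher.Subscribe(observer);
        publisher.NewMessage = new Message { Topic = "ricoLog", Partition = 0, Value = "Log1", Timestamp = DateTime.UtcNow };
        subscription.Dispose();
        publisher.NewMessage = new Message { Topic = "ricoLog", Partition = 0, Value = "Log2", Timestamp = DateTime.UtcNow };
        return observer.Received;   // only Log1 arrived before the unsubscribe
    }
}
```

Passing the publisher itself as the notification payload mirrors `IObserver<ManagerPublisher>` in Subscriber. Publishing the Message directly (`IObservable<Message>`) would be an equally reasonable design.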

 

Client

using System;

public static class Program
{
    public static void Main()
    {
        // Two publishers, one per channel.
        ManagerPublisher mp1 = new ManagerPublisher("Logger");
        ManagerPublisher mp2 = new ManagerPublisher("Bonus");

        // The second argument is the partition; 0 means "receive everything".
        Subscriber s1 = new Subscriber("Sherry", 0);
        Subscriber s2 = new Subscriber("FiFi", 1);
        Subscriber s3 = new Subscriber("RicoIsme", 1);
        Subscriber s4 = new Subscriber("Sherry", 2);

        // Keep the returned handles so each subscription can be cancelled later.
        var removeMe1 = mp1.Subscribe(s1);
        var removeMe2 = mp1.Subscribe(s2);
        var removeMe3 = mp2.Subscribe(s3);
        var removeMe4 = mp2.Subscribe(s4);

        // Each assignment to NewMessage pushes the message to the subscribers.
        mp1.NewMessage = new Message { Topic = "ricoLog", Partition = 0, Value = "Log1", Timestamp = DateTime.UtcNow };
        mp1.NewMessage = new Message { Topic = "ricoLog", Partition = 1, Value = "Log2", Timestamp = DateTime.UtcNow };
        mp1.NewMessage = new Message { Topic = "ricoLog", Partition = 2, Value = "Log3", Timestamp = DateTime.UtcNow };
        mp2.NewMessage = new Message { Topic = "ricoBonus", Partition = 1, Value = "Bonus1", Timestamp = DateTime.UtcNow };
        mp2.NewMessage = new Message { Topic = "ricoBonus", Partition = 2, Value = "Bonus2", Timestamp = DateTime.UtcNow };

        // Cleanup
        removeMe1.Dispose();
        removeMe2.Dispose();
        removeMe3.Dispose();
        removeMe4.Dispose();

        Console.ReadLine();
    }
}

 

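Result

Sherry subscribed to two topics: ricoLog (broadcast) and ricoBonus (partition = 2)

FiFi subscribed only to ricoLog with partition = 1

RicoIsme subscribed only to ricoBonus with partition = 1

Implementing the publish-subscribe pattern through these two standard interfaces makes the code, in my view, considerably more readable and the development more intuitive.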
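The three statements above follow from the filter in Subscriber.OnNext (`_partition <= 0 || message.Partition == _partition`). As a rough check, the sketch below replays the client's subscriptions and messages through that same predicate without any console output. PartitionFilter, ShouldDeliver and Trace are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;

public static class PartitionFilter
{
    // Same condition as in Subscriber.OnNext: a partition <= 0 means broadcast.
    public static bool ShouldDeliver(int subscriberPartition, int messagePartition)
    {
        return subscriberPartition <= 0 || messagePartition == subscriberPartition;
    }

    // Replays the client code: which subscriber would display which message.
    public static List<string> Trace()
    {
        var subscriptions = new[]
        {
            new { Publisher = "Logger", Subscriber = "Sherry",   Partition = 0 },
            new { Publisher = "Logger", Subscriber = "FiFi",     Partition = 1 },
            new { Publisher = "Bonus",  Subscriber = "RicoIsme", Partition = 1 },
            new { Publisher = "Bonus",  Subscriber = "Sherry",   Partition = 2 },
        };
        var messages = new[]
        {
            new { Publisher = "Logger", Value = "Log1",   Partition = 0 },
            new { Publisher = "Logger", Value = "Log2",   Partition = 1 },
            new { Publisher = "Logger", Value = "Log3",   Partition = 2 },
            new { Publisher = "Bonus",  Value = "Bonus1", Partition = 1 },
            new { Publisher = "Bonus",  Value = "Bonus2", Partition = 2 },
        };

        var deliveries = new List<string>();
        foreach (var m in messages)
            foreach (var s in subscriptions)
                if (s.Publisher == m.Publisher && ShouldDeliver(s.Partition, m.Partition))
                    deliveries.Add(s.Subscriber + " <- " + m.Value);
        return deliveries;
    }
}
```

Trace() yields six deliveries: Sherry gets Log1, Log2 and Log3 from Logger plus Bonus2 from Bonus, FiFi gets only Log2, and RicoIsme gets only Bonus1.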

 

References

IObservable<T> interface

Observer design pattern

Publisher/Subscriber pattern with Event/Delegate and EventAggregator

Observer Pattern (C#)

The Publish Subscribe Pattern in C# and Some Gotchas

New Interfaces for Implementing Publish-Subscribe in .NET