using System;
using System.Collections.Generic;
using UniRx.InternalUtil;

namespace UniRx
{
    /// <summary>
    /// Sending side of a type-filtered message bus.
    /// </summary>
    public interface IMessagePublisher
    {
        /// <summary>
        /// Send Message to all receiver.
        /// </summary>
        /// <typeparam name="T">Message type; receivers are matched on this type.</typeparam>
        /// <param name="message">The message to deliver.</param>
        void Publish<T>(T message);
    }

    /// <summary>
    /// Receiving side of a type-filtered message bus.
    /// </summary>
    public interface IMessageReceiver
    {
        /// <summary>
        /// Subscribe typed message.
        /// </summary>
        /// <typeparam name="T">Message type to receive.</typeparam>
        /// <returns>An observable sequence of messages of type <typeparamref name="T"/>.</returns>
        IObservable<T> Receive<T>();
    }

    /// <summary>
    /// Combined publisher and receiver.
    /// </summary>
    public interface IMessageBroker : IMessagePublisher, IMessageReceiver
    {
    }

    /// <summary>
    /// Sending side of a type-filtered message bus whose receivers do asynchronous work.
    /// </summary>
    public interface IAsyncMessagePublisher
    {
        /// <summary>
        /// Send Message to all receiver and await complete.
        /// </summary>
        /// <typeparam name="T">Message type; receivers are matched on this type.</typeparam>
        /// <param name="message">The message to deliver.</param>
        /// <returns>An observable representing the combined work of the receivers.</returns>
        IObservable<Unit> PublishAsync<T>(T message);
    }

    /// <summary>
    /// Receiving side of a type-filtered message bus whose receivers do asynchronous work.
    /// </summary>
    public interface IAsyncMessageReceiver
    {
        /// <summary>
        /// Subscribe typed message.
        /// </summary>
        /// <typeparam name="T">Message type to receive.</typeparam>
        /// <param name="asyncMessageReceiver">Callback invoked for each published message.</param>
        /// <returns>A handle that unregisters the callback when disposed.</returns>
        IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver);
    }

    /// <summary>
    /// Combined asynchronous publisher and receiver.
    /// </summary>
    public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver
    {
    }

    /// <summary>
    /// In-Memory PubSub filtered by Type.
    /// </summary>
    public class MessageBroker : IMessageBroker, IDisposable
    {
        /// <summary>
        /// MessageBroker in Global scope.
        /// </summary>
        public static readonly IMessageBroker Default = new MessageBroker();

        bool isDisposed = false;

        // Maps a message type to its subject, stored as object and cast back to ISubject<T> on use.
        // The dictionary instance doubles as the lock guarding itself and isDisposed.
        readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();

        /// <summary>
        /// Pushes <paramref name="message"/> to the subject registered for <typeparamref name="T"/>.
        /// Does nothing when the broker has been disposed or when <see cref="Receive{T}"/>
        /// has never been called for <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Message type used as the lookup key.</typeparam>
        /// <param name="message">The message to deliver.</param>
        public void Publish<T>(T message)
        {
            object notifier;
            lock (notifiers)
            {
                if (isDisposed) return;

                if (!notifiers.TryGetValue(typeof(T), out notifier))
                {
                    return;
                }
            }

            // OnNext runs after the lock is released, so observer callbacks never execute under it.
            ((ISubject<T>)notifier).OnNext(message);
        }

        /// <summary>
        /// Returns the message stream for <typeparamref name="T"/>, creating the backing
        /// subject on first request.
        /// </summary>
        /// <typeparam name="T">Message type to receive.</typeparam>
        /// <returns>The backing subject wrapped by <c>AsObservable()</c>.</returns>
        /// <exception cref="ObjectDisposedException">The broker has been disposed.</exception>
        public IObservable<T> Receive<T>()
        {
            object notifier;
            lock (notifiers)
            {
                if (isDisposed) throw new ObjectDisposedException("MessageBroker");

                if (!notifiers.TryGetValue(typeof(T), out notifier))
                {
                    ISubject<T> n = new Subject<T>().Synchronize();
                    notifier = n;
                    notifiers.Add(typeof(T), notifier);
                }
            }

            return ((IObservable<T>)notifier).AsObservable();
        }

        /// <summary>
        /// Marks the broker as disposed and drops every registered subject.
        /// Only the dictionary is cleared here; the subjects are not completed or disposed.
        /// Repeated calls are no-ops.
        /// </summary>
        public void Dispose()
        {
            lock (notifiers)
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    notifiers.Clear();
                }
            }
        }
    }

    /// <summary>
    /// In-Memory PubSub filtered by Type.
    /// </summary>
    public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable
    {
        /// <summary>
        /// AsyncMessageBroker in Global scope.
        /// </summary>
        public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker();

        bool isDisposed = false;

        // Maps a message type to an ImmutableList<Func<T, IObservable<Unit>>>, stored as object.
        // The dictionary instance doubles as the lock guarding itself and isDisposed.
        readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();

        /// <summary>
        /// Invokes every receiver registered for <typeparamref name="T"/> with
        /// <paramref name="message"/> and combines the returned observables with
        /// <c>Observable.WhenAll</c>.
        /// </summary>
        /// <typeparam name="T">Message type used as the lookup key.</typeparam>
        /// <param name="message">The message to deliver.</param>
        /// <returns>
        /// <c>Observable.ReturnUnit()</c> when no entry exists for <typeparamref name="T"/>;
        /// otherwise the <c>WhenAll</c> of the receivers' results.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The broker has been disposed.</exception>
        public IObservable<Unit> PublishAsync<T>(T message)
        {
            ImmutableList<Func<T, IObservable<Unit>>> notifier;
            lock (notifiers)
            {
                if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");

                object _notifier;
                if (notifiers.TryGetValue(typeof(T), out _notifier))
                {
                    notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
                }
                else
                {
                    return Observable.ReturnUnit();
                }
            }

            // Receivers are invoked after the lock is released, using the list captured above.
            var data = notifier.Data;
            var awaiter = new IObservable<Unit>[data.Length];
            for (int i = 0; i < data.Length; i++)
            {
                awaiter[i] = data[i].Invoke(message);
            }

            return Observable.WhenAll(awaiter);
        }

        /// <summary>
        /// Registers <paramref name="asyncMessageReceiver"/> for messages of type
        /// <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Message type to receive.</typeparam>
        /// <param name="asyncMessageReceiver">Callback invoked by <see cref="PublishAsync{T}"/>.</param>
        /// <returns>A handle whose <c>Dispose</c> removes the callback again.</returns>
        /// <exception cref="ObjectDisposedException">The broker has been disposed.</exception>
        public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver)
        {
            lock (notifiers)
            {
                if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");

                // Start from the current list for T (or Empty on first subscription) and store
                // the list returned by Add back under the same key.
                object _notifier;
                var notifier = notifiers.TryGetValue(typeof(T), out _notifier)
                    ? (ImmutableList<Func<T, IObservable<Unit>>>)_notifier
                    : ImmutableList<Func<T, IObservable<Unit>>>.Empty;

                notifiers[typeof(T)] = notifier.Add(asyncMessageReceiver);
            }

            return new Subscription<T>(this, asyncMessageReceiver);
        }

        /// <summary>
        /// Marks the broker as disposed and drops every registered receiver list.
        /// Repeated calls are no-ops.
        /// </summary>
        public void Dispose()
        {
            lock (notifiers)
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    notifiers.Clear();
                }
            }
        }

        /// <summary>
        /// Handle returned by <see cref="Subscribe{T}"/>; disposing it unregisters the receiver.
        /// </summary>
        class Subscription<T> : IDisposable
        {
            readonly AsyncMessageBroker parent;
            readonly Func<T, IObservable<Unit>> asyncMessageReceiver;

            public Subscription(AsyncMessageBroker parent, Func<T, IObservable<Unit>> asyncMessageReceiver)
            {
                this.parent = parent;
                this.asyncMessageReceiver = asyncMessageReceiver;
            }

            /// <summary>
            /// Removes the receiver from the parent's list for <typeparamref name="T"/>.
            /// Does nothing when the parent holds no entry for <typeparamref name="T"/>
            /// (for example after the parent's <c>Dispose</c> cleared the dictionary).
            /// </summary>
            public void Dispose()
            {
                lock (parent.notifiers)
                {
                    object _notifier;
                    if (parent.notifiers.TryGetValue(typeof(T), out _notifier))
                    {
                        var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
                        notifier = notifier.Remove(asyncMessageReceiver);

                        // The entry is kept even when the resulting list holds no receivers.
                        parent.notifiers[typeof(T)] = notifier;
                    }
                }
            }
        }
    }
}