123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- using System;
- using System.Collections.Generic;
- using UniRx.InternalUtil;
- namespace UniRx
- {
- public interface IMessagePublisher
- {
- /// <summary>
- /// Send Message to all receiver.
- /// </summary>
- void Publish<T>(T message);
- }
- public interface IMessageReceiver
- {
- /// <summary>
- /// Subscribe typed message.
- /// </summary>
- IObservable<T> Receive<T>();
- }
- public interface IMessageBroker : IMessagePublisher, IMessageReceiver
- {
- }
- public interface IAsyncMessagePublisher
- {
- /// <summary>
- /// Send Message to all receiver and await complete.
- /// </summary>
- IObservable<Unit> PublishAsync<T>(T message);
- }
- public interface IAsyncMessageReceiver
- {
- /// <summary>
- /// Subscribe typed message.
- /// </summary>
- IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver);
- }
- public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver
- {
- }
- /// <summary>
- /// In-Memory PubSub filtered by Type.
- /// </summary>
- public class MessageBroker : IMessageBroker, IDisposable
- {
- /// <summary>
- /// MessageBroker in Global scope.
- /// </summary>
- public static readonly IMessageBroker Default = new MessageBroker();
- bool isDisposed = false;
- readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();
- public void Publish<T>(T message)
- {
- object notifier;
- lock (notifiers)
- {
- if (isDisposed) return;
- if (!notifiers.TryGetValue(typeof(T), out notifier))
- {
- return;
- }
- }
- ((ISubject<T>)notifier).OnNext(message);
- }
- public IObservable<T> Receive<T>()
- {
- object notifier;
- lock (notifiers)
- {
- if (isDisposed) throw new ObjectDisposedException("MessageBroker");
- if (!notifiers.TryGetValue(typeof(T), out notifier))
- {
- ISubject<T> n = new Subject<T>().Synchronize();
- notifier = n;
- notifiers.Add(typeof(T), notifier);
- }
- }
- return ((IObservable<T>)notifier).AsObservable();
- }
- public void Dispose()
- {
- lock (notifiers)
- {
- if (!isDisposed)
- {
- isDisposed = true;
- notifiers.Clear();
- }
- }
- }
- }
- /// <summary>
- /// In-Memory PubSub filtered by Type.
- /// </summary>
- public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable
- {
- /// <summary>
- /// AsyncMessageBroker in Global scope.
- /// </summary>
- public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker();
- bool isDisposed = false;
- readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();
- public IObservable<Unit> PublishAsync<T>(T message)
- {
- UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>> notifier;
- lock (notifiers)
- {
- if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");
- object _notifier;
- if (notifiers.TryGetValue(typeof(T), out _notifier))
- {
- notifier = (UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
- }
- else
- {
- return Observable.ReturnUnit();
- }
- }
- var data = notifier.Data;
- var awaiter = new IObservable<Unit>[data.Length];
- for (int i = 0; i < data.Length; i++)
- {
- awaiter[i] = data[i].Invoke(message);
- }
- return Observable.WhenAll(awaiter);
- }
- public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver)
- {
- lock (notifiers)
- {
- if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");
- object _notifier;
- if (!notifiers.TryGetValue(typeof(T), out _notifier))
- {
- var notifier = UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>>.Empty;
- notifier = notifier.Add(asyncMessageReceiver);
- notifiers.Add(typeof(T), notifier);
- }
- else
- {
- var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
- notifier = notifier.Add(asyncMessageReceiver);
- notifiers[typeof(T)] = notifier;
- }
- }
- return new Subscription<T>(this, asyncMessageReceiver);
- }
- public void Dispose()
- {
- lock (notifiers)
- {
- if (!isDisposed)
- {
- isDisposed = true;
- notifiers.Clear();
- }
- }
- }
- class Subscription<T> : IDisposable
- {
- readonly AsyncMessageBroker parent;
- readonly Func<T, IObservable<Unit>> asyncMessageReceiver;
- public Subscription(AsyncMessageBroker parent, Func<T, IObservable<Unit>> asyncMessageReceiver)
- {
- this.parent = parent;
- this.asyncMessageReceiver = asyncMessageReceiver;
- }
- public void Dispose()
- {
- lock (parent.notifiers)
- {
- object _notifier;
- if (parent.notifiers.TryGetValue(typeof(T), out _notifier))
- {
- var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
- notifier = notifier.Remove(asyncMessageReceiver);
- parent.notifiers[typeof(T)] = notifier;
- }
- }
- }
- }
- }
- }
|