using System; using System.Collections.Generic; using System.Text; using UniRx.InternalUtil; namespace UniRx { public sealed class Subject : ISubject, IDisposable, IOptimizedObservable { object observerLock = new object(); bool isStopped; bool isDisposed; Exception lastError; IObserver outObserver = EmptyObserver.Instance; public bool HasObservers { get { return !(outObserver is EmptyObserver) && !isStopped && !isDisposed; } } public void OnCompleted() { IObserver old; lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; old = outObserver; outObserver = EmptyObserver.Instance; isStopped = true; } old.OnCompleted(); } public void OnError(Exception error) { if (error == null) throw new ArgumentNullException("error"); IObserver old; lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; old = outObserver; outObserver = EmptyObserver.Instance; isStopped = true; lastError = error; } old.OnError(error); } public void OnNext(T value) { outObserver.OnNext(value); } public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); var ex = default(Exception); lock (observerLock) { ThrowIfDisposed(); if (!isStopped) { var listObserver = outObserver as ListObserver; if (listObserver != null) { outObserver = listObserver.Add(observer); } else { var current = outObserver; if (current is EmptyObserver) { outObserver = observer; } else { outObserver = new ListObserver(new ImmutableList>(new[] { current, observer })); } } return new Subscription(this, observer); } ex = lastError; } if (ex != null) { observer.OnError(ex); } else { observer.OnCompleted(); } return Disposable.Empty; } public void Dispose() { lock (observerLock) { isDisposed = true; outObserver = DisposedObserver.Instance; } } void ThrowIfDisposed() { if (isDisposed) throw new ObjectDisposedException(""); } public bool IsRequiredSubscribeOnCurrentThread() { return false; } class Subscription : IDisposable { readonly object gate = new object(); Subject parent; IObserver unsubscribeTarget; public Subscription(Subject parent, IObserver unsubscribeTarget) { this.parent = parent; this.unsubscribeTarget = unsubscribeTarget; } public void Dispose() { lock (gate) { if (parent != null) { lock (parent.observerLock) { var listObserver = parent.outObserver as ListObserver; if (listObserver != null) { parent.outObserver = listObserver.Remove(unsubscribeTarget); } else { parent.outObserver = EmptyObserver.Instance; } unsubscribeTarget = null; parent = null; } } } } } } }