123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- using System;
- using System.Collections.Generic;
- using System.Text;
- using UniRx.InternalUtil;
- namespace UniRx
- {
- public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T>
- {
- object observerLock = new object();
- bool isStopped;
- bool isDisposed;
- Exception lastError;
- IObserver<T> outObserver = EmptyObserver<T>.Instance;
- public bool HasObservers
- {
- get
- {
- return !(outObserver is EmptyObserver<T>) && !isStopped && !isDisposed;
- }
- }
- public void OnCompleted()
- {
- IObserver<T> old;
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (isStopped) return;
- old = outObserver;
- outObserver = EmptyObserver<T>.Instance;
- isStopped = true;
- }
- old.OnCompleted();
- }
- public void OnError(Exception error)
- {
- if (error == null) throw new ArgumentNullException("error");
- IObserver<T> old;
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (isStopped) return;
- old = outObserver;
- outObserver = EmptyObserver<T>.Instance;
- isStopped = true;
- lastError = error;
- }
- old.OnError(error);
- }
- public void OnNext(T value)
- {
- outObserver.OnNext(value);
- }
- public IDisposable Subscribe(IObserver<T> observer)
- {
- if (observer == null) throw new ArgumentNullException("observer");
- var ex = default(Exception);
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (!isStopped)
- {
- var listObserver = outObserver as ListObserver<T>;
- if (listObserver != null)
- {
- outObserver = listObserver.Add(observer);
- }
- else
- {
- var current = outObserver;
- if (current is EmptyObserver<T>)
- {
- outObserver = observer;
- }
- else
- {
- outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
- }
- }
- return new Subscription(this, observer);
- }
- ex = lastError;
- }
- if (ex != null)
- {
- observer.OnError(ex);
- }
- else
- {
- observer.OnCompleted();
- }
- return Disposable.Empty;
- }
- public void Dispose()
- {
- lock (observerLock)
- {
- isDisposed = true;
- outObserver = DisposedObserver<T>.Instance;
- }
- }
- void ThrowIfDisposed()
- {
- if (isDisposed) throw new ObjectDisposedException("");
- }
- public bool IsRequiredSubscribeOnCurrentThread()
- {
- return false;
- }
- class Subscription : IDisposable
- {
- readonly object gate = new object();
- Subject<T> parent;
- IObserver<T> unsubscribeTarget;
- public Subscription(Subject<T> parent, IObserver<T> unsubscribeTarget)
- {
- this.parent = parent;
- this.unsubscribeTarget = unsubscribeTarget;
- }
- public void Dispose()
- {
- lock (gate)
- {
- if (parent != null)
- {
- lock (parent.observerLock)
- {
- var listObserver = parent.outObserver as ListObserver<T>;
- if (listObserver != null)
- {
- parent.outObserver = listObserver.Remove(unsubscribeTarget);
- }
- else
- {
- parent.outObserver = EmptyObserver<T>.Instance;
- }
- unsubscribeTarget = null;
- parent = null;
- }
- }
- }
- }
- }
- }
- }
|