123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- using System;
- using System.Collections.Generic;
- using UniRx.InternalUtil;
- namespace UniRx
- {
- public sealed class ReplaySubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
- {
- object observerLock = new object();
- bool isStopped;
- bool isDisposed;
- Exception lastError;
- IObserver<T> outObserver = EmptyObserver<T>.Instance;
- readonly int bufferSize;
- readonly TimeSpan window;
- readonly DateTimeOffset startTime;
- readonly IScheduler scheduler;
- Queue<TimeInterval<T>> queue = new Queue<TimeInterval<T>>();
- public ReplaySubject()
- : this(int.MaxValue, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
- {
- }
- public ReplaySubject(IScheduler scheduler)
- : this(int.MaxValue, TimeSpan.MaxValue, scheduler)
- {
- }
- public ReplaySubject(int bufferSize)
- : this(bufferSize, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
- {
- }
- public ReplaySubject(int bufferSize, IScheduler scheduler)
- : this(bufferSize, TimeSpan.MaxValue, scheduler)
- {
- }
- public ReplaySubject(TimeSpan window)
- : this(int.MaxValue, window, Scheduler.DefaultSchedulers.Iteration)
- {
- }
- public ReplaySubject(TimeSpan window, IScheduler scheduler)
- : this(int.MaxValue, window, scheduler)
- {
- }
- // full constructor
- public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
- {
- if (bufferSize < 0) throw new ArgumentOutOfRangeException("bufferSize");
- if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException("window");
- if (scheduler == null) throw new ArgumentNullException("scheduler");
- this.bufferSize = bufferSize;
- this.window = window;
- this.scheduler = scheduler;
- startTime = scheduler.Now;
- }
- void Trim()
- {
- var elapsedTime = Scheduler.Normalize(scheduler.Now - startTime);
- while (queue.Count > bufferSize)
- {
- queue.Dequeue();
- }
- while (queue.Count > 0 && elapsedTime.Subtract(queue.Peek().Interval).CompareTo(window) > 0)
- {
- queue.Dequeue();
- }
- }
- public void OnCompleted()
- {
- IObserver<T> old;
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (isStopped) return;
- old = outObserver;
- outObserver = EmptyObserver<T>.Instance;
- isStopped = true;
- Trim();
- }
- old.OnCompleted();
- }
- public void OnError(Exception error)
- {
- if (error == null) throw new ArgumentNullException("error");
- IObserver<T> old;
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (isStopped) return;
- old = outObserver;
- outObserver = EmptyObserver<T>.Instance;
- isStopped = true;
- lastError = error;
- Trim();
- }
- old.OnError(error);
- }
- public void OnNext(T value)
- {
- IObserver<T> current;
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (isStopped) return;
- // enQ
- queue.Enqueue(new TimeInterval<T>(value, scheduler.Now - startTime));
- Trim();
- current = outObserver;
- }
- current.OnNext(value);
- }
- public IDisposable Subscribe(IObserver<T> observer)
- {
- if (observer == null) throw new ArgumentNullException("observer");
- var ex = default(Exception);
- var subscription = default(Subscription);
- lock (observerLock)
- {
- ThrowIfDisposed();
- if (!isStopped)
- {
- var listObserver = outObserver as ListObserver<T>;
- if (listObserver != null)
- {
- outObserver = listObserver.Add(observer);
- }
- else
- {
- var current = outObserver;
- if (current is EmptyObserver<T>)
- {
- outObserver = observer;
- }
- else
- {
- outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
- }
- }
- subscription = new Subscription(this, observer);
- }
- ex = lastError;
- Trim();
- foreach (var item in queue)
- {
- observer.OnNext(item.Value);
- }
- }
- if (subscription != null)
- {
- return subscription;
- }
- else if (ex != null)
- {
- observer.OnError(ex);
- }
- else
- {
- observer.OnCompleted();
- }
- return Disposable.Empty;
- }
- public void Dispose()
- {
- lock (observerLock)
- {
- isDisposed = true;
- outObserver = DisposedObserver<T>.Instance;
- lastError = null;
- queue = null;
- }
- }
- void ThrowIfDisposed()
- {
- if (isDisposed) throw new ObjectDisposedException("");
- }
- public bool IsRequiredSubscribeOnCurrentThread()
- {
- return false;
- }
- class Subscription : IDisposable
- {
- readonly object gate = new object();
- ReplaySubject<T> parent;
- IObserver<T> unsubscribeTarget;
- public Subscription(ReplaySubject<T> parent, IObserver<T> unsubscribeTarget)
- {
- this.parent = parent;
- this.unsubscribeTarget = unsubscribeTarget;
- }
- public void Dispose()
- {
- lock (gate)
- {
- if (parent != null)
- {
- lock (parent.observerLock)
- {
- var listObserver = parent.outObserver as ListObserver<T>;
- if (listObserver != null)
- {
- parent.outObserver = listObserver.Remove(unsubscribeTarget);
- }
- else
- {
- parent.outObserver = EmptyObserver<T>.Instance;
- }
- unsubscribeTarget = null;
- parent = null;
- }
- }
- }
- }
- }
- }
- }
|