ReplaySubject.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. using System;
  2. using System.Collections.Generic;
  3. using UniRx.InternalUtil;
  4. namespace UniRx
  5. {
  6. public sealed class ReplaySubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
  7. {
  8. object observerLock = new object();
  9. bool isStopped;
  10. bool isDisposed;
  11. Exception lastError;
  12. IObserver<T> outObserver = EmptyObserver<T>.Instance;
  13. readonly int bufferSize;
  14. readonly TimeSpan window;
  15. readonly DateTimeOffset startTime;
  16. readonly IScheduler scheduler;
  17. Queue<TimeInterval<T>> queue = new Queue<TimeInterval<T>>();
  18. public ReplaySubject()
  19. : this(int.MaxValue, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
  20. {
  21. }
  22. public ReplaySubject(IScheduler scheduler)
  23. : this(int.MaxValue, TimeSpan.MaxValue, scheduler)
  24. {
  25. }
  26. public ReplaySubject(int bufferSize)
  27. : this(bufferSize, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
  28. {
  29. }
  30. public ReplaySubject(int bufferSize, IScheduler scheduler)
  31. : this(bufferSize, TimeSpan.MaxValue, scheduler)
  32. {
  33. }
  34. public ReplaySubject(TimeSpan window)
  35. : this(int.MaxValue, window, Scheduler.DefaultSchedulers.Iteration)
  36. {
  37. }
  38. public ReplaySubject(TimeSpan window, IScheduler scheduler)
  39. : this(int.MaxValue, window, scheduler)
  40. {
  41. }
  42. // full constructor
  43. public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  44. {
  45. if (bufferSize < 0) throw new ArgumentOutOfRangeException("bufferSize");
  46. if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException("window");
  47. if (scheduler == null) throw new ArgumentNullException("scheduler");
  48. this.bufferSize = bufferSize;
  49. this.window = window;
  50. this.scheduler = scheduler;
  51. startTime = scheduler.Now;
  52. }
  53. void Trim()
  54. {
  55. var elapsedTime = Scheduler.Normalize(scheduler.Now - startTime);
  56. while (queue.Count > bufferSize)
  57. {
  58. queue.Dequeue();
  59. }
  60. while (queue.Count > 0 && elapsedTime.Subtract(queue.Peek().Interval).CompareTo(window) > 0)
  61. {
  62. queue.Dequeue();
  63. }
  64. }
  65. public void OnCompleted()
  66. {
  67. IObserver<T> old;
  68. lock (observerLock)
  69. {
  70. ThrowIfDisposed();
  71. if (isStopped) return;
  72. old = outObserver;
  73. outObserver = EmptyObserver<T>.Instance;
  74. isStopped = true;
  75. Trim();
  76. }
  77. old.OnCompleted();
  78. }
  79. public void OnError(Exception error)
  80. {
  81. if (error == null) throw new ArgumentNullException("error");
  82. IObserver<T> old;
  83. lock (observerLock)
  84. {
  85. ThrowIfDisposed();
  86. if (isStopped) return;
  87. old = outObserver;
  88. outObserver = EmptyObserver<T>.Instance;
  89. isStopped = true;
  90. lastError = error;
  91. Trim();
  92. }
  93. old.OnError(error);
  94. }
  95. public void OnNext(T value)
  96. {
  97. IObserver<T> current;
  98. lock (observerLock)
  99. {
  100. ThrowIfDisposed();
  101. if (isStopped) return;
  102. // enQ
  103. queue.Enqueue(new TimeInterval<T>(value, scheduler.Now - startTime));
  104. Trim();
  105. current = outObserver;
  106. }
  107. current.OnNext(value);
  108. }
  109. public IDisposable Subscribe(IObserver<T> observer)
  110. {
  111. if (observer == null) throw new ArgumentNullException("observer");
  112. var ex = default(Exception);
  113. var subscription = default(Subscription);
  114. lock (observerLock)
  115. {
  116. ThrowIfDisposed();
  117. if (!isStopped)
  118. {
  119. var listObserver = outObserver as ListObserver<T>;
  120. if (listObserver != null)
  121. {
  122. outObserver = listObserver.Add(observer);
  123. }
  124. else
  125. {
  126. var current = outObserver;
  127. if (current is EmptyObserver<T>)
  128. {
  129. outObserver = observer;
  130. }
  131. else
  132. {
  133. outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
  134. }
  135. }
  136. subscription = new Subscription(this, observer);
  137. }
  138. ex = lastError;
  139. Trim();
  140. foreach (var item in queue)
  141. {
  142. observer.OnNext(item.Value);
  143. }
  144. }
  145. if (subscription != null)
  146. {
  147. return subscription;
  148. }
  149. else if (ex != null)
  150. {
  151. observer.OnError(ex);
  152. }
  153. else
  154. {
  155. observer.OnCompleted();
  156. }
  157. return Disposable.Empty;
  158. }
  159. public void Dispose()
  160. {
  161. lock (observerLock)
  162. {
  163. isDisposed = true;
  164. outObserver = DisposedObserver<T>.Instance;
  165. lastError = null;
  166. queue = null;
  167. }
  168. }
  169. void ThrowIfDisposed()
  170. {
  171. if (isDisposed) throw new ObjectDisposedException("");
  172. }
  173. public bool IsRequiredSubscribeOnCurrentThread()
  174. {
  175. return false;
  176. }
  177. class Subscription : IDisposable
  178. {
  179. readonly object gate = new object();
  180. ReplaySubject<T> parent;
  181. IObserver<T> unsubscribeTarget;
  182. public Subscription(ReplaySubject<T> parent, IObserver<T> unsubscribeTarget)
  183. {
  184. this.parent = parent;
  185. this.unsubscribeTarget = unsubscribeTarget;
  186. }
  187. public void Dispose()
  188. {
  189. lock (gate)
  190. {
  191. if (parent != null)
  192. {
  193. lock (parent.observerLock)
  194. {
  195. var listObserver = parent.outObserver as ListObserver<T>;
  196. if (listObserver != null)
  197. {
  198. parent.outObserver = listObserver.Remove(unsubscribeTarget);
  199. }
  200. else
  201. {
  202. parent.outObserver = EmptyObserver<T>.Instance;
  203. }
  204. unsubscribeTarget = null;
  205. parent = null;
  206. }
  207. }
  208. }
  209. }
  210. }
  211. }
  212. }