DelayFrame.cs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. namespace UniRx.Operators
  5. {
  6. internal class DelayFrameObservable<T> : OperatorObservableBase<T>
  7. {
  8. readonly IObservable<T> source;
  9. readonly int frameCount;
  10. readonly FrameCountType frameCountType;
  11. public DelayFrameObservable(IObservable<T> source, int frameCount, FrameCountType frameCountType)
  12. : base(source.IsRequiredSubscribeOnCurrentThread())
  13. {
  14. this.source = source;
  15. this.frameCount = frameCount;
  16. this.frameCountType = frameCountType;
  17. }
  18. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  19. {
  20. return new DelayFrame(this, observer, cancel).Run();
  21. }
  22. class DelayFrame : OperatorObserverBase<T, T>
  23. {
  24. readonly DelayFrameObservable<T> parent;
  25. readonly object gate = new object();
  26. readonly QueuePool pool = new QueuePool();
  27. int runningEnumeratorCount;
  28. bool readyDrainEnumerator;
  29. bool running;
  30. IDisposable sourceSubscription;
  31. Queue<T> currentQueueReference;
  32. bool calledCompleted;
  33. bool hasError;
  34. Exception error;
  35. BooleanDisposable cancelationToken;
  36. public DelayFrame(DelayFrameObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  37. {
  38. this.parent = parent;
  39. }
  40. public IDisposable Run()
  41. {
  42. cancelationToken = new BooleanDisposable();
  43. var _sourceSubscription = new SingleAssignmentDisposable();
  44. sourceSubscription = _sourceSubscription;
  45. _sourceSubscription.Disposable = parent.source.Subscribe(this);
  46. return StableCompositeDisposable.Create(cancelationToken, sourceSubscription);
  47. }
  48. IEnumerator DrainQueue(Queue<T> q, int frameCount)
  49. {
  50. lock (gate)
  51. {
  52. readyDrainEnumerator = false; // use next queue.
  53. running = false;
  54. }
  55. while (!cancelationToken.IsDisposed && frameCount-- != 0)
  56. {
  57. yield return null;
  58. }
  59. try
  60. {
  61. if (q != null)
  62. {
  63. while (q.Count > 0 && !hasError)
  64. {
  65. if (cancelationToken.IsDisposed) break;
  66. lock (gate)
  67. {
  68. running = true;
  69. }
  70. var value = q.Dequeue();
  71. observer.OnNext(value);
  72. lock (gate)
  73. {
  74. running = false;
  75. }
  76. }
  77. if (q.Count == 0)
  78. {
  79. pool.Return(q);
  80. }
  81. }
  82. if (hasError)
  83. {
  84. if (!cancelationToken.IsDisposed)
  85. {
  86. cancelationToken.Dispose();
  87. try { observer.OnError(error); } finally { Dispose(); }
  88. }
  89. }
  90. else if (calledCompleted)
  91. {
  92. lock (gate)
  93. {
  94. // not self only
  95. if (runningEnumeratorCount != 1) yield break;
  96. }
  97. if (!cancelationToken.IsDisposed)
  98. {
  99. cancelationToken.Dispose();
  100. try { observer.OnCompleted(); }
  101. finally { Dispose(); }
  102. }
  103. }
  104. }
  105. finally
  106. {
  107. lock (gate)
  108. {
  109. runningEnumeratorCount--;
  110. }
  111. }
  112. }
  113. public override void OnNext(T value)
  114. {
  115. if (cancelationToken.IsDisposed) return;
  116. Queue<T> targetQueue = null;
  117. lock (gate)
  118. {
  119. if (!readyDrainEnumerator)
  120. {
  121. readyDrainEnumerator = true;
  122. runningEnumeratorCount++;
  123. targetQueue = currentQueueReference = pool.Get();
  124. targetQueue.Enqueue(value);
  125. }
  126. else
  127. {
  128. if (currentQueueReference != null) // null - if doesn't start OnNext and start OnCompleted
  129. {
  130. currentQueueReference.Enqueue(value);
  131. }
  132. return;
  133. }
  134. }
  135. switch (parent.frameCountType)
  136. {
  137. case FrameCountType.Update:
  138. MainThreadDispatcher.StartUpdateMicroCoroutine(DrainQueue(targetQueue, parent.frameCount));
  139. break;
  140. case FrameCountType.FixedUpdate:
  141. MainThreadDispatcher.StartFixedUpdateMicroCoroutine(DrainQueue(targetQueue, parent.frameCount));
  142. break;
  143. case FrameCountType.EndOfFrame:
  144. MainThreadDispatcher.StartEndOfFrameMicroCoroutine(DrainQueue(targetQueue, parent.frameCount));
  145. break;
  146. default:
  147. throw new ArgumentException("Invalid FrameCountType:" + parent.frameCountType);
  148. }
  149. }
  150. public override void OnError(Exception error)
  151. {
  152. sourceSubscription.Dispose(); // stop subscription
  153. if (cancelationToken.IsDisposed) return;
  154. lock (gate)
  155. {
  156. if (running)
  157. {
  158. hasError = true;
  159. this.error = error;
  160. return;
  161. }
  162. }
  163. cancelationToken.Dispose();
  164. try { base.observer.OnError(error); } finally { Dispose(); }
  165. }
  166. public override void OnCompleted()
  167. {
  168. sourceSubscription.Dispose(); // stop subscription
  169. if (cancelationToken.IsDisposed) return;
  170. lock (gate)
  171. {
  172. calledCompleted = true;
  173. if (!readyDrainEnumerator)
  174. {
  175. readyDrainEnumerator = true;
  176. runningEnumeratorCount++;
  177. }
  178. else
  179. {
  180. return;
  181. }
  182. }
  183. switch (parent.frameCountType)
  184. {
  185. case FrameCountType.Update:
  186. MainThreadDispatcher.StartUpdateMicroCoroutine(DrainQueue(null, parent.frameCount));
  187. break;
  188. case FrameCountType.FixedUpdate:
  189. MainThreadDispatcher.StartFixedUpdateMicroCoroutine(DrainQueue(null, parent.frameCount));
  190. break;
  191. case FrameCountType.EndOfFrame:
  192. MainThreadDispatcher.StartEndOfFrameMicroCoroutine(DrainQueue(null, parent.frameCount));
  193. break;
  194. default:
  195. throw new ArgumentException("Invalid FrameCountType:" + parent.frameCountType);
  196. }
  197. }
  198. }
  199. class QueuePool
  200. {
  201. readonly object gate = new object();
  202. readonly Queue<Queue<T>> pool = new Queue<Queue<T>>(2);
  203. public Queue<T> Get()
  204. {
  205. lock (gate)
  206. {
  207. if (pool.Count == 0)
  208. {
  209. return new Queue<T>(2);
  210. }
  211. else
  212. {
  213. return pool.Dequeue();
  214. }
  215. }
  216. }
  217. public void Return(Queue<T> q)
  218. {
  219. lock (gate)
  220. {
  221. pool.Enqueue(q);
  222. }
  223. }
  224. }
  225. }
  226. }