Sample.cs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. namespace UniRx.Operators
  6. {
  7. internal class SampleObservable<T> : OperatorObservableBase<T>
  8. {
  9. readonly IObservable<T> source;
  10. readonly TimeSpan interval;
  11. readonly IScheduler scheduler;
  12. public SampleObservable(IObservable<T> source, TimeSpan interval, IScheduler scheduler)
  13. : base(source.IsRequiredSubscribeOnCurrentThread() || scheduler == Scheduler.CurrentThread)
  14. {
  15. this.source = source;
  16. this.interval = interval;
  17. this.scheduler = scheduler;
  18. }
  19. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  20. {
  21. return new Sample(this, observer, cancel).Run();
  22. }
  23. class Sample : OperatorObserverBase<T, T>
  24. {
  25. readonly SampleObservable<T> parent;
  26. readonly object gate = new object();
  27. T latestValue = default(T);
  28. bool isUpdated = false;
  29. bool isCompleted = false;
  30. SingleAssignmentDisposable sourceSubscription;
  31. public Sample(SampleObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  32. {
  33. this.parent = parent;
  34. }
  35. public IDisposable Run()
  36. {
  37. sourceSubscription = new SingleAssignmentDisposable();
  38. sourceSubscription.Disposable = parent.source.Subscribe(this);
  39. IDisposable scheduling;
  40. var periodicScheduler = parent.scheduler as ISchedulerPeriodic;
  41. if (periodicScheduler != null)
  42. {
  43. scheduling = periodicScheduler.SchedulePeriodic(parent.interval, OnNextTick);
  44. }
  45. else
  46. {
  47. scheduling = parent.scheduler.Schedule(parent.interval, OnNextRecursive);
  48. }
  49. return StableCompositeDisposable.Create(sourceSubscription, scheduling);
  50. }
  51. void OnNextTick()
  52. {
  53. lock (gate)
  54. {
  55. if (isUpdated)
  56. {
  57. var value = latestValue;
  58. isUpdated = false;
  59. observer.OnNext(value);
  60. }
  61. if (isCompleted)
  62. {
  63. try { observer.OnCompleted(); } finally { Dispose(); }
  64. }
  65. }
  66. }
  67. void OnNextRecursive(Action<TimeSpan> self)
  68. {
  69. lock (gate)
  70. {
  71. if (isUpdated)
  72. {
  73. var value = latestValue;
  74. isUpdated = false;
  75. observer.OnNext(value);
  76. }
  77. if (isCompleted)
  78. {
  79. try { observer.OnCompleted(); } finally { Dispose(); }
  80. }
  81. }
  82. self(parent.interval);
  83. }
  84. public override void OnNext(T value)
  85. {
  86. lock (gate)
  87. {
  88. latestValue = value;
  89. isUpdated = true;
  90. }
  91. }
  92. public override void OnError(Exception error)
  93. {
  94. lock (gate)
  95. {
  96. try { base.observer.OnError(error); } finally { Dispose(); }
  97. }
  98. }
  99. public override void OnCompleted()
  100. {
  101. lock (gate)
  102. {
  103. isCompleted = true;
  104. sourceSubscription.Dispose();
  105. }
  106. }
  107. }
  108. }
  109. internal class SampleObservable<T, T2> : OperatorObservableBase<T>
  110. {
  111. readonly IObservable<T> source;
  112. readonly IObservable<T2> intervalSource;
  113. public SampleObservable(IObservable<T> source, IObservable<T2> intervalSource)
  114. : base(source.IsRequiredSubscribeOnCurrentThread())
  115. {
  116. this.source = source;
  117. this.intervalSource = intervalSource;
  118. }
  119. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  120. {
  121. return new Sample(this, observer, cancel).Run();
  122. }
  123. class Sample : OperatorObserverBase<T, T>
  124. {
  125. readonly SampleObservable<T, T2> parent;
  126. readonly object gate = new object();
  127. T latestValue = default(T);
  128. bool isUpdated = false;
  129. bool isCompleted = false;
  130. SingleAssignmentDisposable sourceSubscription;
  131. public Sample(
  132. SampleObservable<T, T2> parent, IObserver<T> observer, IDisposable cancel)
  133. : base(observer, cancel)
  134. {
  135. this.parent = parent;
  136. }
  137. public IDisposable Run()
  138. {
  139. sourceSubscription = new SingleAssignmentDisposable();
  140. sourceSubscription.Disposable = parent.source.Subscribe(this);
  141. var scheduling = this.parent.intervalSource.Subscribe(new SampleTick(this));
  142. return StableCompositeDisposable.Create(sourceSubscription, scheduling);
  143. }
  144. public override void OnNext(T value)
  145. {
  146. lock (gate)
  147. {
  148. latestValue = value;
  149. isUpdated = true;
  150. }
  151. }
  152. public override void OnError(Exception error)
  153. {
  154. lock (gate)
  155. {
  156. try { base.observer.OnError(error); } finally { Dispose(); }
  157. }
  158. }
  159. public override void OnCompleted()
  160. {
  161. lock (gate)
  162. {
  163. isCompleted = true;
  164. sourceSubscription.Dispose();
  165. }
  166. }
  167. class SampleTick : IObserver<T2>
  168. {
  169. readonly Sample parent;
  170. public SampleTick(Sample parent)
  171. {
  172. this.parent = parent;
  173. }
  174. public void OnCompleted()
  175. {
  176. lock (parent.gate)
  177. {
  178. if (parent.isUpdated)
  179. {
  180. parent.isUpdated = false;
  181. parent.observer.OnNext(parent.latestValue);
  182. }
  183. if (parent.isCompleted)
  184. {
  185. try { parent.observer.OnCompleted(); } finally { parent.Dispose(); }
  186. }
  187. }
  188. }
  189. public void OnError(Exception error)
  190. {
  191. }
  192. public void OnNext(T2 _)
  193. {
  194. lock (parent.gate)
  195. {
  196. if (parent.isUpdated)
  197. {
  198. var value = parent.latestValue;
  199. parent.isUpdated = false;
  200. parent.observer.OnNext(value);
  201. }
  202. if (parent.isCompleted)
  203. {
  204. try { parent.observer.OnCompleted(); } finally { parent.Dispose(); }
  205. }
  206. }
  207. }
  208. }
  209. }
  210. }
  211. }