Aggregate.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. using System;
  2. using UniRx.Operators;
  3. namespace UniRx.Operators
  4. {
  5. internal class AggregateObservable<TSource> : OperatorObservableBase<TSource>
  6. {
  7. readonly IObservable<TSource> source;
  8. readonly Func<TSource, TSource, TSource> accumulator;
  9. public AggregateObservable(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  10. : base(source.IsRequiredSubscribeOnCurrentThread())
  11. {
  12. this.source = source;
  13. this.accumulator = accumulator;
  14. }
  15. protected override IDisposable SubscribeCore(IObserver<TSource> observer, IDisposable cancel)
  16. {
  17. return source.Subscribe(new Aggregate(this, observer, cancel));
  18. }
  19. class Aggregate : OperatorObserverBase<TSource, TSource>
  20. {
  21. readonly AggregateObservable<TSource> parent;
  22. TSource accumulation;
  23. bool seenValue;
  24. public Aggregate(AggregateObservable<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel)
  25. {
  26. this.parent = parent;
  27. this.seenValue = false;
  28. }
  29. public override void OnNext(TSource value)
  30. {
  31. if (!seenValue)
  32. {
  33. seenValue = true;
  34. accumulation = value;
  35. }
  36. else
  37. {
  38. try
  39. {
  40. accumulation = parent.accumulator(accumulation, value);
  41. }
  42. catch (Exception ex)
  43. {
  44. try { observer.OnError(ex); }
  45. finally { Dispose(); }
  46. return;
  47. }
  48. }
  49. }
  50. public override void OnError(Exception error)
  51. {
  52. try { observer.OnError(error); }
  53. finally { Dispose(); }
  54. }
  55. public override void OnCompleted()
  56. {
  57. if (!seenValue)
  58. {
  59. throw new InvalidOperationException("Sequence contains no elements.");
  60. }
  61. observer.OnNext(accumulation);
  62. try { observer.OnCompleted(); }
  63. finally { Dispose(); }
  64. }
  65. }
  66. }
  67. internal class AggregateObservable<TSource, TAccumulate> : OperatorObservableBase<TAccumulate>
  68. {
  69. readonly IObservable<TSource> source;
  70. readonly TAccumulate seed;
  71. readonly Func<TAccumulate, TSource, TAccumulate> accumulator;
  72. public AggregateObservable(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  73. : base(source.IsRequiredSubscribeOnCurrentThread())
  74. {
  75. this.source = source;
  76. this.seed = seed;
  77. this.accumulator = accumulator;
  78. }
  79. protected override IDisposable SubscribeCore(IObserver<TAccumulate> observer, IDisposable cancel)
  80. {
  81. return source.Subscribe(new Aggregate(this, observer, cancel));
  82. }
  83. class Aggregate : OperatorObserverBase<TSource, TAccumulate>
  84. {
  85. readonly AggregateObservable<TSource, TAccumulate> parent;
  86. TAccumulate accumulation;
  87. public Aggregate(AggregateObservable<TSource, TAccumulate> parent, IObserver<TAccumulate> observer, IDisposable cancel) : base(observer, cancel)
  88. {
  89. this.parent = parent;
  90. this.accumulation = parent.seed;
  91. }
  92. public override void OnNext(TSource value)
  93. {
  94. try
  95. {
  96. accumulation = parent.accumulator(accumulation, value);
  97. }
  98. catch (Exception ex)
  99. {
  100. try { observer.OnError(ex); }
  101. finally { Dispose(); }
  102. return;
  103. }
  104. }
  105. public override void OnError(Exception error)
  106. {
  107. try { observer.OnError(error); }
  108. finally { Dispose(); }
  109. }
  110. public override void OnCompleted()
  111. {
  112. observer.OnNext(accumulation);
  113. try { observer.OnCompleted(); }
  114. finally { Dispose(); }
  115. }
  116. }
  117. }
  118. internal class AggregateObservable<TSource, TAccumulate, TResult> : OperatorObservableBase<TResult>
  119. {
  120. readonly IObservable<TSource> source;
  121. readonly TAccumulate seed;
  122. readonly Func<TAccumulate, TSource, TAccumulate> accumulator;
  123. readonly Func<TAccumulate, TResult> resultSelector;
  124. public AggregateObservable(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
  125. : base(source.IsRequiredSubscribeOnCurrentThread())
  126. {
  127. this.source = source;
  128. this.seed = seed;
  129. this.accumulator = accumulator;
  130. this.resultSelector = resultSelector;
  131. }
  132. protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel)
  133. {
  134. return source.Subscribe(new Aggregate(this, observer, cancel));
  135. }
  136. class Aggregate : OperatorObserverBase<TSource, TResult>
  137. {
  138. readonly AggregateObservable<TSource, TAccumulate, TResult> parent;
  139. TAccumulate accumulation;
  140. public Aggregate(AggregateObservable<TSource, TAccumulate, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel)
  141. {
  142. this.parent = parent;
  143. this.accumulation = parent.seed;
  144. }
  145. public override void OnNext(TSource value)
  146. {
  147. try
  148. {
  149. accumulation = parent.accumulator(accumulation, value);
  150. }
  151. catch (Exception ex)
  152. {
  153. try { observer.OnError(ex); }
  154. finally { Dispose(); }
  155. return;
  156. }
  157. }
  158. public override void OnError(Exception error)
  159. {
  160. try { observer.OnError(error); }
  161. finally { Dispose(); }
  162. }
  163. public override void OnCompleted()
  164. {
  165. TResult result;
  166. try
  167. {
  168. result = parent.resultSelector(accumulation);
  169. }
  170. catch (Exception ex)
  171. {
  172. OnError(ex);
  173. return;
  174. }
  175. observer.OnNext(result);
  176. try { observer.OnCompleted(); }
  177. finally { Dispose(); }
  178. }
  179. }
  180. }
  181. }