Single.cs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable that publishes the single element of <c>source</c> (or, when a predicate is
  /// supplied, the single element that satisfies it) once <c>source</c> completes.
  /// </summary>
  /// <remarks>
  /// A second qualifying element produces <see cref="InvalidOperationException"/>
  /// ("sequence is not single"). When no element qualified by the time <c>source</c> completes,
  /// the result is <c>default(T)</c> followed by completion if <c>useDefault</c> is true, otherwise
  /// <see cref="InvalidOperationException"/> ("sequence is empty").
  /// </remarks>
  /// <typeparam name="T">Element type of the source and resulting sequence.</typeparam>
  internal class SingleObservable<T> : OperatorObservableBase<T>
  {
      readonly IObservable<T> source;
      readonly bool useDefault;

      // Left null by the predicate-less constructor; SubscribeCore uses that to pick the observer.
      readonly Func<T, bool> predicate;

      /// <summary>Creates the operator without a predicate: every source element qualifies.</summary>
      /// <param name="source">Upstream sequence to subscribe to.</param>
      /// <param name="useDefault">
      /// True to publish <c>default(T)</c> for an empty sequence instead of raising an error.
      /// </param>
      public SingleObservable(IObservable<T> source, bool useDefault)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
          this.useDefault = useDefault;
      }

      /// <summary>Creates the operator with a predicate: only matching elements qualify.</summary>
      /// <param name="source">Upstream sequence to subscribe to.</param>
      /// <param name="predicate">Test applied to each source element.</param>
      /// <param name="useDefault">
      /// True to publish <c>default(T)</c> when nothing matched instead of raising an error.
      /// </param>
      public SingleObservable(IObservable<T> source, Func<T, bool> predicate, bool useDefault)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
          this.predicate = predicate;
          this.useDefault = useDefault;
      }

      /// <summary>
      /// Subscribes to <c>source</c> with the observer variant that matches how this instance
      /// was constructed (with or without a predicate).
      /// </summary>
      /// <param name="observer">Downstream observer receiving the result.</param>
      /// <param name="cancel">Disposable handed on to the inner observer.</param>
      /// <returns>The subscription returned by <c>source.Subscribe</c>.</returns>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          if (predicate == null)
          {
              return source.Subscribe(new Single(this, observer, cancel));
          }

          return source.Subscribe(new Single_(this, observer, cancel));
      }

      /// <summary>
      /// State and terminal handling shared by both observer variants; a subclass only decides
      /// which incoming values are handed to <see cref="Accept"/>.
      /// </summary>
      abstract class SingleObserverBase : OperatorObserverBase<T, T>
      {
          protected readonly SingleObservable<T> parent;
          bool seenValue;
          T lastValue;

          protected SingleObserverBase(SingleObservable<T> parent, IObserver<T> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>
          /// Remembers <paramref name="value"/> as the result candidate. If a candidate is already
          /// held, reports "sequence is not single" downstream and disposes instead.
          /// </summary>
          protected void Accept(T value)
          {
              if (seenValue)
              {
                  try { observer.OnError(new InvalidOperationException("sequence is not single")); }
                  finally { Dispose(); }
                  return;
              }

              seenValue = true;
              lastValue = value;
          }

          /// <summary>Forwards the upstream error and disposes.</summary>
          public override void OnError(Exception error)
          {
              try { observer.OnError(error); }
              finally { Dispose(); }
          }

          /// <summary>
          /// Publishes the outcome: the captured value (or <c>default(T)</c> when allowed) followed
          /// by completion, or a "sequence is empty" error.
          /// </summary>
          public override void OnCompleted()
          {
              if (!seenValue && !parent.useDefault)
              {
                  try { observer.OnError(new InvalidOperationException("sequence is empty")); }
                  finally { Dispose(); }
                  return;
              }

              // The final OnNext sits inside the guarded region as well: this is a terminal
              // notification, so Dispose must run even if the downstream observer throws from
              // OnNext. The exception itself still propagates to the caller.
              try
              {
                  observer.OnNext(seenValue ? lastValue : default(T));
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }

      /// <summary>Observer used when no predicate was supplied: every value qualifies.</summary>
      class Single : SingleObserverBase
      {
          public Single(SingleObservable<T> parent, IObserver<T> observer, IDisposable cancel)
              : base(parent, observer, cancel)
          {
          }

          public override void OnNext(T value)
          {
              Accept(value);
          }
      }

      /// <summary>Observer used when a predicate was supplied: only matching values qualify.</summary>
      class Single_ : SingleObserverBase
      {
          public Single_(SingleObservable<T> parent, IObserver<T> observer, IDisposable cancel)
              : base(parent, observer, cancel)
          {
          }

          public override void OnNext(T value)
          {
              bool isPassed;
              try
              {
                  isPassed = parent.predicate(value);
              }
              catch (Exception ex)
              {
                  // An exception thrown by the predicate terminates the sequence with that error.
                  try { observer.OnError(ex); }
                  finally { Dispose(); }
                  return;
              }

              if (isPassed)
              {
                  Accept(value);
              }
          }
      }
  }
  167. }