1
0

Catch.cs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class CatchObservable<T, TException> : OperatorObservableBase<T>
  6. where TException : Exception
  7. {
  8. readonly IObservable<T> source;
  9. readonly Func<TException, IObservable<T>> errorHandler;
  10. public CatchObservable(IObservable<T> source, Func<TException, IObservable<T>> errorHandler)
  11. : base(source.IsRequiredSubscribeOnCurrentThread())
  12. {
  13. this.source = source;
  14. this.errorHandler = errorHandler;
  15. }
  16. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  17. {
  18. return new Catch(this, observer, cancel).Run();
  19. }
  20. class Catch : OperatorObserverBase<T, T>
  21. {
  22. readonly CatchObservable<T, TException> parent;
  23. SingleAssignmentDisposable sourceSubscription;
  24. SingleAssignmentDisposable exceptionSubscription;
  25. public Catch(CatchObservable<T, TException> parent, IObserver<T> observer, IDisposable cancel)
  26. : base(observer, cancel)
  27. {
  28. this.parent = parent;
  29. }
  30. public IDisposable Run()
  31. {
  32. this.sourceSubscription = new SingleAssignmentDisposable();
  33. this.exceptionSubscription = new SingleAssignmentDisposable();
  34. this.sourceSubscription.Disposable = parent.source.Subscribe(this);
  35. return StableCompositeDisposable.Create(sourceSubscription, exceptionSubscription);
  36. }
  37. public override void OnNext(T value)
  38. {
  39. base.observer.OnNext(value);
  40. }
  41. public override void OnError(Exception error)
  42. {
  43. var e = error as TException;
  44. if (e != null)
  45. {
  46. IObservable<T> next;
  47. try
  48. {
  49. if (parent.errorHandler == Stubs.CatchIgnore<T>)
  50. {
  51. next = Observable.Empty<T>(); // for avoid iOS AOT
  52. }
  53. else
  54. {
  55. next = parent.errorHandler(e);
  56. }
  57. }
  58. catch (Exception ex)
  59. {
  60. try { observer.OnError(ex); } finally { Dispose(); };
  61. return;
  62. }
  63. exceptionSubscription.Disposable = next.Subscribe(observer);
  64. }
  65. else
  66. {
  67. try { observer.OnError(error); } finally { Dispose(); };
  68. return;
  69. }
  70. }
  71. public override void OnCompleted()
  72. {
  73. try { observer.OnCompleted(); } finally { Dispose(); };
  74. }
  75. }
  76. }
  77. internal class CatchObservable<T> : OperatorObservableBase<T>
  78. {
  79. readonly IEnumerable<IObservable<T>> sources;
  80. public CatchObservable(IEnumerable<IObservable<T>> sources)
  81. : base(true)
  82. {
  83. this.sources = sources;
  84. }
  85. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  86. {
  87. return new Catch(this, observer, cancel).Run();
  88. }
  89. class Catch : OperatorObserverBase<T, T>
  90. {
  91. readonly CatchObservable<T> parent;
  92. readonly object gate = new object();
  93. bool isDisposed;
  94. IEnumerator<IObservable<T>> e;
  95. SerialDisposable subscription;
  96. Exception lastException;
  97. Action nextSelf;
  98. public Catch(CatchObservable<T> parent, IObserver<T> observer, IDisposable cancel)
  99. : base(observer, cancel)
  100. {
  101. this.parent = parent;
  102. }
  103. public IDisposable Run()
  104. {
  105. isDisposed = false;
  106. e = parent.sources.GetEnumerator();
  107. subscription = new SerialDisposable();
  108. var schedule = Scheduler.DefaultSchedulers.TailRecursion.Schedule(RecursiveRun);
  109. return StableCompositeDisposable.Create(schedule, subscription, Disposable.Create(() =>
  110. {
  111. lock (gate)
  112. {
  113. this.isDisposed = true;
  114. this.e.Dispose();
  115. }
  116. }));
  117. }
  118. void RecursiveRun(Action self)
  119. {
  120. lock (gate)
  121. {
  122. nextSelf = self;
  123. if (isDisposed) return;
  124. var current = default(IObservable<T>);
  125. var hasNext = false;
  126. var ex = default(Exception);
  127. try
  128. {
  129. hasNext = e.MoveNext();
  130. if (hasNext)
  131. {
  132. current = e.Current;
  133. if (current == null) throw new InvalidOperationException("sequence is null.");
  134. }
  135. else
  136. {
  137. e.Dispose();
  138. }
  139. }
  140. catch (Exception exception)
  141. {
  142. ex = exception;
  143. e.Dispose();
  144. }
  145. if (ex != null)
  146. {
  147. try { observer.OnError(ex); }
  148. finally { Dispose(); }
  149. return;
  150. }
  151. if (!hasNext)
  152. {
  153. if (lastException != null)
  154. {
  155. try { observer.OnError(lastException); }
  156. finally { Dispose(); }
  157. }
  158. else
  159. {
  160. try { observer.OnCompleted(); }
  161. finally { Dispose(); }
  162. }
  163. return;
  164. }
  165. var source = current;
  166. var d = new SingleAssignmentDisposable();
  167. subscription.Disposable = d;
  168. d.Disposable = source.Subscribe(this);
  169. }
  170. }
  171. public override void OnNext(T value)
  172. {
  173. base.observer.OnNext(value);
  174. }
  175. public override void OnError(Exception error)
  176. {
  177. lastException = error;
  178. nextSelf();
  179. }
  180. public override void OnCompleted()
  181. {
  182. try { observer.OnCompleted(); }
  183. finally { Dispose(); }
  184. return;
  185. }
  186. }
  187. }
  188. }