Throw.cs 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal class ThrowObservable<T> : OperatorObservableBase<T>
  5. {
  6. readonly Exception error;
  7. readonly IScheduler scheduler;
  8. public ThrowObservable(Exception error, IScheduler scheduler)
  9. : base(scheduler == Scheduler.CurrentThread)
  10. {
  11. this.error = error;
  12. this.scheduler = scheduler;
  13. }
  14. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  15. {
  16. observer = new Throw(observer, cancel);
  17. if (scheduler == Scheduler.Immediate)
  18. {
  19. observer.OnError(error);
  20. return Disposable.Empty;
  21. }
  22. else
  23. {
  24. return scheduler.Schedule(() =>
  25. {
  26. observer.OnError(error);
  27. observer.OnCompleted();
  28. });
  29. }
  30. }
  31. class Throw : OperatorObserverBase<T, T>
  32. {
  33. public Throw(IObserver<T> observer, IDisposable cancel)
  34. : base(observer, cancel)
  35. {
  36. }
  37. public override void OnNext(T value)
  38. {
  39. try
  40. {
  41. base.observer.OnNext(value);
  42. }
  43. catch
  44. {
  45. Dispose();
  46. throw;
  47. }
  48. }
  49. public override void OnError(Exception error)
  50. {
  51. try { observer.OnError(error); }
  52. finally { Dispose(); }
  53. }
  54. public override void OnCompleted()
  55. {
  56. try { observer.OnCompleted(); }
  57. finally { Dispose(); }
  58. }
  59. }
  60. }
  61. }