Defer.cs 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable that postpones obtaining its underlying sequence: the factory delegate
  /// is invoked inside <see cref="SubscribeCore"/>, not when this object is constructed.
  /// </summary>
  /// <typeparam name="T">Element type of the produced sequence.</typeparam>
  internal class DeferObservable<T> : OperatorObservableBase<T>
  {
      // Delegate that yields the sequence to subscribe to; called once per SubscribeCore call.
      readonly Func<IObservable<T>> factory;

      /// <summary>
      /// Stores the factory for later use.
      /// </summary>
      /// <param name="observableFactory">Delegate invoked on each subscription to obtain the source sequence.</param>
      public DeferObservable(Func<IObservable<T>> observableFactory)
          : base(false) // NOTE(review): meaning of this flag is defined by OperatorObservableBase, not visible here.
      {
          factory = observableFactory;
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> in a <see cref="Defer"/> observer, obtains the
      /// source sequence from the factory and subscribes the wrapper to it.
      /// </summary>
      /// <param name="observer">Downstream observer that receives the notifications.</param>
      /// <param name="cancel">Disposable handed to the wrapping observer's base constructor.</param>
      /// <returns>The value returned by subscribing to the obtained sequence.</returns>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          // The wrapper is created before the factory runs, matching the original ordering.
          // Typed as IObserver<T> so the same Subscribe overload is selected.
          IObserver<T> downstream = new Defer(observer, cancel);

          return CreateSource().Subscribe(downstream);
      }

      // Invokes the factory; if it throws, the exception is converted into a sequence
      // built by Observable.Throw<T> instead of propagating to the subscriber's caller.
      IObservable<T> CreateSource()
      {
          try
          {
              return factory();
          }
          catch (Exception ex)
          {
              return Observable.Throw<T>(ex);
          }
      }

      /// <summary>
      /// Pass-through observer that forwards every notification to the wrapped observer
      /// and calls <c>Dispose()</c> on termination or when forwarding a value throws.
      /// </summary>
      class Defer : OperatorObserverBase<T, T>
      {
          public Defer(IObserver<T> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Forwards <paramref name="value"/>; if the wrapped observer throws,
          /// disposes first and then rethrows the same exception.
          /// </summary>
          public override void OnNext(T value)
          {
              try
              {
                  observer.OnNext(value);
              }
              catch
              {
                  Dispose();
                  throw;
              }
          }

          /// <summary>
          /// Forwards the error and always disposes afterwards, even if forwarding throws.
          /// </summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>
          /// Forwards completion and always disposes afterwards, even if forwarding throws.
          /// </summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  55. }