Finally.cs 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable that relays every notification of <c>source</c> unchanged and invokes
  /// <c>finallyAction</c> when the subscription is torn down, or when subscribing to
  /// <c>source</c> throws.
  /// </summary>
  /// <typeparam name="T">Element type of the sequence.</typeparam>
  internal class FinallyObservable<T> : OperatorObservableBase<T>
  {
      readonly IObservable<T> source;
      readonly Action finallyAction;

      /// <summary>Creates the operator.</summary>
      /// <param name="source">Upstream sequence to relay.</param>
      /// <param name="finallyAction">Action to invoke on teardown.</param>
      public FinallyObservable(IObservable<T> source, Action finallyAction)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
          this.finallyAction = finallyAction;
      }

      /// <summary>
      /// Wires <paramref name="observer"/> to the source through a <see cref="Finally"/> observer.
      /// </summary>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          return new Finally(this, observer, cancel).Run();
      }

      /// <summary>
      /// Pass-through observer. After forwarding a terminal notification it calls
      /// <c>Dispose()</c> (inherited from the base observer; presumably this releases the
      /// <c>cancel</c> handle handed to the base constructor - confirm against
      /// OperatorObserverBase).
      /// </summary>
      class Finally : OperatorObserverBase<T, T>
      {
          readonly FinallyObservable<T> parent;

          public Finally(FinallyObservable<T> parent, IObserver<T> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>
          /// Subscribes to the source and returns the disposable that tears the subscription
          /// down and then runs the finally action.
          /// </summary>
          /// <returns>
          /// A disposable that disposes the upstream subscription and then invokes the
          /// finally action, even when the upstream disposal throws.
          /// </returns>
          public IDisposable Run()
          {
              IDisposable subscription;
              try
              {
                  subscription = parent.source.Subscribe(this);
              }
              catch
              {
                  // This behaviour differs from the official .NET Rx: the finally action
                  // is also run when Subscribe itself throws, before the exception is rethrown.
                  parent.finallyAction();
                  throw;
              }

              // try/finally instead of a composite of two disposables: the action has to run
              // even if tearing down the upstream subscription throws, mirroring the
              // guarantee the catch block above gives for a throwing Subscribe.
              return Disposable.Create(() =>
              {
                  try
                  {
                      subscription.Dispose();
                  }
                  finally
                  {
                      parent.finallyAction();
                  }
              });
          }

          /// <summary>Forwards the value downstream unchanged.</summary>
          public override void OnNext(T value)
          {
              observer.OnNext(value);
          }

          /// <summary>Forwards the error, then disposes even if the downstream handler throws.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Forwards completion, then disposes even if the downstream handler throws.</summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  58. }