// Dematerialize.cs
  using System;

  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable that subscribes to a sequence of <see cref="Notification{T}"/> values and
      /// replays each one to the downstream observer as the corresponding
      /// OnNext / OnError / OnCompleted call.
      /// </summary>
      /// <typeparam name="T">Element type carried by the notifications.</typeparam>
      internal class DematerializeObservable<T> : OperatorObservableBase<T>
      {
          // Upstream sequence of notifications to unwrap.
          readonly IObservable<Notification<T>> source;

          /// <summary>
          /// Creates the operator over <paramref name="source"/>.
          /// </summary>
          /// <param name="source">Sequence of notifications to unwrap.</param>
          public DematerializeObservable(IObservable<Notification<T>> source)
              // The base class receives the source's IsRequiredSubscribeOnCurrentThread() result;
              // NOTE(review): presumably this propagates the source's subscription-thread requirement - confirm in OperatorObservableBase.
              : base(source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
          }

          /// <summary>
          /// Builds the inner observer for one subscription and subscribes it to the source.
          /// </summary>
          /// <param name="observer">Downstream observer receiving the unwrapped signals.</param>
          /// <param name="cancel">Disposable handed to the inner observer's base class.</param>
          /// <returns>The disposable returned by subscribing to the source.</returns>
          protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
          {
              var sink = new Dematerialize(this, observer, cancel);
              return sink.Run();
          }

          /// <summary>
          /// Inner observer: receives notifications from the source and forwards them downstream.
          /// </summary>
          class Dematerialize : OperatorObserverBase<Notification<T>, T>
          {
              // Operator instance that created this observer; used to reach its source.
              readonly DematerializeObservable<T> owner;

              public Dematerialize(DematerializeObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
                  owner = parent;
              }

              /// <summary>Subscribes this observer to the operator's source.</summary>
              /// <returns>The subscription returned by the source.</returns>
              public IDisposable Run()
              {
                  return owner.source.Subscribe(this);
              }

              /// <summary>
              /// Dispatches on the notification kind: a value is forwarded as OnNext, while an
              /// error or completion notification is forwarded and then followed by Dispose().
              /// Any other kind is ignored.
              /// </summary>
              /// <param name="value">Notification received from the source.</param>
              public override void OnNext(Notification<T> value)
              {
                  var kind = value.Kind;

                  if (kind == NotificationKind.OnNext)
                  {
                      observer.OnNext(value.Value);
                  }
                  else if (kind == NotificationKind.OnError)
                  {
                      // Dispose() runs even if the downstream handler throws.
                      try
                      {
                          observer.OnError(value.Exception);
                      }
                      finally
                      {
                          Dispose();
                      }
                  }
                  else if (kind == NotificationKind.OnCompleted)
                  {
                      // Dispose() runs even if the downstream handler throws.
                      try
                      {
                          observer.OnCompleted();
                      }
                      finally
                      {
                          Dispose();
                      }
                  }
              }

              /// <summary>
              /// Forwards an error raised by the source itself, then calls Dispose().
              /// </summary>
              /// <param name="error">Exception reported by the source.</param>
              public override void OnError(Exception error)
              {
                  try
                  {
                      observer.OnError(error);
                  }
                  finally
                  {
                      Dispose();
                  }
              }

              /// <summary>
              /// Forwards completion of the source itself, then calls Dispose().
              /// </summary>
              public override void OnCompleted()
              {
                  try
                  {
                      observer.OnCompleted();
                  }
                  finally
                  {
                      Dispose();
                  }
              }
          }
      }
  }