Materialize.cs 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable that turns every event of a source sequence into a
  /// <see cref="Notification{T}"/> value: values, the error and the completion
  /// signal are all delivered to the subscriber through <c>OnNext</c>.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  internal class MaterializeObservable<T> : OperatorObservableBase<Notification<T>>
  {
      readonly IObservable<T> source;

      /// <summary>Wraps <paramref name="source"/> for materialization.</summary>
      /// <param name="source">
      /// Sequence whose events are converted to notifications. Its
      /// <c>IsRequiredSubscribeOnCurrentThread()</c> result is passed straight to the base constructor.
      /// </param>
      public MaterializeObservable(IObservable<T> source)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
      }

      /// <summary>
      /// Creates a fresh <see cref="Materialize"/> observer for this subscription and
      /// subscribes it to the source.
      /// </summary>
      /// <param name="observer">Downstream observer receiving the notifications.</param>
      /// <param name="cancel">Disposable handed to the inner observer's base class.</param>
      /// <returns>The disposable returned by subscribing to the source.</returns>
      protected override IDisposable SubscribeCore(IObserver<Notification<T>> observer, IDisposable cancel)
      {
          return new Materialize(this, observer, cancel).Run();
      }

      /// <summary>
      /// Per-subscription observer that forwards source events downstream as notifications.
      /// </summary>
      class Materialize : OperatorObserverBase<T, Notification<T>>
      {
          readonly MaterializeObservable<T> parent;

          public Materialize(MaterializeObservable<T> parent, IObserver<Notification<T>> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Subscribes this observer to the parent's source sequence.</summary>
          /// <returns>The subscription returned by the source.</returns>
          public IDisposable Run()
          {
              return parent.source.Subscribe(this);
          }

          /// <summary>Forwards a source value as an OnNext notification.</summary>
          public override void OnNext(T value)
          {
              observer.OnNext(Notification.CreateOnNext(value));
          }

          /// <summary>
          /// Forwards the source error as an OnError notification, then completes the
          /// downstream observer (the error is data here, not a failure of this sequence).
          /// </summary>
          public override void OnError(Exception error)
          {
              PublishTerminal(Notification.CreateOnError<T>(error));
          }

          /// <summary>
          /// Forwards the source completion as an OnCompleted notification, then completes
          /// the downstream observer.
          /// </summary>
          public override void OnCompleted()
          {
              PublishTerminal(Notification.CreateOnCompleted<T>());
          }

          /// <summary>
          /// Emits the final notification followed by <c>OnCompleted</c>, and always calls
          /// <c>Dispose()</c> afterwards.
          /// </summary>
          /// <param name="terminal">The error or completion notification to emit last.</param>
          void PublishTerminal(Notification<T> terminal)
          {
              try
              {
                  // Both calls are inside the try so that an exception thrown by the
                  // downstream observer from either of them still reaches Dispose().
                  // The exception itself is not swallowed; it propagates to the caller.
                  observer.OnNext(terminal);
                  observer.OnCompleted();
              }
              finally
              {
                  // NOTE(review): Dispose() comes from the base class (not visible here);
                  // presumably it releases the subscription - confirm it is idempotent.
                  Dispose();
              }
          }
      }
  }
  44. }