AsObservable.cs 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. using System;
  2. using UniRx.Operators;
  3. namespace UniRx.Operators
  4. {
  5. internal class AsObservableObservable<T> : OperatorObservableBase<T>
  6. {
  7. readonly IObservable<T> source;
  8. public AsObservableObservable(IObservable<T> source)
  9. : base(source.IsRequiredSubscribeOnCurrentThread())
  10. {
  11. this.source = source;
  12. }
  13. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  14. {
  15. return source.Subscribe(new AsObservable(observer, cancel));
  16. }
  17. class AsObservable : OperatorObserverBase<T, T>
  18. {
  19. public AsObservable(IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  20. {
  21. }
  22. public override void OnNext(T value)
  23. {
  24. base.observer.OnNext(value);
  25. }
  26. public override void OnError(Exception error)
  27. {
  28. try { observer.OnError(error); }
  29. finally { Dispose(); }
  30. }
  31. public override void OnCompleted()
  32. {
  33. try { observer.OnCompleted(); }
  34. finally { Dispose(); }
  35. }
  36. }
  37. }
  38. }