AsSingleUnitObservable.cs 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. using System;
  2. using UniRx.Operators;
  3. namespace UniRx.Operators
  4. {
  /// <summary>
  /// Observable that ignores every value produced by the wrapped source and, once the
  /// source completes, emits a single <c>Unit.Default</c> followed by completion.
  /// An error from the source is forwarded to the downstream observer as-is.
  /// </summary>
  /// <typeparam name="T">Element type of the wrapped source sequence.</typeparam>
  internal class AsSingleUnitObservableObservable<T> : OperatorObservableBase<Unit>
  {
      readonly IObservable<T> source;

      /// <summary>
      /// Wraps <paramref name="source"/>. The flag handed to the base constructor is
      /// whatever <c>source.IsRequiredSubscribeOnCurrentThread()</c> reports.
      /// </summary>
      /// <param name="source">Sequence whose completion is translated into a single unit value.</param>
      public AsSingleUnitObservableObservable(IObservable<T> source)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
      }

      /// <summary>
      /// Subscribes a new inner observer to the source and returns the source subscription.
      /// </summary>
      /// <param name="observer">Downstream observer that receives the unit value and terminal notification.</param>
      /// <param name="cancel">Disposable passed through to the inner observer's base class.</param>
      /// <returns>The disposable returned by <c>source.Subscribe</c>.</returns>
      protected override IDisposable SubscribeCore(IObserver<Unit> observer, IDisposable cancel)
      {
          return source.Subscribe(new AsSingleUnitObservable(observer, cancel));
      }

      /// <summary>
      /// Inner observer: swallows values, forwards errors, and converts completion into
      /// one <c>Unit.Default</c> plus <c>OnCompleted</c>.
      /// </summary>
      class AsSingleUnitObservable : OperatorObserverBase<T, Unit>
      {
          public AsSingleUnitObservable(IObserver<Unit> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>Source values are intentionally discarded; only termination matters.</summary>
          public override void OnNext(T value)
          {
          }

          /// <summary>Forwards the error downstream, then disposes this observer.</summary>
          public override void OnError(Exception error)
          {
              // NOTE(review): Dispose() is inherited from OperatorObserverBase; presumably it
              // releases the 'cancel' disposable - confirm against the base class.
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>
          /// Emits the single unit value, signals completion downstream, then disposes
          /// this observer.
          /// </summary>
          public override void OnCompleted()
          {
              // OnNext is inside the try so that Dispose() still runs when the downstream
              // observer throws from OnNext; the exception itself continues to propagate.
              try
              {
                  observer.OnNext(Unit.Default);
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  38. }