// AsUnitObservable.cs (1.2 KB)
  1. using System;
  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable wrapper that subscribes to an <see cref="IObservable{T}"/> and
      /// re-emits each element as <c>Unit.Default</c>, discarding the element value.
      /// </summary>
      /// <typeparam name="T">Element type of the wrapped source sequence.</typeparam>
      internal class AsUnitObservableObservable<T> : OperatorObservableBase<Unit>
      {
          // The sequence whose notifications are projected to Unit.
          readonly IObservable<T> upstream;

          /// <summary>
          /// Wraps <paramref name="source"/>. The result of
          /// <c>source.IsRequiredSubscribeOnCurrentThread()</c> is handed to the base constructor.
          /// </summary>
          /// <param name="source">Sequence to observe.</param>
          public AsUnitObservableObservable(IObservable<T> source)
              : base(source.IsRequiredSubscribeOnCurrentThread())
          {
              upstream = source;
          }

          /// <summary>
          /// Subscribes a value-discarding observer to the wrapped source.
          /// </summary>
          /// <param name="observer">Downstream observer that receives the Unit notifications.</param>
          /// <param name="cancel">Disposable passed through to the inner observer's base constructor.</param>
          /// <returns>The disposable returned by subscribing to the wrapped source.</returns>
          protected override IDisposable SubscribeCore(IObserver<Unit> observer, IDisposable cancel)
          {
              var unitObserver = new AsUnitObservable(observer, cancel);
              return upstream.Subscribe(unitObserver);
          }

          /// <summary>
          /// Inner observer: turns each OnNext(T) into OnNext(Unit.Default) and forwards
          /// OnError / OnCompleted unchanged.
          /// </summary>
          class AsUnitObservable : OperatorObserverBase<T, Unit>
          {
              public AsUnitObservable(IObserver<Unit> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
              }

              /// <summary>Ignores <paramref name="value"/> and signals Unit.Default downstream.</summary>
              public override void OnNext(T value)
              {
                  observer.OnNext(Unit.Default);
              }

              /// <summary>Forwards the error downstream, then always calls Dispose().</summary>
              public override void OnError(Exception error)
              {
                  try
                  {
                      observer.OnError(error);
                  }
                  finally
                  {
                      // Runs even if the downstream handler throws.
                      // NOTE(review): Dispose() is inherited; presumably releases the subscription - confirm in OperatorObserverBase.
                      Dispose();
                  }
              }

              /// <summary>Forwards completion downstream, then always calls Dispose().</summary>
              public override void OnCompleted()
              {
                  try
                  {
                      observer.OnCompleted();
                  }
                  finally
                  {
                      // Runs even if the downstream handler throws.
                      Dispose();
                  }
              }
          }
      }
  }