Empty.cs 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal class EmptyObservable<T> : OperatorObservableBase<T>
  5. {
  6. readonly IScheduler scheduler;
  7. public EmptyObservable(IScheduler scheduler)
  8. : base(false)
  9. {
  10. this.scheduler = scheduler;
  11. }
  12. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  13. {
  14. observer = new Empty(observer, cancel);
  15. if (scheduler == Scheduler.Immediate)
  16. {
  17. observer.OnCompleted();
  18. return Disposable.Empty;
  19. }
  20. else
  21. {
  22. return scheduler.Schedule(observer.OnCompleted);
  23. }
  24. }
  25. class Empty : OperatorObserverBase<T, T>
  26. {
  27. public Empty(IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  28. {
  29. }
  30. public override void OnNext(T value)
  31. {
  32. try
  33. {
  34. base.observer.OnNext(value);
  35. }
  36. catch
  37. {
  38. Dispose();
  39. throw;
  40. }
  41. }
  42. public override void OnError(Exception error)
  43. {
  44. try { observer.OnError(error); }
  45. finally { Dispose(); }
  46. }
  47. public override void OnCompleted()
  48. {
  49. try { observer.OnCompleted(); }
  50. finally { Dispose(); }
  51. }
  52. }
  53. }
  54. internal class ImmutableEmptyObservable<T> : IObservable<T>, IOptimizedObservable<T>
  55. {
  56. internal static ImmutableEmptyObservable<T> Instance = new ImmutableEmptyObservable<T>();
  57. ImmutableEmptyObservable()
  58. {
  59. }
  60. public bool IsRequiredSubscribeOnCurrentThread()
  61. {
  62. return false;
  63. }
  64. public IDisposable Subscribe(IObserver<T> observer)
  65. {
  66. observer.OnCompleted();
  67. return Disposable.Empty;
  68. }
  69. }
  70. }