Switch.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. namespace UniRx.Operators
  6. {
  7. internal class SwitchObservable<T> : OperatorObservableBase<T>
  8. {
  9. readonly IObservable<IObservable<T>> sources;
  10. public SwitchObservable(IObservable<IObservable<T>> sources)
  11. : base(true)
  12. {
  13. this.sources = sources;
  14. }
  15. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  16. {
  17. return new SwitchObserver(this, observer, cancel).Run();
  18. }
  19. class SwitchObserver : OperatorObserverBase<IObservable<T>, T>
  20. {
  21. readonly SwitchObservable<T> parent;
  22. readonly object gate = new object();
  23. readonly SerialDisposable innerSubscription = new SerialDisposable();
  24. bool isStopped = false;
  25. ulong latest = 0UL;
  26. bool hasLatest = false;
  27. public SwitchObserver(SwitchObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  28. {
  29. this.parent = parent;
  30. }
  31. public IDisposable Run()
  32. {
  33. var subscription = parent.sources.Subscribe(this);
  34. return StableCompositeDisposable.Create(subscription, innerSubscription);
  35. }
  36. public override void OnNext(IObservable<T> value)
  37. {
  38. var id = default(ulong);
  39. lock (gate)
  40. {
  41. id = unchecked(++latest);
  42. hasLatest = true;
  43. }
  44. var d = new SingleAssignmentDisposable();
  45. innerSubscription.Disposable = d;
  46. d.Disposable = value.Subscribe(new Switch(this, id));
  47. }
  48. public override void OnError(Exception error)
  49. {
  50. lock (gate)
  51. {
  52. try { observer.OnError(error); }
  53. finally { Dispose(); }
  54. }
  55. }
  56. public override void OnCompleted()
  57. {
  58. lock (gate)
  59. {
  60. isStopped = true;
  61. if (!hasLatest)
  62. {
  63. try { observer.OnCompleted(); }
  64. finally { Dispose(); }
  65. }
  66. }
  67. }
  68. class Switch : IObserver<T>
  69. {
  70. readonly SwitchObserver parent;
  71. readonly ulong id;
  72. public Switch(SwitchObserver observer, ulong id)
  73. {
  74. this.parent = observer;
  75. this.id = id;
  76. }
  77. public void OnNext(T value)
  78. {
  79. lock (parent.gate)
  80. {
  81. if (parent.latest == id)
  82. {
  83. parent.observer.OnNext(value);
  84. }
  85. }
  86. }
  87. public void OnError(Exception error)
  88. {
  89. lock (parent.gate)
  90. {
  91. if (parent.latest == id)
  92. {
  93. parent.observer.OnError(error);
  94. }
  95. }
  96. }
  97. public void OnCompleted()
  98. {
  99. lock (parent.gate)
  100. {
  101. if (parent.latest == id)
  102. {
  103. parent.hasLatest = false;
  104. if (parent.isStopped)
  105. {
  106. parent.observer.OnCompleted();
  107. }
  108. }
  109. }
  110. }
  111. }
  112. }
  113. }
  114. }