Amb.cs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class AmbObservable<T> : OperatorObservableBase<T>
  6. {
  7. readonly IObservable<T> source;
  8. readonly IObservable<T> second;
  9. public AmbObservable(IObservable<T> source, IObservable<T> second)
  10. : base(source.IsRequiredSubscribeOnCurrentThread() || second.IsRequiredSubscribeOnCurrentThread())
  11. {
  12. this.source = source;
  13. this.second = second;
  14. }
  15. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  16. {
  17. return new AmbOuterObserver(this, observer, cancel).Run();
  18. }
  19. class AmbOuterObserver : OperatorObserverBase<T, T>
  20. {
  21. enum AmbState
  22. {
  23. Left, Right, Neither
  24. }
  25. readonly AmbObservable<T> parent;
  26. readonly object gate = new object();
  27. SingleAssignmentDisposable leftSubscription;
  28. SingleAssignmentDisposable rightSubscription;
  29. AmbState choice = AmbState.Neither;
  30. public AmbOuterObserver(AmbObservable<T> parent, IObserver<T> observer, IDisposable cancel)
  31. : base(observer, cancel)
  32. {
  33. this.parent = parent;
  34. }
  35. public IDisposable Run()
  36. {
  37. leftSubscription = new SingleAssignmentDisposable();
  38. rightSubscription = new SingleAssignmentDisposable();
  39. var d = StableCompositeDisposable.Create(leftSubscription, rightSubscription);
  40. var left = new Amb();
  41. left.targetDisposable = d;
  42. left.targetObserver = new AmbDecisionObserver(this, AmbState.Left, rightSubscription, left);
  43. var right = new Amb();
  44. right.targetDisposable = d;
  45. right.targetObserver = new AmbDecisionObserver(this, AmbState.Right, leftSubscription, right);
  46. leftSubscription.Disposable = parent.source.Subscribe(left);
  47. rightSubscription.Disposable = parent.second.Subscribe(right);
  48. return d;
  49. }
  50. public override void OnNext(T value)
  51. {
  52. // no use
  53. }
  54. public override void OnError(Exception error)
  55. {
  56. // no use
  57. }
  58. public override void OnCompleted()
  59. {
  60. // no use
  61. }
  62. class Amb : IObserver<T>
  63. {
  64. public IObserver<T> targetObserver;
  65. public IDisposable targetDisposable;
  66. public void OnNext(T value)
  67. {
  68. targetObserver.OnNext(value);
  69. }
  70. public void OnError(Exception error)
  71. {
  72. try
  73. {
  74. targetObserver.OnError(error);
  75. }
  76. finally
  77. {
  78. targetObserver = UniRx.InternalUtil.EmptyObserver<T>.Instance;
  79. targetDisposable.Dispose();
  80. }
  81. }
  82. public void OnCompleted()
  83. {
  84. try
  85. {
  86. targetObserver.OnCompleted();
  87. }
  88. finally
  89. {
  90. targetObserver = UniRx.InternalUtil.EmptyObserver<T>.Instance;
  91. targetDisposable.Dispose();
  92. }
  93. }
  94. }
  95. class AmbDecisionObserver : IObserver<T>
  96. {
  97. readonly AmbOuterObserver parent;
  98. readonly AmbState me;
  99. readonly IDisposable otherSubscription;
  100. readonly Amb self;
  101. public AmbDecisionObserver(AmbOuterObserver parent, AmbState me, IDisposable otherSubscription, Amb self)
  102. {
  103. this.parent = parent;
  104. this.me = me;
  105. this.otherSubscription = otherSubscription;
  106. this.self = self;
  107. }
  108. public void OnNext(T value)
  109. {
  110. lock (parent.gate)
  111. {
  112. if (parent.choice == AmbState.Neither)
  113. {
  114. parent.choice = me;
  115. otherSubscription.Dispose();
  116. self.targetObserver = parent.observer;
  117. }
  118. if (parent.choice == me) self.targetObserver.OnNext(value);
  119. }
  120. }
  121. public void OnError(Exception error)
  122. {
  123. lock (parent.gate)
  124. {
  125. if (parent.choice == AmbState.Neither)
  126. {
  127. parent.choice = me;
  128. otherSubscription.Dispose();
  129. self.targetObserver = parent.observer;
  130. }
  131. if (parent.choice == me)
  132. {
  133. self.targetObserver.OnError(error);
  134. }
  135. }
  136. }
  137. public void OnCompleted()
  138. {
  139. lock (parent.gate)
  140. {
  141. if (parent.choice == AmbState.Neither)
  142. {
  143. parent.choice = me;
  144. otherSubscription.Dispose();
  145. self.targetObserver = parent.observer;
  146. }
  147. if (parent.choice == me)
  148. {
  149. self.targetObserver.OnCompleted();
  150. }
  151. }
  152. }
  153. }
  154. }
  155. }
  156. }