using System;
using System.Collections.Generic;

namespace UniRx.Operators
{
    /// <summary>
    /// Observable that subscribes to two sources and relays only the one that
    /// delivers a notification (value, error or completion) first. The
    /// subscription to the other source is disposed as soon as the choice is made.
    /// </summary>
    /// <typeparam name="T">Element type of both sources and of the result.</typeparam>
    internal class AmbObservable<T> : OperatorObservableBase<T>
    {
        readonly IObservable<T> source;
        readonly IObservable<T> second;

        /// <summary>
        /// Creates the operator over the two candidate sources.
        /// </summary>
        /// <param name="source">First candidate; it is subscribed before <paramref name="second"/>.</param>
        /// <param name="second">Second candidate.</param>
        public AmbObservable(IObservable<T> source, IObservable<T> second)
            // The base flag is true when either source reports that it requires
            // subscription on the current thread.
            : base(source.IsRequiredSubscribeOnCurrentThread() || second.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.second = second;
        }

        /// <summary>
        /// Builds the per-subscription state and subscribes to both sources.
        /// </summary>
        /// <param name="observer">Downstream observer that receives the winning sequence.</param>
        /// <param name="cancel">Disposable handed to the operator observer base class.</param>
        /// <returns>A disposable that disposes both inner subscriptions.</returns>
        protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
        {
            return new AmbOuterObserver(this, observer, cancel).Run();
        }

        /// <summary>
        /// Per-subscription state holder: owns the gate, the current choice and the
        /// two inner subscriptions. Within this file it is never itself subscribed
        /// to a source, which is why its observer callbacks are empty.
        /// </summary>
        class AmbOuterObserver : OperatorObserverBase<T, T>
        {
            /// <summary>Which side has been selected so far.</summary>
            enum AmbState
            {
                Left, Right, Neither
            }

            readonly AmbObservable<T> parent;
            readonly object gate = new object();
            SingleAssignmentDisposable leftSubscription;
            SingleAssignmentDisposable rightSubscription;
            AmbState choice = AmbState.Neither;

            public AmbOuterObserver(AmbObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            /// <summary>
            /// Wires a forwarding observer and a decision observer for each side,
            /// then subscribes to the first source followed by the second.
            /// </summary>
            /// <returns>Composite disposable covering both inner subscriptions.</returns>
            public IDisposable Run()
            {
                leftSubscription = new SingleAssignmentDisposable();
                rightSubscription = new SingleAssignmentDisposable();
                var d = StableCompositeDisposable.Create(leftSubscription, rightSubscription);

                // Each side's decision observer is given the *other* side's
                // subscription so it can cancel the loser when it wins.
                var left = new Amb();
                left.targetDisposable = d;
                left.targetObserver = new AmbDecisionObserver(this, AmbState.Left, rightSubscription, left);

                var right = new Amb();
                right.targetDisposable = d;
                right.targetObserver = new AmbDecisionObserver(this, AmbState.Right, leftSubscription, right);

                leftSubscription.Disposable = parent.source.Subscribe(left);
                rightSubscription.Disposable = parent.second.Subscribe(right);

                return d;
            }

            public override void OnNext(T value)
            {
                // no use
            }

            public override void OnError(Exception error)
            {
                // no use
            }

            public override void OnCompleted()
            {
                // no use
            }

            /// <summary>
            /// Observer actually subscribed to a source. It forwards every
            /// notification to <see cref="targetObserver"/>, which starts out as the
            /// decision observer and is re-pointed at the downstream observer once
            /// this side has been chosen.
            /// </summary>
            class Amb : IObserver<T>
            {
                public IObserver<T> targetObserver;
                public IDisposable targetDisposable;

                public void OnNext(T value)
                {
                    targetObserver.OnNext(value);
                }

                public void OnError(Exception error)
                {
                    try
                    {
                        targetObserver.OnError(error);
                    }
                    finally
                    {
                        // Runs even if the target throws: stop forwarding and
                        // release the subscriptions.
                        targetObserver = UniRx.InternalUtil.EmptyObserver<T>.Instance;
                        targetDisposable.Dispose();
                    }
                }

                public void OnCompleted()
                {
                    try
                    {
                        targetObserver.OnCompleted();
                    }
                    finally
                    {
                        // Runs even if the target throws: stop forwarding and
                        // release the subscriptions.
                        targetObserver = UniRx.InternalUtil.EmptyObserver<T>.Instance;
                        targetDisposable.Dispose();
                    }
                }
            }

            /// <summary>
            /// Initial target of an <see cref="Amb"/>. On any notification it takes
            /// the shared gate; if no side has been chosen yet it claims the choice
            /// for its own side, disposes the other side's subscription and
            /// re-points its <see cref="Amb"/> at the downstream observer. The
            /// notification is forwarded only when this side is the chosen one.
            /// </summary>
            class AmbDecisionObserver : IObserver<T>
            {
                readonly AmbOuterObserver parent;
                readonly AmbState me;
                readonly IDisposable otherSubscription;
                readonly Amb self;

                public AmbDecisionObserver(AmbOuterObserver parent, AmbState me, IDisposable otherSubscription, Amb self)
                {
                    this.parent = parent;
                    this.me = me;
                    this.otherSubscription = otherSubscription;
                    this.self = self;
                }

                public void OnNext(T value)
                {
                    lock (parent.gate)
                    {
                        if (IsChosen())
                        {
                            self.targetObserver.OnNext(value);
                        }
                    }
                }

                public void OnError(Exception error)
                {
                    lock (parent.gate)
                    {
                        if (IsChosen())
                        {
                            self.targetObserver.OnError(error);
                        }
                    }
                }

                public void OnCompleted()
                {
                    lock (parent.gate)
                    {
                        if (IsChosen())
                        {
                            self.targetObserver.OnCompleted();
                        }
                    }
                }

                /// <summary>
                /// Claims the choice for this side when none has been made, then
                /// reports whether this side is the chosen one. Callers must hold
                /// <c>parent.gate</c>.
                /// </summary>
                bool IsChosen()
                {
                    if (parent.choice == AmbState.Neither)
                    {
                        parent.choice = me;
                        otherSubscription.Dispose();
                        self.targetObserver = parent.observer;
                    }

                    return parent.choice == me;
                }
            }
        }
    }
}