using System;
using System.Collections.Generic;

namespace UniRx.Operators
{
    // Merge operator: subscribes to a sequence of inner sequences and forwards the
    // values of every inner sequence to one downstream observer.
    // When maxConcurrent is greater than zero, at most that many inner sequences are
    // subscribed at a time and the rest wait in a queue; otherwise every inner
    // sequence is subscribed as soon as it arrives.
    internal class MergeObservable<T> : OperatorObservableBase<T>
    {
        private readonly IObservable<IObservable<T>> sources;
        private readonly int maxConcurrent;

        // Unbounded merge. maxConcurrent keeps its default value (0), which makes
        // SubscribeCore pick the non-queuing observer.
        // isRequiredSubscribeOnCurrentThread is only forwarded to the base class.
        public MergeObservable(IObservable<IObservable<T>> sources, bool isRequiredSubscribeOnCurrentThread)
            : base(isRequiredSubscribeOnCurrentThread)
        {
            this.sources = sources;
        }

        // Merge with a cap on simultaneously subscribed inner sequences.
        // A maxConcurrent of zero or less behaves like the unbounded overload
        // (see the check in SubscribeCore).
        public MergeObservable(IObservable<IObservable<T>> sources, int maxConcurrent, bool isRequiredSubscribeOnCurrentThread)
            : base(isRequiredSubscribeOnCurrentThread)
        {
            this.sources = sources;
            this.maxConcurrent = maxConcurrent;
        }

        // Chooses the observer implementation from maxConcurrent and starts it.
        // Returns the disposable produced by the chosen observer's Run().
        protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
        {
            if (maxConcurrent > 0)
            {
                return new MergeConcurrentObserver(this, observer, cancel).Run();
            }
            else
            {
                return new MergeOuterObserver(this, observer, cancel).Run();
            }
        }

        // Observer of the outer sequence for the unbounded case: every inner sequence
        // is subscribed immediately in OnNext.
        class MergeOuterObserver : OperatorObserverBase<IObservable<T>, T>
        {
            readonly MergeObservable<T> parent;

            // Holds sourceDisposable plus one entry per inner subscription added in OnNext.
            CompositeDisposable collectionDisposable;
            // Subscription to the outer sequence.
            SingleAssignmentDisposable sourceDisposable;
            // Every call into the downstream observer is made while holding this lock.
            readonly object gate = new object();
            // Set once the outer sequence has completed.
            bool isStopped = false;

            public MergeOuterObserver(MergeObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            // Subscribes to the outer sequence and returns the composite that tracks the
            // outer subscription together with all inner subscriptions.
            public IDisposable Run()
            {
                collectionDisposable = new CompositeDisposable();
                sourceDisposable = new SingleAssignmentDisposable();
                // Registered before subscribing, so the collection always contains the
                // outer subscription as its first entry.
                collectionDisposable.Add(sourceDisposable);

                sourceDisposable.Disposable = parent.sources.Subscribe(this);
                return collectionDisposable;
            }

            // A new inner sequence arrived: register a disposable for it and subscribe.
            public override void OnNext(IObservable<T> value)
            {
                var disposable = new SingleAssignmentDisposable();
                // Added before Subscribe so the inner observer can remove it again even
                // if the inner sequence completes during the Subscribe call.
                collectionDisposable.Add(disposable);
                var collectionObserver = new Merge(this, disposable);
                disposable.Disposable = value.Subscribe(collectionObserver);
            }

            // Forwards an error of the outer sequence downstream, then disposes this observer.
            public override void OnError(Exception error)
            {
                lock (gate)
                {
                    try { observer.OnError(error); }
                    finally { Dispose(); }
                }
            }

            // The outer sequence finished. Completion is forwarded only when no inner
            // subscription is registered any more; otherwise the last inner observer to
            // complete forwards it (see Merge.OnCompleted).
            public override void OnCompleted()
            {
                isStopped = true;
                // Count == 1 means only sourceDisposable is left in the collection.
                if (collectionDisposable.Count == 1)
                {
                    lock (gate)
                    {
                        try { observer.OnCompleted(); }
                        finally { Dispose(); }
                    }
                }
                else
                {
                    sourceDisposable.Dispose();
                }
            }

            // Observer of a single inner sequence; forwards to the shared downstream observer.
            class Merge : OperatorObserverBase<T, T>
            {
                readonly MergeOuterObserver parent;
                // The disposable registered for this inner subscription in the parent's collection.
                readonly IDisposable cancel;

                public Merge(MergeOuterObserver parent, IDisposable cancel)
                    : base(parent.observer, cancel)
                {
                    this.parent = parent;
                    this.cancel = cancel;
                }

                // Forwards an inner value downstream under the parent's lock.
                public override void OnNext(T value)
                {
                    lock (parent.gate)
                    {
                        base.observer.OnNext(value);
                    }
                }

                // Forwards an inner error downstream, then disposes this inner observer.
                public override void OnError(Exception error)
                {
                    lock (parent.gate)
                    {
                        try { observer.OnError(error); }
                        finally { Dispose(); }
                    }
                }

                // Unregisters this inner subscription; when the outer sequence has already
                // completed and this was the last registered inner subscription,
                // completion is forwarded downstream.
                public override void OnCompleted()
                {
                    parent.collectionDisposable.Remove(cancel);
                    // Count == 1 means only the parent's sourceDisposable is left.
                    if (parent.isStopped && parent.collectionDisposable.Count == 1)
                    {
                        lock (parent.gate)
                        {
                            try { observer.OnCompleted(); }
                            finally { Dispose(); }
                        }
                    }
                }
            }
        }

        // Observer of the outer sequence for the bounded case: at most
        // parent.maxConcurrent inner sequences are subscribed at once, the others are
        // queued until an active one completes.
        class MergeConcurrentObserver : OperatorObserverBase<IObservable<T>, T>
        {
            readonly MergeObservable<T> parent;

            // Holds sourceDisposable plus one entry per inner subscription.
            CompositeDisposable collectionDisposable;
            // Subscription to the outer sequence.
            SingleAssignmentDisposable sourceDisposable;
            // Guards q, activeCount, isStopped and every call into the downstream observer.
            readonly object gate = new object();
            // Set once the outer sequence has completed.
            bool isStopped = false;

            // concurrency
            // Inner sequences waiting for a free slot.
            Queue<IObservable<T>> q;
            // Number of slots currently occupied by subscribed inner sequences.
            int activeCount;

            public MergeConcurrentObserver(MergeObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            // Initializes the queue and counters, subscribes to the outer sequence and
            // returns the composite that tracks all subscriptions.
            public IDisposable Run()
            {
                q = new Queue<IObservable<T>>();
                activeCount = 0;

                collectionDisposable = new CompositeDisposable();
                sourceDisposable = new SingleAssignmentDisposable();
                collectionDisposable.Add(sourceDisposable);

                sourceDisposable.Disposable = parent.sources.Subscribe(this);
                return collectionDisposable;
            }

            // Subscribes to the inner sequence if a slot is free, otherwise queues it.
            public override void OnNext(IObservable<T> value)
            {
                lock (gate)
                {
                    if (activeCount < parent.maxConcurrent)
                    {
                        activeCount++;
                        Subscribe(value);
                    }
                    else
                    {
                        q.Enqueue(value);
                    }
                }
            }

            // Forwards an error of the outer sequence downstream, then disposes this observer.
            public override void OnError(Exception error)
            {
                lock (gate)
                {
                    try { observer.OnError(error); }
                    finally { Dispose(); }
                }
            }

            // The outer sequence finished. Completion is forwarded right away only when
            // no inner sequence is active; otherwise the last active inner observer
            // forwards it (see Merge.OnCompleted).
            public override void OnCompleted()
            {
                lock (gate)
                {
                    isStopped = true;
                    if (activeCount == 0)
                    {
                        try { observer.OnCompleted(); }
                        finally { Dispose(); }
                    }
                    else
                    {
                        sourceDisposable.Dispose();
                    }
                }
            }

            // Registers a disposable for the inner sequence and subscribes to it.
            // Every call site in this file invokes it while holding gate.
            void Subscribe(IObservable<T> innerSource)
            {
                var disposable = new SingleAssignmentDisposable();
                collectionDisposable.Add(disposable);
                var collectionObserver = new Merge(this, disposable);
                disposable.Disposable = innerSource.Subscribe(collectionObserver);
            }

            // Observer of a single inner sequence; forwards to the shared downstream observer.
            class Merge : OperatorObserverBase<T, T>
            {
                readonly MergeConcurrentObserver parent;
                // The disposable registered for this inner subscription in the parent's collection.
                readonly IDisposable cancel;

                public Merge(MergeConcurrentObserver parent, IDisposable cancel)
                    : base(parent.observer, cancel)
                {
                    this.parent = parent;
                    this.cancel = cancel;
                }

                // Forwards an inner value downstream under the parent's lock.
                public override void OnNext(T value)
                {
                    lock (parent.gate)
                    {
                        base.observer.OnNext(value);
                    }
                }

                // Forwards an inner error downstream, then disposes this inner observer.
                public override void OnError(Exception error)
                {
                    lock (parent.gate)
                    {
                        try { observer.OnError(error); }
                        finally { Dispose(); }
                    }
                }

                // Unregisters this inner subscription, then either hands its slot to the
                // next queued sequence or releases the slot.
                public override void OnCompleted()
                {
                    parent.collectionDisposable.Remove(cancel);

                    lock (parent.gate)
                    {
                        if (parent.q.Count > 0)
                        {
                            // The freed slot is reused directly, so activeCount is left unchanged.
                            var source = parent.q.Dequeue();
                            parent.Subscribe(source);
                        }
                        else
                        {
                            parent.activeCount--;
                            // Last active inner sequence after the outer one completed:
                            // forward completion downstream.
                            if (parent.isStopped && parent.activeCount == 0)
                            {
                                try { observer.OnCompleted(); }
                                finally { Dispose(); }
                            }
                        }
                    }
                }
            }
        }
    }
}