123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- using System;
- using System.Collections.Generic;
- namespace UniRx.Operators
- {
- internal class MergeObservable<T> : OperatorObservableBase<T>
- {
- private readonly IObservable<IObservable<T>> sources;
- private readonly int maxConcurrent;
- public MergeObservable(IObservable<IObservable<T>> sources, bool isRequiredSubscribeOnCurrentThread)
- : base(isRequiredSubscribeOnCurrentThread)
- {
- this.sources = sources;
- }
- public MergeObservable(IObservable<IObservable<T>> sources, int maxConcurrent, bool isRequiredSubscribeOnCurrentThread)
- : base(isRequiredSubscribeOnCurrentThread)
- {
- this.sources = sources;
- this.maxConcurrent = maxConcurrent;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- if (maxConcurrent > 0)
- {
- return new MergeConcurrentObserver(this, observer, cancel).Run();
- }
- else
- {
- return new MergeOuterObserver(this, observer, cancel).Run();
- }
- }
- class MergeOuterObserver : OperatorObserverBase<IObservable<T>, T>
- {
- readonly MergeObservable<T> parent;
- CompositeDisposable collectionDisposable;
- SingleAssignmentDisposable sourceDisposable;
- object gate = new object();
- bool isStopped = false;
- public MergeOuterObserver(MergeObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- collectionDisposable = new CompositeDisposable();
- sourceDisposable = new SingleAssignmentDisposable();
- collectionDisposable.Add(sourceDisposable);
- sourceDisposable.Disposable = parent.sources.Subscribe(this);
- return collectionDisposable;
- }
- public override void OnNext(IObservable<T> value)
- {
- var disposable = new SingleAssignmentDisposable();
- collectionDisposable.Add(disposable);
- var collectionObserver = new Merge(this, disposable);
- disposable.Disposable = value.Subscribe(collectionObserver);
- }
- public override void OnError(Exception error)
- {
- lock (gate)
- {
- try { observer.OnError(error); } finally { Dispose(); };
- }
- }
- public override void OnCompleted()
- {
- isStopped = true;
- if (collectionDisposable.Count == 1)
- {
- lock (gate)
- {
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- else
- {
- sourceDisposable.Dispose();
- }
- }
- class Merge : OperatorObserverBase<T, T>
- {
- readonly MergeOuterObserver parent;
- readonly IDisposable cancel;
- public Merge(MergeOuterObserver parent, IDisposable cancel)
- : base(parent.observer, cancel)
- {
- this.parent = parent;
- this.cancel = cancel;
- }
- public override void OnNext(T value)
- {
- lock (parent.gate)
- {
- base.observer.OnNext(value);
- }
- }
- public override void OnError(Exception error)
- {
- lock (parent.gate)
- {
- try { observer.OnError(error); } finally { Dispose(); };
- }
- }
- public override void OnCompleted()
- {
- parent.collectionDisposable.Remove(cancel);
- if (parent.isStopped && parent.collectionDisposable.Count == 1)
- {
- lock (parent.gate)
- {
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- }
- }
- }
- class MergeConcurrentObserver : OperatorObserverBase<IObservable<T>, T>
- {
- readonly MergeObservable<T> parent;
- CompositeDisposable collectionDisposable;
- SingleAssignmentDisposable sourceDisposable;
- object gate = new object();
- bool isStopped = false;
- // concurrency
- Queue<IObservable<T>> q;
- int activeCount;
- public MergeConcurrentObserver(MergeObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- q = new Queue<IObservable<T>>();
- activeCount = 0;
- collectionDisposable = new CompositeDisposable();
- sourceDisposable = new SingleAssignmentDisposable();
- collectionDisposable.Add(sourceDisposable);
- sourceDisposable.Disposable = parent.sources.Subscribe(this);
- return collectionDisposable;
- }
- public override void OnNext(IObservable<T> value)
- {
- lock (gate)
- {
- if (activeCount < parent.maxConcurrent)
- {
- activeCount++;
- Subscribe(value);
- }
- else
- {
- q.Enqueue(value);
- }
- }
- }
- public override void OnError(Exception error)
- {
- lock (gate)
- {
- try { observer.OnError(error); } finally { Dispose(); };
- }
- }
- public override void OnCompleted()
- {
- lock (gate)
- {
- isStopped = true;
- if (activeCount == 0)
- {
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- else
- {
- sourceDisposable.Dispose();
- }
- }
- }
- void Subscribe(IObservable<T> innerSource)
- {
- var disposable = new SingleAssignmentDisposable();
- collectionDisposable.Add(disposable);
- var collectionObserver = new Merge(this, disposable);
- disposable.Disposable = innerSource.Subscribe(collectionObserver);
- }
- class Merge : OperatorObserverBase<T, T>
- {
- readonly MergeConcurrentObserver parent;
- readonly IDisposable cancel;
- public Merge(MergeConcurrentObserver parent, IDisposable cancel)
- : base(parent.observer, cancel)
- {
- this.parent = parent;
- this.cancel = cancel;
- }
- public override void OnNext(T value)
- {
- lock (parent.gate)
- {
- base.observer.OnNext(value);
- }
- }
- public override void OnError(Exception error)
- {
- lock (parent.gate)
- {
- try { observer.OnError(error); } finally { Dispose(); };
- }
- }
- public override void OnCompleted()
- {
- parent.collectionDisposable.Remove(cancel);
- lock (parent.gate)
- {
- if (parent.q.Count > 0)
- {
- var source = parent.q.Dequeue();
- parent.Subscribe(source);
- }
- else
- {
- parent.activeCount--;
- if (parent.isStopped && parent.activeCount == 0)
- {
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- }
- }
- }
- }
- }
- }
|