using System; using System.Collections.Generic; namespace UniRx.Operators { // needs to more improvement internal class ConcatObservable : OperatorObservableBase { readonly IEnumerable> sources; public ConcatObservable(IEnumerable> sources) : base(true) { this.sources = sources; } public IObservable Combine(IEnumerable> combineSources) { return new ConcatObservable(CombineSources(this.sources, combineSources)); } static IEnumerable> CombineSources(IEnumerable> first, IEnumerable> second) { foreach (var item in first) { yield return item; } foreach (var item in second) { yield return item; } } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { return new Concat(this, observer, cancel).Run(); } class Concat : OperatorObserverBase { readonly ConcatObservable parent; readonly object gate = new object(); bool isDisposed; IEnumerator> e; SerialDisposable subscription; Action nextSelf; public Concat(ConcatObservable parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { this.parent = parent; } public IDisposable Run() { isDisposed = false; e = parent.sources.GetEnumerator(); subscription = new SerialDisposable(); var schedule = Scheduler.DefaultSchedulers.TailRecursion.Schedule(RecursiveRun); return StableCompositeDisposable.Create(schedule, subscription, Disposable.Create(() => { lock (gate) { this.isDisposed = true; this.e.Dispose(); } })); } void RecursiveRun(Action self) { lock (gate) { this.nextSelf = self; if (isDisposed) return; var current = default(IObservable); var hasNext = false; var ex = default(Exception); try { hasNext = e.MoveNext(); if (hasNext) { current = e.Current; if (current == null) throw new InvalidOperationException("sequence is null."); } else { e.Dispose(); } } catch (Exception exception) { ex = exception; e.Dispose(); } if (ex != null) { try { observer.OnError(ex); } finally { Dispose(); } return; } if (!hasNext) { try { observer.OnCompleted(); } finally { Dispose(); } return; } var source = current; var d = new SingleAssignmentDisposable(); subscription.Disposable = d; d.Disposable = source.Subscribe(this); } } public override void OnNext(T value) { base.observer.OnNext(value); } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { this.nextSelf(); } } } }