using System; using UniRx.Operators; namespace UniRx.Operators { internal class SkipObservable : OperatorObservableBase { readonly IObservable source; readonly int count; readonly TimeSpan duration; internal readonly IScheduler scheduler; // public for optimization check public SkipObservable(IObservable source, int count) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.count = count; } public SkipObservable(IObservable source, TimeSpan duration, IScheduler scheduler) : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.duration = duration; this.scheduler = scheduler; } // optimize combiner public IObservable Combine(int count) { // use sum // xs = 6 // xs.Skip(2) = 4 // xs.Skip(2).Skip(3) = 1 return new SkipObservable(source, this.count + count); } public IObservable Combine(TimeSpan duration) { // use max // xs = 6s // xs.Skip(2s) = 2s // xs.Skip(2s).Skip(3s) = 3s return (duration <= this.duration) ? this : new SkipObservable(source, duration, scheduler); } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { if (scheduler == null) { return source.Subscribe(new Skip(this, observer, cancel)); } else { return new Skip_(this, observer, cancel).Run(); } } class Skip : OperatorObserverBase { int remaining; public Skip(SkipObservable parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { this.remaining = parent.count; } public override void OnNext(T value) { if (remaining <= 0) { base.observer.OnNext(value); } else { remaining--; } } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } } class Skip_ : OperatorObserverBase { readonly SkipObservable parent; volatile bool open; public Skip_(SkipObservable parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { this.parent = parent; } public IDisposable Run() { var d1 = parent.scheduler.Schedule(parent.duration, Tick); var d2 = parent.source.Subscribe(this); return StableCompositeDisposable.Create(d1, d2); } void Tick() { open = true; } public override void OnNext(T value) { if (open) { base.observer.OnNext(value); } } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); }; } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); }; } } } }