using System;
using System.Collections.Generic;
using UniRx.Operators;

namespace UniRx.Operators
{
    // Groups the values of `source` into lists and emits each list downstream.
    // The constructor that was used decides which buffering strategy SubscribeCore picks:
    //   (source, count, skip)                    -> Buffer (skip == 0) or Buffer_ (skip != 0)
    //   (source, timeSpan, timeShift, scheduler) -> BufferT (timeSpan == timeShift) or BufferTS
    //   (source, timeSpan, count, scheduler)     -> BufferTC (count > 0)
    internal class BufferObservable<T> : OperatorObservableBase<IList<T>>
    {
        readonly IObservable<T> source;
        readonly int count;
        readonly int skip;

        readonly TimeSpan timeSpan;
        readonly TimeSpan timeShift;
        readonly IScheduler scheduler;

        // Count based buffering. `scheduler` stays null, which is how SubscribeCore
        // recognises this mode.
        public BufferObservable(IObservable<T> source, int count, int skip)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.count = count;
            this.skip = skip;
        }

        // Time based buffering. `count` stays 0, so SubscribeCore chooses between
        // BufferT and BufferTS by comparing timeSpan with timeShift.
        public BufferObservable(IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
            : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.timeSpan = timeSpan;
            this.timeShift = timeShift;
            this.scheduler = scheduler;
        }

        // Time or count based buffering; a buffer is emitted on a timer tick or as soon
        // as it holds `count` items (see BufferTC).
        public BufferObservable(IObservable<T> source, TimeSpan timeSpan, int count, IScheduler scheduler)
            : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.timeSpan = timeSpan;
            this.count = count;
            this.scheduler = scheduler;
        }

        // Creates the observer implementing the configured strategy and runs it.
        protected override IDisposable SubscribeCore(IObserver<IList<T>> observer, IDisposable cancel)
        {
            // count,skip
            if (scheduler == null)
            {
                if (skip == 0)
                {
                    return new Buffer(this, observer, cancel).Run();
                }
                else
                {
                    return new Buffer_(this, observer, cancel).Run();
                }
            }
            else
            {
                // time + count
                if (count > 0)
                {
                    return new BufferTC(this, observer, cancel).Run();
                }
                else
                {
                    if (timeSpan == timeShift)
                    {
                        return new BufferT(this, observer, cancel).Run();
                    }
                    else
                    {
                        return new BufferTS(this, observer, cancel).Run();
                    }
                }
            }
        }

        // count only
        // Collects values into a single list and emits it once it holds parent.count items.
        class Buffer : OperatorObserverBase<T, IList<T>>
        {
            readonly BufferObservable<T> parent;
            List<T> list;

            public Buffer(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                list = new List<T>(parent.count);
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                list.Add(value);
                if (list.Count == parent.count)
                {
                    observer.OnNext(list);
                    // The emitted list now belongs to the downstream observer; start a fresh one.
                    list = new List<T>(parent.count);
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                // Flush the partially filled buffer, if any, before completing.
                if (list.Count > 0)
                {
                    observer.OnNext(list);
                }
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }

        // count and skip
        // Opens a new list every parent.skip values and adds each value to every open list;
        // a list is emitted (and dropped from the queue) once it holds parent.count items.
        class Buffer_ : OperatorObserverBase<T, IList<T>>
        {
            readonly BufferObservable<T> parent;
            Queue<List<T>> q;
            int index;

            public Buffer_(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                q = new Queue<List<T>>();
                index = -1; // incremented before use, so the first value gets index 0
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                index++;

                if (index % parent.skip == 0)
                {
                    q.Enqueue(new List<T>(parent.count));
                }

                // Rotate through the queue exactly once: every list is dequeued, receives the
                // value, and is re-enqueued unless it just became full.
                var len = q.Count;
                for (int i = 0; i < len; i++)
                {
                    var list = q.Dequeue();
                    list.Add(value);
                    if (list.Count == parent.count)
                    {
                        observer.OnNext(list);
                    }
                    else
                    {
                        q.Enqueue(list);
                    }
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                // Flush every list that is still open.
                foreach (var list in q)
                {
                    observer.OnNext(list);
                }
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }

        // timespan = timeshift
        // Emits the values collected so far on every tick of an interval timer.
        class BufferT : OperatorObserverBase<T, IList<T>>
        {
            static readonly T[] EmptyArray = new T[0];

            readonly BufferObservable<T> parent;
            readonly object gate = new object(); // guards `list`

            List<T> list;

            public BufferT(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                list = new List<T>();

                var timerSubscription = Observable.Interval(parent.timeSpan, parent.scheduler)
                    .Subscribe(new Buffer(this));

                var sourceSubscription = parent.source.Subscribe(this);

                return StableCompositeDisposable.Create(timerSubscription, sourceSubscription);
            }

            public override void OnNext(T value)
            {
                lock (gate)
                {
                    list.Add(value);
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                List<T> currentList;
                lock (gate)
                {
                    currentList = list;
                }
                // The last buffer is emitted even when it is empty.
                observer.OnNext(currentList);
                try { observer.OnCompleted(); } finally { Dispose(); }
            }

            // Observer of the interval timer: each tick flushes the enclosing BufferT's list.
            class Buffer : IObserver<long>
            {
                BufferT parent;

                public Buffer(BufferT parent)
                {
                    this.parent = parent;
                }

                public void OnNext(long value)
                {
                    var isZero = false;
                    List<T> currentList;
                    lock (parent.gate)
                    {
                        currentList = parent.list;
                        if (currentList.Count != 0)
                        {
                            parent.list = new List<T>();
                        }
                        else
                        {
                            // Nothing arrived in this window: keep the list and emit the
                            // shared empty array instead of allocating.
                            isZero = true;
                        }
                    }

                    parent.observer.OnNext((isZero) ? (IList<T>)EmptyArray : currentList);
                }

                public void OnError(Exception error)
                {
                }

                public void OnCompleted()
                {
                }
            }
        }

        // timespan + timeshift
        // Keeps a queue of open buffers. A single re-armed timer alternates between
        // "shift" events (open a new buffer) and "span" events (close and emit the oldest).
        class BufferTS : OperatorObserverBase<T, IList<T>>
        {
            readonly BufferObservable<T> parent;
            readonly object gate = new object(); // guards `q`

            Queue<List<T>> q;
            TimeSpan totalTime; // time of the most recently scheduled event
            TimeSpan nextShift; // time at which the next buffer opens
            TimeSpan nextSpan;  // time at which the oldest buffer closes
            SerialDisposable timerD;

            public BufferTS(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                totalTime = TimeSpan.Zero;
                nextShift = parent.timeShift;
                nextSpan = parent.timeSpan;

                q = new Queue<List<T>>();

                timerD = new SerialDisposable();
                q.Enqueue(new List<T>()); // the first buffer is open from the start
                CreateTimer();

                var subscription = parent.source.Subscribe(this);

                return StableCompositeDisposable.Create(subscription, timerD);
            }

            // Schedules whichever of the next span/shift events comes first (both when they
            // coincide) and re-arms itself after that event has been handled.
            void CreateTimer()
            {
                var m = new SingleAssignmentDisposable();
                timerD.Disposable = m;

                var isSpan = false;
                var isShift = false;
                if (nextSpan == nextShift)
                {
                    isSpan = true;
                    isShift = true;
                }
                else if (nextSpan < nextShift)
                {
                    isSpan = true;
                }
                else
                {
                    isShift = true;
                }

                var newTotalTime = isSpan ? nextSpan : nextShift;
                var ts = newTotalTime - totalTime; // delay relative to the previous event
                totalTime = newTotalTime;

                if (isSpan)
                {
                    nextSpan += parent.timeShift;
                }
                if (isShift)
                {
                    nextShift += parent.timeShift;
                }

                m.Disposable = parent.scheduler.Schedule(ts, () =>
                {
                    lock (gate)
                    {
                        if (isShift)
                        {
                            var s = new List<T>();
                            q.Enqueue(s);
                        }
                        if (isSpan)
                        {
                            var s = q.Dequeue();
                            observer.OnNext(s);
                        }
                    }

                    CreateTimer();
                });
            }

            public override void OnNext(T value)
            {
                lock (gate)
                {
                    // Every open buffer receives the value.
                    foreach (var s in q)
                    {
                        s.Add(value);
                    }
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                lock (gate)
                {
                    // Flush every buffer that is still open.
                    foreach (var list in q)
                    {
                        observer.OnNext(list);
                    }

                    try { observer.OnCompleted(); } finally { Dispose(); }
                }
            }
        }

        // timespan + count
        // Emits the current buffer on a timer tick or as soon as it holds parent.count items;
        // a count-triggered emission restarts the timer.
        class BufferTC : OperatorObserverBase<T, IList<T>>
        {
            static readonly T[] EmptyArray = new T[0]; // cache

            readonly BufferObservable<T> parent;
            readonly object gate = new object(); // guards `list` and `timerId`

            List<T> list;
            long timerId; // generation counter; ticks from an older timer are ignored
            SerialDisposable timerD;

            public BufferTC(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                list = new List<T>();
                timerId = 0L;
                timerD = new SerialDisposable();

                CreateTimer();
                var subscription = parent.source.Subscribe(this);

                return StableCompositeDisposable.Create(subscription, timerD);
            }

            // Starts a timer bound to the current timerId, using periodic scheduling when the
            // scheduler implements ISchedulerPeriodic and recursive scheduling otherwise.
            void CreateTimer()
            {
                var currentTimerId = timerId;
                var timerS = new SingleAssignmentDisposable();
                timerD.Disposable = timerS; // restart timer(dispose before)

                var periodicScheduler = parent.scheduler as ISchedulerPeriodic;
                if (periodicScheduler != null)
                {
                    timerS.Disposable = periodicScheduler.SchedulePeriodic(parent.timeSpan, () => OnNextTick(currentTimerId));
                }
                else
                {
                    timerS.Disposable = parent.scheduler.Schedule(parent.timeSpan, self => OnNextRecursive(currentTimerId, self));
                }
            }

            // Tick handler for periodic schedulers.
            void OnNextTick(long currentTimerId)
            {
                var isZero = false;
                List<T> currentList;
                lock (gate)
                {
                    // A newer timer has been created since this one was scheduled.
                    if (currentTimerId != timerId) return;

                    currentList = list;
                    if (currentList.Count != 0)
                    {
                        list = new List<T>();
                    }
                    else
                    {
                        isZero = true;
                    }
                }

                observer.OnNext((isZero) ? (IList<T>)EmptyArray : currentList);
            }

            // Tick handler for non-periodic schedulers; same as OnNextTick but re-schedules
            // itself through `self` after emitting.
            void OnNextRecursive(long currentTimerId, Action<TimeSpan> self)
            {
                var isZero = false;
                List<T> currentList;
                lock (gate)
                {
                    // A newer timer has been created; returning here also ends this recursion.
                    if (currentTimerId != timerId) return;

                    currentList = list;
                    if (currentList.Count != 0)
                    {
                        list = new List<T>();
                    }
                    else
                    {
                        isZero = true;
                    }
                }

                observer.OnNext((isZero) ? (IList<T>)EmptyArray : currentList);
                self(parent.timeSpan);
            }

            public override void OnNext(T value)
            {
                List<T> currentList = null;
                lock (gate)
                {
                    list.Add(value);
                    if (list.Count == parent.count)
                    {
                        currentList = list;
                        list = new List<T>();
                        timerId++; // invalidates ticks of the timer that is being replaced
                        CreateTimer();
                    }
                }
                // The full buffer is emitted after the lock has been released.
                if (currentList != null)
                {
                    observer.OnNext(currentList);
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                List<T> currentList;
                lock (gate)
                {
                    timerId++; // pending ticks must not emit anymore
                    currentList = list;
                }
                // The last buffer is emitted even when it is empty.
                observer.OnNext(currentList);
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    // Groups the values of `source` into lists; the current list is emitted every time
    // `windowBoundaries` produces a value.
    internal class BufferObservable<TSource, TWindowBoundary> : OperatorObservableBase<IList<TSource>>
    {
        readonly IObservable<TSource> source;
        readonly IObservable<TWindowBoundary> windowBoundaries;

        public BufferObservable(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.windowBoundaries = windowBoundaries;
        }

        protected override IDisposable SubscribeCore(IObserver<IList<TSource>> observer, IDisposable cancel)
        {
            return new Buffer(this, observer, cancel).Run();
        }

        // Observer of the source sequence; the nested Buffer_ observes the boundaries.
        class Buffer : OperatorObserverBase<TSource, IList<TSource>>
        {
            static readonly TSource[] EmptyArray = new TSource[0]; // cache

            readonly BufferObservable<TSource, TWindowBoundary> parent;
            readonly object gate = new object(); // guards `list`
            List<TSource> list;

            public Buffer(BufferObservable<TSource, TWindowBoundary> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                list = new List<TSource>();

                var sourceSubscription = parent.source.Subscribe(this);
                var windowSubscription = parent.windowBoundaries.Subscribe(new Buffer_(this));

                return StableCompositeDisposable.Create(sourceSubscription, windowSubscription);
            }

            public override void OnNext(TSource value)
            {
                lock (gate)
                {
                    list.Add(value);
                }
            }

            public override void OnError(Exception error)
            {
                lock (gate)
                {
                    try { observer.OnError(error); } finally { Dispose(); }
                }
            }

            public override void OnCompleted()
            {
                lock (gate)
                {
                    var currentList = list;
                    list = new List<TSource>(); // safe
                    // The last buffer is emitted even when it is empty.
                    observer.OnNext(currentList);
                    try { observer.OnCompleted(); } finally { Dispose(); }
                }
            }

            // Observer of the boundary sequence: every boundary value flushes the enclosing
            // Buffer's list; errors and completion are forwarded to the enclosing Buffer.
            class Buffer_ : IObserver<TWindowBoundary>
            {
                readonly Buffer parent;

                public Buffer_(Buffer parent)
                {
                    this.parent = parent;
                }

                public void OnNext(TWindowBoundary value)
                {
                    var isZero = false;
                    List<TSource> currentList;
                    lock (parent.gate)
                    {
                        currentList = parent.list;
                        if (currentList.Count != 0)
                        {
                            parent.list = new List<TSource>();
                        }
                        else
                        {
                            // Nothing arrived since the last boundary: keep the list and
                            // emit the shared empty array instead of allocating.
                            isZero = true;
                        }
                    }
                    if (isZero)
                    {
                        parent.observer.OnNext(EmptyArray);
                    }
                    else
                    {
                        parent.observer.OnNext(currentList);
                    }
                }

                public void OnError(Exception error)
                {
                    parent.OnError(error);
                }

                public void OnCompleted()
                {
                    parent.OnCompleted();
                }
            }
        }
    }
}