using System;
using System.Collections.Generic;
using System.Text;
using UniRx.InternalUtil;
using UniRx.Operators;

namespace UniRx
{
    // Take, Skip, etc..
    // Factory methods for paging (Take/Skip), buffering, pairwise projection,
    // first/last/single selection and grouping. Each method validates its
    // arguments eagerly and then hands off to the matching operator class in
    // UniRx.Operators; no subscription happens here.
    public static partial class Observable
    {
        /// <summary>
        /// Limits <paramref name="source"/> by element count through a <c>TakeObservable</c>.
        /// A count of zero short-circuits to <c>Empty</c> without wrapping the source.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IObservable<T> Take<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count < 0) throw new ArgumentOutOfRangeException("count");

            if (count == 0) return Empty<T>();

            // optimize .Take(count).Take(count)
            // A TakeObservable without a scheduler is a count-based one, so the new
            // count is folded into it via Combine instead of stacking a second wrapper.
            var take = source as TakeObservable<T>;
            if (take != null && take.scheduler == null)
            {
                return take.Combine(count);
            }

            return new TakeObservable<T>(source, count);
        }

        /// <summary>
        /// Time-limited Take using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        public static IObservable<T> Take<T>(this IObservable<T> source, TimeSpan duration)
        {
            return Take(source, duration, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Limits <paramref name="source"/> by <paramref name="duration"/> measured on
        /// <paramref name="scheduler"/> through a <c>TakeObservable</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<T> Take<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            // optimize .Take(duration).Take(duration)
            // Only merge when both operators run on the very same scheduler instance.
            var take = source as TakeObservable<T>;
            if (take != null && take.scheduler == scheduler)
            {
                return take.Combine(duration);
            }

            return new TakeObservable<T>(source, duration, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>TakeWhileObservable</c> driven by a value-only predicate.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> TakeWhile<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            // Validation added to match the indexed overload below.
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new TakeWhileObservable<T>(source, predicate);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>TakeWhileObservable</c> driven by a predicate
        /// that also receives an <see cref="int"/> argument (presumably the element index; confirm
        /// against <c>TakeWhileObservable</c>).
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> TakeWhile<T>(this IObservable<T> source, Func<T, int, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new TakeWhileObservable<T>(source, predicate);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> and <paramref name="other"/> in a <c>TakeUntilObservable</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
        public static IObservable<T> TakeUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (other == null) throw new ArgumentNullException("other");

            return new TakeUntilObservable<T, TOther>(source, other);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a count-based <c>TakeLastObservable</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IObservable<T> TakeLast<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count < 0) throw new ArgumentOutOfRangeException("count");

            return new TakeLastObservable<T>(source, count);
        }

        /// <summary>
        /// Time-based TakeLast using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        public static IObservable<T> TakeLast<T>(this IObservable<T> source, TimeSpan duration)
        {
            return TakeLast(source, duration, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a time-based <c>TakeLastObservable</c> on <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<T> TakeLast<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            // Scheduler check added for parity with the timed Take/Skip overloads.
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new TakeLastObservable<T>(source, duration, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a count-based <c>SkipObservable</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IObservable<T> Skip<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count < 0) throw new ArgumentOutOfRangeException("count");

            // optimize .Skip(count).Skip(count)
            // A SkipObservable without a scheduler is count-based; fold the new count into it.
            var skip = source as SkipObservable<T>;
            if (skip != null && skip.scheduler == null)
            {
                return skip.Combine(count);
            }

            return new SkipObservable<T>(source, count);
        }

        /// <summary>
        /// Time-based Skip using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        public static IObservable<T> Skip<T>(this IObservable<T> source, TimeSpan duration)
        {
            return Skip(source, duration, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a time-based <c>SkipObservable</c> on <paramref name="scheduler"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<T> Skip<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            // optimize .Skip(duration).Skip(duration)
            // Only merge when both operators run on the very same scheduler instance.
            var skip = source as SkipObservable<T>;
            if (skip != null && skip.scheduler == scheduler)
            {
                return skip.Combine(duration);
            }

            return new SkipObservable<T>(source, duration, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>SkipWhileObservable</c> driven by a value-only predicate.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> SkipWhile<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            // Validation added to match the indexed overload below.
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new SkipWhileObservable<T>(source, predicate);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>SkipWhileObservable</c> driven by a predicate
        /// that also receives an <see cref="int"/> argument (presumably the element index; confirm
        /// against <c>SkipWhileObservable</c>).
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> SkipWhile<T>(this IObservable<T> source, Func<T, int, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new SkipWhileObservable<T>(source, predicate);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> and <paramref name="other"/> in a <c>SkipUntilObservable</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
        public static IObservable<T> SkipUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
        {
            // Validation added to match TakeUntil.
            if (source == null) throw new ArgumentNullException("source");
            if (other == null) throw new ArgumentNullException("other");

            return new SkipUntilObservable<T, TOther>(source, other);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a count-based <c>BufferObservable</c>, passing 0 as the skip argument.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            // Two-argument constructor: (paramName, message). The single-string overload
            // would have reported the message text as the parameter name.
            if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");

            return new BufferObservable<T>(source, count, 0);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a count-based <c>BufferObservable</c> with an explicit skip.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is zero or negative.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count, int skip)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");
            if (skip <= 0) throw new ArgumentOutOfRangeException("skip", "skip <= 0");

            return new BufferObservable<T>(source, count, skip);
        }

        /// <summary>
        /// Time-based Buffer using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
        {
            return Buffer(source, timeSpan, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a time-based <c>BufferObservable</c>;
        /// <paramref name="timeSpan"/> is passed as both the span and the shift.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new BufferObservable<T>(source, timeSpan, timeSpan, scheduler);
        }

        /// <summary>
        /// Time-and-count Buffer using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, int count)
        {
            return Buffer(source, timeSpan, count, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>BufferObservable</c> configured with both a
        /// time span and an element count.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new BufferObservable<T>(source, timeSpan, count, scheduler);
        }

        /// <summary>
        /// Span-and-shift Buffer using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            // Delegate so this overload shares the validation of the scheduler overload.
            return Buffer(source, timeSpan, timeShift, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a time-based <c>BufferObservable</c> with separate
        /// span and shift values.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new BufferObservable<T>(source, timeSpan, timeShift, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>BufferObservable</c> whose boundaries come from
        /// <paramref name="windowBoundaries"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="windowBoundaries"/> is null.</exception>
        public static IObservable<IList<TSource>> Buffer<TSource, TWindowBoundary>(this IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (windowBoundaries == null) throw new ArgumentNullException("windowBoundaries");

            return new BufferObservable<TSource, TWindowBoundary>(source, windowBoundaries);
        }

        /// <summary>Projects old and new element of a sequence into a new form.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<Pair<T>> Pairwise<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new PairwiseObservable<T>(source);
        }

        /// <summary>Projects old and new element of a sequence into a new form.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IObservable<TR> Pairwise<T, TR>(this IObservable<T> source, Func<T, T, TR> selector)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (selector == null) throw new ArgumentNullException("selector");

            return new PairwiseObservable<T, TR>(source, selector);
        }

        // first, last, single
        // The trailing bool handed to each operator is false for the plain variants and
        // true for the *OrDefault variants (presumably a "use default" switch; confirm
        // against the operator classes).

        /// <summary>Wraps <paramref name="source"/> in a <c>LastObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Last<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new LastObservable<T>(source, false);
        }

        /// <summary>Wraps <paramref name="source"/> in a predicate-filtered <c>LastObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> Last<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new LastObservable<T>(source, predicate, false);
        }

        /// <summary>Wraps <paramref name="source"/> in a <c>LastObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> LastOrDefault<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new LastObservable<T>(source, true);
        }

        /// <summary>Wraps <paramref name="source"/> in a predicate-filtered <c>LastObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> LastOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new LastObservable<T>(source, predicate, true);
        }

        /// <summary>Wraps <paramref name="source"/> in a <c>FirstObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> First<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new FirstObservable<T>(source, false);
        }

        /// <summary>Wraps <paramref name="source"/> in a predicate-filtered <c>FirstObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> First<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new FirstObservable<T>(source, predicate, false);
        }

        /// <summary>Wraps <paramref name="source"/> in a <c>FirstObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> FirstOrDefault<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new FirstObservable<T>(source, true);
        }

        /// <summary>Wraps <paramref name="source"/> in a predicate-filtered <c>FirstObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> FirstOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new FirstObservable<T>(source, predicate, true);
        }

        /// <summary>Wraps <paramref name="source"/> in a <c>SingleObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Single<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new SingleObservable<T>(source, false);
        }

        /// <summary>Wraps <paramref name="source"/> in a predicate-filtered <c>SingleObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> Single<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new SingleObservable<T>(source, predicate, false);
        }

        /// <summary>Wraps <paramref name="source"/> in a <c>SingleObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> SingleOrDefault<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new SingleObservable<T>(source, true);
        }

        /// <summary>Wraps <paramref name="source"/> in a predicate-filtered <c>SingleObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static IObservable<T> SingleOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new SingleObservable<T>(source, predicate, true);
        }

        // Grouping
        // Every overload funnels into one of the two overloads that construct a
        // GroupByObservable (with or without a capacity); argument validation lives there.

        /// <summary>Groups by key, using <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.</summary>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity);
        }

        /// <summary>Groups by key with an explicit key comparer, using <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.</summary>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity, comparer);
        }

        /// <summary>
        /// Groups by key and projects elements, picking the default key comparer for the build
        /// (<c>UnityEqualityComparer</c> unless <c>UniRxLibrary</c> is defined).
        /// </summary>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
        {
#if !UniRxLibrary
            var comparer = UnityEqualityComparer.GetDefault<TKey>();
#else
            var comparer = EqualityComparer<TKey>.Default;
#endif

            return GroupBy(source, keySelector, elementSelector, comparer);
        }

        /// <summary>
        /// Constructs a <c>GroupByObservable</c> with no capacity (null is passed for it).
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
        /// </exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (keySelector == null) throw new ArgumentNullException("keySelector");
            if (elementSelector == null) throw new ArgumentNullException("elementSelector");
            // comparer is deliberately not validated here; it is forwarded as given.

            return new GroupByObservable<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
        }

        /// <summary>Groups by key with a capacity, using <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.</summary>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity, capacity);
        }

        /// <summary>Groups by key with a capacity and key comparer, using <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.</summary>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity, capacity, comparer);
        }

        /// <summary>
        /// Groups by key and projects elements with a capacity, picking the default key comparer
        /// for the build (<c>UnityEqualityComparer</c> unless <c>UniRxLibrary</c> is defined).
        /// </summary>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
        {
#if !UniRxLibrary
            var comparer = UnityEqualityComparer.GetDefault<TKey>();
#else
            var comparer = EqualityComparer<TKey>.Default;
#endif

            return GroupBy(source, keySelector, elementSelector, capacity, comparer);
        }

        /// <summary>
        /// Constructs a <c>GroupByObservable</c> with the given <paramref name="capacity"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
        /// </exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (keySelector == null) throw new ArgumentNullException("keySelector");
            if (elementSelector == null) throw new ArgumentNullException("elementSelector");
            // comparer is deliberately not validated here; it is forwarded as given.

            return new GroupByObservable<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
        }
    }
}