123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- using System;
- using System.Collections.Generic;
- using System.Text;
- using UniRx.InternalUtil;
- using UniRx.Operators;
namespace UniRx
{
    // Take, Skip, etc..
    /// <summary>
    /// Extension-method factories for the Take/Skip, Buffer, Pairwise, First/Last/Single
    /// and GroupBy operators. Every factory validates <c>source</c> (and any scheduler or
    /// companion sequence it needs) eagerly, then wraps it in the matching operator class
    /// from <c>UniRx.Operators</c>.
    /// </summary>
    public static partial class Observable
    {
        /// <summary>Creates a count-based <c>TakeObservable</c> over <paramref name="source"/>.</summary>
        /// <param name="source">Sequence to wrap.</param>
        /// <param name="count">Non-negative element count; zero returns <c>Empty</c> directly.</param>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IObservable<T> Take<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count < 0) throw new ArgumentOutOfRangeException("count");
            if (count == 0) return Empty<T>();

            // optimize .Take(count).Take(count)
            // (a TakeObservable without a scheduler is presumably a count-based one)
            var take = source as TakeObservable<T>;
            if (take != null && take.scheduler == null)
            {
                return take.Combine(count);
            }

            return new TakeObservable<T>(source, count);
        }

        /// <summary>
        /// Time-based Take using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Take<T>(this IObservable<T> source, TimeSpan duration)
        {
            return Take(source, duration, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>Creates a time-based <c>TakeObservable</c> over <paramref name="source"/>.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<T> Take<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            // optimize .Take(duration).Take(duration)
            // (only when both stages run on the very same scheduler instance)
            var take = source as TakeObservable<T>;
            if (take != null && take.scheduler == scheduler)
            {
                return take.Combine(duration);
            }

            return new TakeObservable<T>(source, duration, scheduler);
        }

        /// <summary>Creates a <c>TakeWhileObservable</c> driven by a value-only predicate.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="predicate"/> is null.
        /// </exception>
        public static IObservable<T> TakeWhile<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            // Same guards as the indexed overload below.
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new TakeWhileObservable<T>(source, predicate);
        }

        /// <summary>Creates a <c>TakeWhileObservable</c> driven by a (value, int) predicate.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="predicate"/> is null.
        /// </exception>
        public static IObservable<T> TakeWhile<T>(this IObservable<T> source, Func<T, int, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new TakeWhileObservable<T>(source, predicate);
        }

        /// <summary>
        /// Creates a <c>TakeUntilObservable</c> pairing <paramref name="source"/> with
        /// <paramref name="other"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="other"/> is null.
        /// </exception>
        public static IObservable<T> TakeUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (other == null) throw new ArgumentNullException("other");

            return new TakeUntilObservable<T, TOther>(source, other);
        }

        /// <summary>Creates a count-based <c>TakeLastObservable</c> over <paramref name="source"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IObservable<T> TakeLast<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count < 0) throw new ArgumentOutOfRangeException("count");

            return new TakeLastObservable<T>(source, count);
        }

        /// <summary>
        /// Time-based TakeLast using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> TakeLast<T>(this IObservable<T> source, TimeSpan duration)
        {
            return TakeLast<T>(source, duration, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>Creates a time-based <c>TakeLastObservable</c> over <paramref name="source"/>.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<T> TakeLast<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            // Mirrors the scheduler guard of the time-based Take/Skip overloads.
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new TakeLastObservable<T>(source, duration, scheduler);
        }

        /// <summary>Creates a count-based <c>SkipObservable</c> over <paramref name="source"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
        public static IObservable<T> Skip<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count < 0) throw new ArgumentOutOfRangeException("count");

            // optimize .Skip(count).Skip(count)
            // (a SkipObservable without a scheduler is presumably a count-based one)
            var skip = source as SkipObservable<T>;
            if (skip != null && skip.scheduler == null)
            {
                return skip.Combine(count);
            }

            return new SkipObservable<T>(source, count);
        }

        /// <summary>
        /// Time-based Skip using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Skip<T>(this IObservable<T> source, TimeSpan duration)
        {
            return Skip(source, duration, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>Creates a time-based <c>SkipObservable</c> over <paramref name="source"/>.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<T> Skip<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            // optimize .Skip(duration).Skip(duration)
            // (only when both stages run on the very same scheduler instance)
            var skip = source as SkipObservable<T>;
            if (skip != null && skip.scheduler == scheduler)
            {
                return skip.Combine(duration);
            }

            return new SkipObservable<T>(source, duration, scheduler);
        }

        /// <summary>Creates a <c>SkipWhileObservable</c> driven by a value-only predicate.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="predicate"/> is null.
        /// </exception>
        public static IObservable<T> SkipWhile<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            // Same guards as the indexed overload below.
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new SkipWhileObservable<T>(source, predicate);
        }

        /// <summary>Creates a <c>SkipWhileObservable</c> driven by a (value, int) predicate.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="predicate"/> is null.
        /// </exception>
        public static IObservable<T> SkipWhile<T>(this IObservable<T> source, Func<T, int, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (predicate == null) throw new ArgumentNullException("predicate");

            return new SkipWhileObservable<T>(source, predicate);
        }

        /// <summary>
        /// Creates a <c>SkipUntilObservable</c> pairing <paramref name="source"/> with
        /// <paramref name="other"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="other"/> is null.
        /// </exception>
        public static IObservable<T> SkipUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
        {
            // Same guards as TakeUntil.
            if (source == null) throw new ArgumentNullException("source");
            if (other == null) throw new ArgumentNullException("other");

            return new SkipUntilObservable<T, TOther>(source, other);
        }

        /// <summary>Creates a count-based <c>BufferObservable</c>, passing 0 as the skip value.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
        {
            if (source == null) throw new ArgumentNullException("source");
            // (paramName, message) overload: the one-string constructor treats its argument as the parameter name.
            if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");

            return new BufferObservable<T>(source, count, 0);
        }

        /// <summary>Creates a count-based <c>BufferObservable</c> with an explicit skip value.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="count"/> or <paramref name="skip"/> is zero or negative.
        /// </exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count, int skip)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");
            if (skip <= 0) throw new ArgumentOutOfRangeException("skip", "skip <= 0");

            return new BufferObservable<T>(source, count, skip);
        }

        /// <summary>
        /// Time-based Buffer using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
        {
            return Buffer(source, timeSpan, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>
        /// Creates a time-based <c>BufferObservable</c> whose shift equals <paramref name="timeSpan"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            // timeSpan is deliberately passed twice: once as the span, once as the shift.
            return new BufferObservable<T>(source, timeSpan, timeSpan, scheduler);
        }

        /// <summary>
        /// Time-and-count Buffer using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, int count)
        {
            return Buffer(source, timeSpan, count, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>Creates a <c>BufferObservable</c> configured with both a time span and a count.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new BufferObservable<T>(source, timeSpan, count, scheduler);
        }

        /// <summary>
        /// Span/shift Buffer using <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            // Delegate so that the argument checks of the scheduler overload apply here too.
            return Buffer(source, timeSpan, timeShift, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        /// <summary>Creates a <c>BufferObservable</c> with separate span and shift durations.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new BufferObservable<T>(source, timeSpan, timeShift, scheduler);
        }

        /// <summary>
        /// Creates a <c>BufferObservable</c> pairing <paramref name="source"/> with
        /// <paramref name="windowBoundaries"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="windowBoundaries"/> is null.
        /// </exception>
        public static IObservable<IList<TSource>> Buffer<TSource, TWindowBoundary>(this IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (windowBoundaries == null) throw new ArgumentNullException("windowBoundaries");

            return new BufferObservable<TSource, TWindowBoundary>(source, windowBoundaries);
        }

        /// <summary>Projects old and new element of a sequence into a new form.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<Pair<T>> Pairwise<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new PairwiseObservable<T>(source);
        }

        /// <summary>Projects old and new element of a sequence into a new form.</summary>
        /// <remarks><paramref name="selector"/> is forwarded to the operator unchecked.</remarks>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TR> Pairwise<T, TR>(this IObservable<T> source, Func<T, T, TR> selector)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new PairwiseObservable<T, TR>(source, selector);
        }

        // first, last, single
        // The trailing bool is false for the plain operators and true for their OrDefault
        // variants. Predicates are forwarded to the operator classes unchecked.

        /// <summary>Creates a <c>LastObservable</c> over <paramref name="source"/> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Last<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new LastObservable<T>(source, false);
        }

        /// <summary>Creates a predicate-based <c>LastObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Last<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new LastObservable<T>(source, predicate, false);
        }

        /// <summary>Creates a <c>LastObservable</c> over <paramref name="source"/> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> LastOrDefault<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new LastObservable<T>(source, true);
        }

        /// <summary>Creates a predicate-based <c>LastObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> LastOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new LastObservable<T>(source, predicate, true);
        }

        /// <summary>Creates a <c>FirstObservable</c> over <paramref name="source"/> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> First<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new FirstObservable<T>(source, false);
        }

        /// <summary>Creates a predicate-based <c>FirstObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> First<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new FirstObservable<T>(source, predicate, false);
        }

        /// <summary>Creates a <c>FirstObservable</c> over <paramref name="source"/> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> FirstOrDefault<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new FirstObservable<T>(source, true);
        }

        /// <summary>Creates a predicate-based <c>FirstObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> FirstOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new FirstObservable<T>(source, predicate, true);
        }

        /// <summary>Creates a <c>SingleObservable</c> over <paramref name="source"/> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Single<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new SingleObservable<T>(source, false);
        }

        /// <summary>Creates a predicate-based <c>SingleObservable</c> (flag: false).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> Single<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new SingleObservable<T>(source, predicate, false);
        }

        /// <summary>Creates a <c>SingleObservable</c> over <paramref name="source"/> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> SingleOrDefault<T>(this IObservable<T> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new SingleObservable<T>(source, true);
        }

        /// <summary>Creates a predicate-based <c>SingleObservable</c> (flag: true).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<T> SingleOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new SingleObservable<T>(source, predicate, true);
        }

        // Grouping
        // Every overload funnels into one of the two GroupByObservable-constructing overloads,
        // which is where source is validated. Selectors, comparer and capacity are forwarded
        // unchecked.

        /// <summary>GroupBy with <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity);
        }

        /// <summary>
        /// GroupBy with <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector and an explicit
        /// key comparer.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity, comparer);
        }

        /// <summary>
        /// GroupBy with an element selector and the default key comparer
        /// (<c>UnityEqualityComparer.GetDefault</c> unless <c>UniRxLibrary</c> is defined).
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
        {
#if !UniRxLibrary
            var comparer = UnityEqualityComparer.GetDefault<TKey>();
#else
            var comparer = EqualityComparer<TKey>.Default;
#endif

            return GroupBy(source, keySelector, elementSelector, comparer);
        }

        /// <summary>Creates a <c>GroupByObservable</c>, passing null in place of a capacity.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new GroupByObservable<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
        }

        /// <summary>
        /// GroupBy with <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector and an explicit
        /// capacity.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity, capacity);
        }

        /// <summary>
        /// GroupBy with <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector, an explicit
        /// capacity and an explicit key comparer.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            return GroupBy(source, keySelector, Stubs<TSource>.Identity, capacity, comparer);
        }

        /// <summary>
        /// GroupBy with an element selector, an explicit capacity and the default key comparer
        /// (<c>UnityEqualityComparer.GetDefault</c> unless <c>UniRxLibrary</c> is defined).
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
        {
#if !UniRxLibrary
            var comparer = UnityEqualityComparer.GetDefault<TKey>();
#else
            var comparer = EqualityComparer<TKey>.Default;
#endif

            return GroupBy(source, keySelector, elementSelector, capacity, comparer);
        }

        /// <summary>Creates a <c>GroupByObservable</c> with an explicit capacity and key comparer.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
        {
            if (source == null) throw new ArgumentNullException("source");

            return new GroupByObservable<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
        }
    }
}
|