2
0

Observable.Paging.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using UniRx.InternalUtil;
  5. using UniRx.Operators;
  6. namespace UniRx
  7. {
  8. // Take, Skip, etc..
  9. public static partial class Observable
  10. {
  /// <summary>
  /// Count-based Take. A count of zero returns <c>Empty</c>; when <paramref name="source"/> is itself a
  /// TakeObservable without a scheduler, the two operators are merged through <c>Combine</c> instead of nested.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
  public static IObservable<T> Take<T>(this IObservable<T> source, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException("count");
      }
      if (count == 0)
      {
          return Empty<T>();
      }

      // optimize .Take(count).Take(count): only a Take that carries no scheduler is merged here
      var upstream = source as TakeObservable<T>;
      if (upstream == null || upstream.scheduler != null)
      {
          return new TakeObservable<T>(source, count);
      }
      return upstream.Combine(count);
  }
  /// <summary>
  /// Duration-based Take that forwards to the scheduler overload using
  /// <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
  /// </summary>
  public static IObservable<T> Take<T>(this IObservable<T> source, TimeSpan duration)
  {
      var timeScheduler = Scheduler.DefaultSchedulers.TimeBasedOperations;
      return Take(source, duration, timeScheduler);
  }
  /// <summary>
  /// Duration-based Take on the given scheduler. When <paramref name="source"/> is already a TakeObservable
  /// bound to the same scheduler instance, the two are merged through <c>Combine</c> instead of nested.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<T> Take<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // optimize .Take(duration).Take(duration): merge only when both use the same scheduler
      var upstream = source as TakeObservable<T>;
      if (upstream == null || upstream.scheduler != scheduler)
      {
          return new TakeObservable<T>(source, duration, scheduler);
      }
      return upstream.Combine(duration);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a TakeWhileObservable driven by <paramref name="predicate"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> TakeWhile<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Validate eagerly, exactly like the indexed-predicate overload, so a null argument
      // fails at the call site instead of later inside the operator.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new TakeWhileObservable<T>(source, predicate);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a TakeWhileObservable whose predicate takes the element
  /// and an <c>int</c> second argument.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> TakeWhile<T>(this IObservable<T> source, Func<T, int, bool> predicate)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      var takeWhile = new TakeWhileObservable<T>(source, predicate);
      return takeWhile;
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="other"/> in a TakeUntilObservable.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
  public static IObservable<T> TakeUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (other == null)
      {
          throw new ArgumentNullException("other");
      }

      var takeUntil = new TakeUntilObservable<T, TOther>(source, other);
      return takeUntil;
  }
  /// <summary>
  /// Count-based TakeLast: wraps <paramref name="source"/> in a TakeLastObservable configured with
  /// <paramref name="count"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
  public static IObservable<T> TakeLast<T>(this IObservable<T> source, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException("count");
      }

      var takeLast = new TakeLastObservable<T>(source, count);
      return takeLast;
  }
  /// <summary>
  /// Duration-based TakeLast that forwards to the scheduler overload using
  /// <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
  /// </summary>
  public static IObservable<T> TakeLast<T>(this IObservable<T> source, TimeSpan duration)
  {
      var timeScheduler = Scheduler.DefaultSchedulers.TimeBasedOperations;
      return TakeLast<T>(source, duration, timeScheduler);
  }
  /// <summary>
  /// Duration-based TakeLast: wraps <paramref name="source"/> in a TakeLastObservable configured with
  /// <paramref name="duration"/> and <paramref name="scheduler"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<T> TakeLast<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
  {
      if (source == null) throw new ArgumentNullException("source");
      // Same guard as the timed Take/Skip overloads: a timed operator cannot work without a scheduler,
      // so reject null here rather than letting it reach the operator.
      if (scheduler == null) throw new ArgumentNullException("scheduler");

      return new TakeLastObservable<T>(source, duration, scheduler);
  }
  /// <summary>
  /// Count-based Skip. When <paramref name="source"/> is itself a SkipObservable without a scheduler,
  /// the two operators are merged through <c>Combine</c> instead of nested.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
  public static IObservable<T> Skip<T>(this IObservable<T> source, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException("count");
      }

      // optimize .Skip(count).Skip(count): only a Skip that carries no scheduler is merged here
      var upstream = source as SkipObservable<T>;
      if (upstream == null || upstream.scheduler != null)
      {
          return new SkipObservable<T>(source, count);
      }
      return upstream.Combine(count);
  }
  /// <summary>
  /// Duration-based Skip that forwards to the scheduler overload using
  /// <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
  /// </summary>
  public static IObservable<T> Skip<T>(this IObservable<T> source, TimeSpan duration)
  {
      var timeScheduler = Scheduler.DefaultSchedulers.TimeBasedOperations;
      return Skip(source, duration, timeScheduler);
  }
  /// <summary>
  /// Duration-based Skip on the given scheduler. When <paramref name="source"/> is already a SkipObservable
  /// bound to the same scheduler instance, the two are merged through <c>Combine</c> instead of nested.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<T> Skip<T>(this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      // optimize .Skip(duration).Skip(duration): merge only when both use the same scheduler
      var upstream = source as SkipObservable<T>;
      if (upstream == null || upstream.scheduler != scheduler)
      {
          return new SkipObservable<T>(source, duration, scheduler);
      }
      return upstream.Combine(duration);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a SkipWhileObservable driven by <paramref name="predicate"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> SkipWhile<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Validate eagerly, exactly like the indexed-predicate overload, so a null argument
      // fails at the call site instead of later inside the operator.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new SkipWhileObservable<T>(source, predicate);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a SkipWhileObservable whose predicate takes the element
  /// and an <c>int</c> second argument.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> SkipWhile<T>(this IObservable<T> source, Func<T, int, bool> predicate)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }
      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      var skipWhile = new SkipWhileObservable<T>(source, predicate);
      return skipWhile;
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="other"/> in a SkipUntilObservable.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
  public static IObservable<T> SkipUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
  {
      // Mirror TakeUntil's argument validation so both operators fail fast on null input.
      if (source == null) throw new ArgumentNullException("source");
      if (other == null) throw new ArgumentNullException("other");

      return new SkipUntilObservable<T, TOther>(source, other);
  }
  /// <summary>
  /// Count-based Buffer: wraps <paramref name="source"/> in a BufferObservable built with
  /// <paramref name="count"/> and a skip argument of 0.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
  {
      if (source == null) throw new ArgumentNullException("source");
      // The single-string constructor takes the parameter NAME; pass the name and the message separately
      // so ParamName reports "count" rather than "count <= 0".
      if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");

      return new BufferObservable<T>(source, count, 0);
  }
  /// <summary>
  /// Count-based Buffer with an explicit skip: wraps <paramref name="source"/> in a BufferObservable built
  /// with <paramref name="count"/> and <paramref name="skip"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="count"/> or <paramref name="skip"/> is zero or negative.
  /// </exception>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count, int skip)
  {
      if (source == null) throw new ArgumentNullException("source");
      // The single-string constructor takes the parameter NAME; pass the name and the message separately
      // so ParamName identifies the offending argument.
      if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");
      if (skip <= 0) throw new ArgumentOutOfRangeException("skip", "skip <= 0");

      return new BufferObservable<T>(source, count, skip);
  }
  /// <summary>
  /// Time-based Buffer that forwards to the scheduler overload using
  /// <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
  /// </summary>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
  {
      var timeScheduler = Scheduler.DefaultSchedulers.TimeBasedOperations;
      return Buffer(source, timeSpan, timeScheduler);
  }
  /// <summary>
  /// Time-based Buffer on the given scheduler: wraps <paramref name="source"/> in a BufferObservable,
  /// passing <paramref name="timeSpan"/> as both the span and the shift argument.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, IScheduler scheduler)
  {
      if (source == null) throw new ArgumentNullException("source");
      // Same guard as the timed Take/Skip overloads: reject a null scheduler at the call site.
      if (scheduler == null) throw new ArgumentNullException("scheduler");

      return new BufferObservable<T>(source, timeSpan, timeSpan, scheduler);
  }
  /// <summary>
  /// Time-and-count Buffer that forwards to the scheduler overload using
  /// <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
  /// </summary>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, int count)
  {
      var timeScheduler = Scheduler.DefaultSchedulers.TimeBasedOperations;
      return Buffer(source, timeSpan, count, timeScheduler);
  }
  /// <summary>
  /// Time-and-count Buffer on the given scheduler: wraps <paramref name="source"/> in a BufferObservable
  /// built with <paramref name="timeSpan"/>, <paramref name="count"/> and <paramref name="scheduler"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  {
      if (source == null) throw new ArgumentNullException("source");
      // The single-string constructor takes the parameter NAME; pass the name and the message separately.
      if (count <= 0) throw new ArgumentOutOfRangeException("count", "count <= 0");
      // Same guard as the timed Take/Skip overloads: reject a null scheduler at the call site.
      if (scheduler == null) throw new ArgumentNullException("scheduler");

      return new BufferObservable<T>(source, timeSpan, count, scheduler);
  }
  /// <summary>
  /// Span-and-shift Buffer: wraps <paramref name="source"/> in a BufferObservable built with
  /// <paramref name="timeSpan"/>, <paramref name="timeShift"/> and
  /// <c>Scheduler.DefaultSchedulers.TimeBasedOperations</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift)
  {
      // This overload constructs the operator directly, so it must validate source itself,
      // just as the explicit-scheduler overload does.
      if (source == null) throw new ArgumentNullException("source");

      return new BufferObservable<T>(source, timeSpan, timeShift, Scheduler.DefaultSchedulers.TimeBasedOperations);
  }
  /// <summary>
  /// Span-and-shift Buffer on the given scheduler: wraps <paramref name="source"/> in a BufferObservable
  /// built with <paramref name="timeSpan"/>, <paramref name="timeShift"/> and <paramref name="scheduler"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  {
      if (source == null) throw new ArgumentNullException("source");
      // Same guard as the timed Take/Skip overloads: reject a null scheduler at the call site.
      if (scheduler == null) throw new ArgumentNullException("scheduler");

      return new BufferObservable<T>(source, timeSpan, timeShift, scheduler);
  }
  /// <summary>
  /// Boundary-driven Buffer: wraps <paramref name="source"/> and <paramref name="windowBoundaries"/>
  /// in a two-type-argument BufferObservable.
  /// </summary>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="windowBoundaries"/> is null.
  /// </exception>
  public static IObservable<IList<TSource>> Buffer<TSource, TWindowBoundary>(this IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
  {
      // Fail fast on null input, consistent with the other Buffer overloads that validate source.
      if (source == null) throw new ArgumentNullException("source");
      if (windowBoundaries == null) throw new ArgumentNullException("windowBoundaries");

      return new BufferObservable<TSource, TWindowBoundary>(source, windowBoundaries);
  }
  /// <summary>Projects old and new element of a sequence into a new form (a <c>Pair</c> of the element type).</summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<Pair<T>> Pairwise<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new PairwiseObservable<T>(source);
  }
  /// <summary>Projects old and new element of a sequence into a new form produced by <paramref name="selector"/>.</summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TR> Pairwise<T, TR>(this IObservable<T> source, Func<T, T, TR> selector)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (selector == null) throw new ArgumentNullException("selector");

      return new PairwiseObservable<T, TR>(source, selector);
  }
  168. // first, last, single
  /// <summary>
  /// Wraps <paramref name="source"/> in a LastObservable with its boolean flag set to <c>false</c>
  /// (the LastOrDefault variant passes <c>true</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<T> Last<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new LastObservable<T>(source, false);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="predicate"/> in a LastObservable with its boolean
  /// flag set to <c>false</c> (the LastOrDefault variant passes <c>true</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> Last<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new LastObservable<T>(source, predicate, false);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a LastObservable with its boolean flag set to <c>true</c>
  /// (the plain Last variant passes <c>false</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<T> LastOrDefault<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new LastObservable<T>(source, true);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="predicate"/> in a LastObservable with its boolean
  /// flag set to <c>true</c> (the plain Last variant passes <c>false</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> LastOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new LastObservable<T>(source, predicate, true);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a FirstObservable with its boolean flag set to <c>false</c>
  /// (the FirstOrDefault variant passes <c>true</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<T> First<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new FirstObservable<T>(source, false);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="predicate"/> in a FirstObservable with its boolean
  /// flag set to <c>false</c> (the FirstOrDefault variant passes <c>true</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> First<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new FirstObservable<T>(source, predicate, false);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a FirstObservable with its boolean flag set to <c>true</c>
  /// (the plain First variant passes <c>false</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<T> FirstOrDefault<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new FirstObservable<T>(source, true);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="predicate"/> in a FirstObservable with its boolean
  /// flag set to <c>true</c> (the plain First variant passes <c>false</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> FirstOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new FirstObservable<T>(source, predicate, true);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a SingleObservable with its boolean flag set to <c>false</c>
  /// (the SingleOrDefault variant passes <c>true</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<T> Single<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new SingleObservable<T>(source, false);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="predicate"/> in a SingleObservable with its boolean
  /// flag set to <c>false</c> (the SingleOrDefault variant passes <c>true</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> Single<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new SingleObservable<T>(source, predicate, false);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> in a SingleObservable with its boolean flag set to <c>true</c>
  /// (the plain Single variant passes <c>false</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<T> SingleOrDefault<T>(this IObservable<T> source)
  {
      // Fail fast on a null source, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");

      return new SingleObservable<T>(source, true);
  }
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="predicate"/> in a SingleObservable with its boolean
  /// flag set to <c>true</c> (the plain Single variant passes <c>false</c>).
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<T> SingleOrDefault<T>(this IObservable<T> source, Func<T, bool> predicate)
  {
      // Fail fast on null input, consistent with the validated operators in this class.
      if (source == null) throw new ArgumentNullException("source");
      if (predicate == null) throw new ArgumentNullException("predicate");

      return new SingleObservable<T>(source, predicate, true);
  }
  217. // Grouping
  /// <summary>
  /// GroupBy without an element projection: forwards to the element-selector overload with
  /// <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.
  /// </summary>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      var identity = Stubs<TSource>.Identity;
      return GroupBy(source, keySelector, identity);
  }
  /// <summary>
  /// GroupBy with a key comparer and no element projection: forwards to the element-selector overload with
  /// <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.
  /// </summary>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      var identity = Stubs<TSource>.Identity;
      return GroupBy(source, keySelector, identity, comparer);
  }
  /// <summary>
  /// GroupBy with an element projection and the default key comparer. The comparer comes from
  /// <c>UnityEqualityComparer.GetDefault</c> unless the <c>UniRxLibrary</c> symbol is defined, in which case
  /// <c>EqualityComparer&lt;TKey&gt;.Default</c> is used; the call then forwards to the comparer overload.
  /// </summary>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  {
      IEqualityComparer<TKey> defaultComparer;
#if !UniRxLibrary
      defaultComparer = UnityEqualityComparer.GetDefault<TKey>();
#else
      defaultComparer = EqualityComparer<TKey>.Default;
#endif
      return GroupBy(source, keySelector, elementSelector, defaultComparer);
  }
  /// <summary>
  /// GroupBy with an element projection and an explicit key comparer: builds a GroupByObservable, passing
  /// <c>null</c> for its capacity argument.
  /// </summary>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
  /// </exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  {
      // Every non-capacity GroupBy overload funnels into this one, so validating here covers them all.
      // comparer is deliberately not checked: whether the operator tolerates a null comparer is not
      // visible from this file, so existing callers that pass null keep their current behavior.
      if (source == null) throw new ArgumentNullException("source");
      if (keySelector == null) throw new ArgumentNullException("keySelector");
      if (elementSelector == null) throw new ArgumentNullException("elementSelector");

      return new GroupByObservable<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
  }
  /// <summary>
  /// GroupBy with a capacity and no element projection: forwards to the element-selector overload with
  /// <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.
  /// </summary>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
  {
      var identity = Stubs<TSource>.Identity;
      return GroupBy(source, keySelector, identity, capacity);
  }
  /// <summary>
  /// GroupBy with a capacity, a key comparer and no element projection: forwards to the element-selector
  /// overload with <c>Stubs&lt;TSource&gt;.Identity</c> as the element selector.
  /// </summary>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      var identity = Stubs<TSource>.Identity;
      return GroupBy(source, keySelector, identity, capacity, comparer);
  }
  /// <summary>
  /// GroupBy with an element projection, a capacity and the default key comparer. The comparer comes from
  /// <c>UnityEqualityComparer.GetDefault</c> unless the <c>UniRxLibrary</c> symbol is defined, in which case
  /// <c>EqualityComparer&lt;TKey&gt;.Default</c> is used; the call then forwards to the comparer overload.
  /// </summary>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  {
      IEqualityComparer<TKey> defaultComparer;
#if !UniRxLibrary
      defaultComparer = UnityEqualityComparer.GetDefault<TKey>();
#else
      defaultComparer = EqualityComparer<TKey>.Default;
#endif
      return GroupBy(source, keySelector, elementSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// GroupBy with an element projection, a capacity and an explicit key comparer: builds a
  /// GroupByObservable from all five arguments.
  /// </summary>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.
  /// </exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Every capacity-taking GroupBy overload funnels into this one, so validating here covers them all.
      // comparer is deliberately not checked: whether the operator tolerates a null comparer is not
      // visible from this file, so existing callers that pass null keep their current behavior.
      if (source == null) throw new ArgumentNullException("source");
      if (keySelector == null) throw new ArgumentNullException("keySelector");
      if (elementSelector == null) throw new ArgumentNullException("elementSelector");
      // Reject a negative capacity at the call site, in the same way Take/Skip reject a negative count.
      if (capacity < 0) throw new ArgumentOutOfRangeException("capacity");

      return new GroupByObservable<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
  }
  260. }
  261. }