1
0

Buffer.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. using System;
  2. using System.Collections.Generic;
  3. using UniRx.Operators;
  4. namespace UniRx.Operators
  5. {
  6. internal class BufferObservable<T> : OperatorObservableBase<IList<T>>
  7. {
  8. readonly IObservable<T> source;
  9. readonly int count;
  10. readonly int skip;
  11. readonly TimeSpan timeSpan;
  12. readonly TimeSpan timeShift;
  13. readonly IScheduler scheduler;
  14. public BufferObservable(IObservable<T> source, int count, int skip)
  15. : base(source.IsRequiredSubscribeOnCurrentThread())
  16. {
  17. this.source = source;
  18. this.count = count;
  19. this.skip = skip;
  20. }
  21. public BufferObservable(IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
  22. : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
  23. {
  24. this.source = source;
  25. this.timeSpan = timeSpan;
  26. this.timeShift = timeShift;
  27. this.scheduler = scheduler;
  28. }
  29. public BufferObservable(IObservable<T> source, TimeSpan timeSpan, int count, IScheduler scheduler)
  30. : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
  31. {
  32. this.source = source;
  33. this.timeSpan = timeSpan;
  34. this.count = count;
  35. this.scheduler = scheduler;
  36. }
  37. protected override IDisposable SubscribeCore(IObserver<IList<T>> observer, IDisposable cancel)
  38. {
  39. // count,skip
  40. if (scheduler == null)
  41. {
  42. if (skip == 0)
  43. {
  44. return new Buffer(this, observer, cancel).Run();
  45. }
  46. else
  47. {
  48. return new Buffer_(this, observer, cancel).Run();
  49. }
  50. }
  51. else
  52. {
  53. // time + count
  54. if (count > 0)
  55. {
  56. return new BufferTC(this, observer, cancel).Run();
  57. }
  58. else
  59. {
  60. if (timeSpan == timeShift)
  61. {
  62. return new BufferT(this, observer, cancel).Run();
  63. }
  64. else
  65. {
  66. return new BufferTS(this, observer, cancel).Run();
  67. }
  68. }
  69. }
  70. }
  71. // count only
  72. class Buffer : OperatorObserverBase<T, IList<T>>
  73. {
  74. readonly BufferObservable<T> parent;
  75. List<T> list;
  76. public Buffer(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
  77. {
  78. this.parent = parent;
  79. }
  80. public IDisposable Run()
  81. {
  82. list = new List<T>(parent.count);
  83. return parent.source.Subscribe(this);
  84. }
  85. public override void OnNext(T value)
  86. {
  87. list.Add(value);
  88. if (list.Count == parent.count)
  89. {
  90. observer.OnNext(list);
  91. list = new List<T>(parent.count);
  92. }
  93. }
  94. public override void OnError(Exception error)
  95. {
  96. try { observer.OnError(error); } finally { Dispose(); }
  97. }
  98. public override void OnCompleted()
  99. {
  100. if (list.Count > 0)
  101. {
  102. observer.OnNext(list);
  103. }
  104. try { observer.OnCompleted(); } finally { Dispose(); }
  105. }
  106. }
  107. // count and skip
  108. class Buffer_ : OperatorObserverBase<T, IList<T>>
  109. {
  110. readonly BufferObservable<T> parent;
  111. Queue<List<T>> q;
  112. int index;
  113. public Buffer_(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
  114. {
  115. this.parent = parent;
  116. }
  117. public IDisposable Run()
  118. {
  119. q = new Queue<List<T>>();
  120. index = -1;
  121. return parent.source.Subscribe(this);
  122. }
  123. public override void OnNext(T value)
  124. {
  125. index++;
  126. if (index % parent.skip == 0)
  127. {
  128. q.Enqueue(new List<T>(parent.count));
  129. }
  130. var len = q.Count;
  131. for (int i = 0; i < len; i++)
  132. {
  133. var list = q.Dequeue();
  134. list.Add(value);
  135. if (list.Count == parent.count)
  136. {
  137. observer.OnNext(list);
  138. }
  139. else
  140. {
  141. q.Enqueue(list);
  142. }
  143. }
  144. }
  145. public override void OnError(Exception error)
  146. {
  147. try { observer.OnError(error); } finally { Dispose(); }
  148. }
  149. public override void OnCompleted()
  150. {
  151. foreach (var list in q)
  152. {
  153. observer.OnNext(list);
  154. }
  155. try { observer.OnCompleted(); } finally { Dispose(); }
  156. }
  157. }
  158. // timespan = timeshift
  159. class BufferT : OperatorObserverBase<T, IList<T>>
  160. {
  161. static readonly T[] EmptyArray = new T[0];
  162. readonly BufferObservable<T> parent;
  163. readonly object gate = new object();
  164. List<T> list;
  165. public BufferT(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
  166. {
  167. this.parent = parent;
  168. }
  169. public IDisposable Run()
  170. {
  171. list = new List<T>();
  172. var timerSubscription = Observable.Interval(parent.timeSpan, parent.scheduler)
  173. .Subscribe(new Buffer(this));
  174. var sourceSubscription = parent.source.Subscribe(this);
  175. return StableCompositeDisposable.Create(timerSubscription, sourceSubscription);
  176. }
  177. public override void OnNext(T value)
  178. {
  179. lock (gate)
  180. {
  181. list.Add(value);
  182. }
  183. }
  184. public override void OnError(Exception error)
  185. {
  186. try { observer.OnError(error); } finally { Dispose(); }
  187. }
  188. public override void OnCompleted()
  189. {
  190. List<T> currentList;
  191. lock (gate)
  192. {
  193. currentList = list;
  194. }
  195. observer.OnNext(currentList);
  196. try { observer.OnCompleted(); } finally { Dispose(); }
  197. }
  198. class Buffer : IObserver<long>
  199. {
  200. BufferT parent;
  201. public Buffer(BufferT parent)
  202. {
  203. this.parent = parent;
  204. }
  205. public void OnNext(long value)
  206. {
  207. var isZero = false;
  208. List<T> currentList;
  209. lock (parent.gate)
  210. {
  211. currentList = parent.list;
  212. if (currentList.Count != 0)
  213. {
  214. parent.list = new List<T>();
  215. }
  216. else
  217. {
  218. isZero = true;
  219. }
  220. }
  221. parent.observer.OnNext((isZero) ? (IList<T>)EmptyArray : currentList);
  222. }
  223. public void OnError(Exception error)
  224. {
  225. }
  226. public void OnCompleted()
  227. {
  228. }
  229. }
  230. }
  231. // timespan + timeshift
  232. class BufferTS : OperatorObserverBase<T, IList<T>>
  233. {
  234. readonly BufferObservable<T> parent;
  235. readonly object gate = new object();
  236. Queue<IList<T>> q;
  237. TimeSpan totalTime;
  238. TimeSpan nextShift;
  239. TimeSpan nextSpan;
  240. SerialDisposable timerD;
  241. public BufferTS(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
  242. {
  243. this.parent = parent;
  244. }
  245. public IDisposable Run()
  246. {
  247. totalTime = TimeSpan.Zero;
  248. nextShift = parent.timeShift;
  249. nextSpan = parent.timeSpan;
  250. q = new Queue<IList<T>>();
  251. timerD = new SerialDisposable();
  252. q.Enqueue(new List<T>());
  253. CreateTimer();
  254. var subscription = parent.source.Subscribe(this);
  255. return StableCompositeDisposable.Create(subscription, timerD);
  256. }
  257. void CreateTimer()
  258. {
  259. var m = new SingleAssignmentDisposable();
  260. timerD.Disposable = m;
  261. var isSpan = false;
  262. var isShift = false;
  263. if (nextSpan == nextShift)
  264. {
  265. isSpan = true;
  266. isShift = true;
  267. }
  268. else if (nextSpan < nextShift)
  269. isSpan = true;
  270. else
  271. isShift = true;
  272. var newTotalTime = isSpan ? nextSpan : nextShift;
  273. var ts = newTotalTime - totalTime;
  274. totalTime = newTotalTime;
  275. if (isSpan)
  276. nextSpan += parent.timeShift;
  277. if (isShift)
  278. nextShift += parent.timeShift;
  279. m.Disposable = parent.scheduler.Schedule(ts, () =>
  280. {
  281. lock (gate)
  282. {
  283. if (isShift)
  284. {
  285. var s = new List<T>();
  286. q.Enqueue(s);
  287. }
  288. if (isSpan)
  289. {
  290. var s = q.Dequeue();
  291. observer.OnNext(s);
  292. }
  293. }
  294. CreateTimer();
  295. });
  296. }
  297. public override void OnNext(T value)
  298. {
  299. lock (gate)
  300. {
  301. foreach (var s in q)
  302. {
  303. s.Add(value);
  304. }
  305. }
  306. }
  307. public override void OnError(Exception error)
  308. {
  309. try { observer.OnError(error); } finally { Dispose(); }
  310. }
  311. public override void OnCompleted()
  312. {
  313. lock (gate)
  314. {
  315. foreach (var list in q)
  316. {
  317. observer.OnNext(list);
  318. }
  319. try { observer.OnCompleted(); } finally { Dispose(); }
  320. }
  321. }
  322. }
  323. // timespan + count
  324. class BufferTC : OperatorObserverBase<T, IList<T>>
  325. {
  326. static readonly T[] EmptyArray = new T[0]; // cache
  327. readonly BufferObservable<T> parent;
  328. readonly object gate = new object();
  329. List<T> list;
  330. long timerId;
  331. SerialDisposable timerD;
  332. public BufferTC(BufferObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
  333. {
  334. this.parent = parent;
  335. }
  336. public IDisposable Run()
  337. {
  338. list = new List<T>();
  339. timerId = 0L;
  340. timerD = new SerialDisposable();
  341. CreateTimer();
  342. var subscription = parent.source.Subscribe(this);
  343. return StableCompositeDisposable.Create(subscription, timerD);
  344. }
  345. void CreateTimer()
  346. {
  347. var currentTimerId = timerId;
  348. var timerS = new SingleAssignmentDisposable();
  349. timerD.Disposable = timerS; // restart timer(dispose before)
  350. var periodicScheduler = parent.scheduler as ISchedulerPeriodic;
  351. if (periodicScheduler != null)
  352. {
  353. timerS.Disposable = periodicScheduler.SchedulePeriodic(parent.timeSpan, () => OnNextTick(currentTimerId));
  354. }
  355. else
  356. {
  357. timerS.Disposable = parent.scheduler.Schedule(parent.timeSpan, self => OnNextRecursive(currentTimerId, self));
  358. }
  359. }
  360. void OnNextTick(long currentTimerId)
  361. {
  362. var isZero = false;
  363. List<T> currentList;
  364. lock (gate)
  365. {
  366. if (currentTimerId != timerId) return;
  367. currentList = list;
  368. if (currentList.Count != 0)
  369. {
  370. list = new List<T>();
  371. }
  372. else
  373. {
  374. isZero = true;
  375. }
  376. }
  377. observer.OnNext((isZero) ? (IList<T>)EmptyArray : currentList);
  378. }
  379. void OnNextRecursive(long currentTimerId, Action<TimeSpan> self)
  380. {
  381. var isZero = false;
  382. List<T> currentList;
  383. lock (gate)
  384. {
  385. if (currentTimerId != timerId) return;
  386. currentList = list;
  387. if (currentList.Count != 0)
  388. {
  389. list = new List<T>();
  390. }
  391. else
  392. {
  393. isZero = true;
  394. }
  395. }
  396. observer.OnNext((isZero) ? (IList<T>)EmptyArray : currentList);
  397. self(parent.timeSpan);
  398. }
  399. public override void OnNext(T value)
  400. {
  401. List<T> currentList = null;
  402. lock (gate)
  403. {
  404. list.Add(value);
  405. if (list.Count == parent.count)
  406. {
  407. currentList = list;
  408. list = new List<T>();
  409. timerId++;
  410. CreateTimer();
  411. }
  412. }
  413. if (currentList != null)
  414. {
  415. observer.OnNext(currentList);
  416. }
  417. }
  418. public override void OnError(Exception error)
  419. {
  420. try { observer.OnError(error); } finally { Dispose(); }
  421. }
  422. public override void OnCompleted()
  423. {
  424. List<T> currentList;
  425. lock (gate)
  426. {
  427. timerId++;
  428. currentList = list;
  429. }
  430. observer.OnNext(currentList);
  431. try { observer.OnCompleted(); } finally { Dispose(); }
  432. }
  433. }
  434. }
  435. internal class BufferObservable<TSource, TWindowBoundary> : OperatorObservableBase<IList<TSource>>
  436. {
  437. readonly IObservable<TSource> source;
  438. readonly IObservable<TWindowBoundary> windowBoundaries;
  439. public BufferObservable(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)
  440. : base(source.IsRequiredSubscribeOnCurrentThread())
  441. {
  442. this.source = source;
  443. this.windowBoundaries = windowBoundaries;
  444. }
  445. protected override IDisposable SubscribeCore(IObserver<IList<TSource>> observer, IDisposable cancel)
  446. {
  447. return new Buffer(this, observer, cancel).Run();
  448. }
  449. class Buffer : OperatorObserverBase<TSource, IList<TSource>>
  450. {
  451. static readonly TSource[] EmptyArray = new TSource[0]; // cache
  452. readonly BufferObservable<TSource, TWindowBoundary> parent;
  453. object gate = new object();
  454. List<TSource> list;
  455. public Buffer(BufferObservable<TSource, TWindowBoundary> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel)
  456. {
  457. this.parent = parent;
  458. }
  459. public IDisposable Run()
  460. {
  461. list = new List<TSource>();
  462. var sourceSubscription = parent.source.Subscribe(this);
  463. var windowSubscription = parent.windowBoundaries.Subscribe(new Buffer_(this));
  464. return StableCompositeDisposable.Create(sourceSubscription, windowSubscription);
  465. }
  466. public override void OnNext(TSource value)
  467. {
  468. lock (gate)
  469. {
  470. list.Add(value);
  471. }
  472. }
  473. public override void OnError(Exception error)
  474. {
  475. lock (gate)
  476. {
  477. try { observer.OnError(error); } finally { Dispose(); }
  478. }
  479. }
  480. public override void OnCompleted()
  481. {
  482. lock (gate)
  483. {
  484. var currentList = list;
  485. list = new List<TSource>(); // safe
  486. observer.OnNext(currentList);
  487. try { observer.OnCompleted(); } finally { Dispose(); }
  488. }
  489. }
  490. class Buffer_ : IObserver<TWindowBoundary>
  491. {
  492. readonly Buffer parent;
  493. public Buffer_(Buffer parent)
  494. {
  495. this.parent = parent;
  496. }
  497. public void OnNext(TWindowBoundary value)
  498. {
  499. var isZero = false;
  500. List<TSource> currentList;
  501. lock (parent.gate)
  502. {
  503. currentList = parent.list;
  504. if (currentList.Count != 0)
  505. {
  506. parent.list = new List<TSource>();
  507. }
  508. else
  509. {
  510. isZero = true;
  511. }
  512. }
  513. if (isZero)
  514. {
  515. parent.observer.OnNext(EmptyArray);
  516. }
  517. else
  518. {
  519. parent.observer.OnNext(currentList);
  520. }
  521. }
  522. public void OnError(Exception error)
  523. {
  524. parent.OnError(error);
  525. }
  526. public void OnCompleted()
  527. {
  528. parent.OnCompleted();
  529. }
  530. }
  531. }
  532. }
  533. }