1
0

ZipLatest.cs 32 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. namespace UniRx.Operators
  6. {
  /// <summary>Result selector for a three-source ZipLatest: receives one value per source and returns the combined result.</summary>
  7. public delegate TR ZipLatestFunc<T1, T2, T3, TR>(T1 arg1, T2 arg2, T3 arg3);
  /// <summary>Result selector for a four-source ZipLatest: receives one value per source and returns the combined result.</summary>
  8. public delegate TR ZipLatestFunc<T1, T2, T3, T4, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4);
  /// <summary>Result selector for a five-source ZipLatest: receives one value per source and returns the combined result.</summary>
  9. public delegate TR ZipLatestFunc<T1, T2, T3, T4, T5, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5);
  /// <summary>Result selector for a six-source ZipLatest: receives one value per source and returns the combined result.</summary>
  10. public delegate TR ZipLatestFunc<T1, T2, T3, T4, T5, T6, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6);
  /// <summary>Result selector for a seven-source ZipLatest: receives one value per source and returns the combined result.</summary>
  11. public delegate TR ZipLatestFunc<T1, T2, T3, T4, T5, T6, T7, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7);
  12. // binary
  /// <summary>
  /// Two-source ZipLatest. Keeps the most recent value of each source and, as soon as both
  /// sides have delivered a value since the previous emission, pushes
  /// <c>selector(left, right)</c> downstream and starts waiting for fresh values again.
  /// </summary>
  internal class ZipLatestObservable<TLeft, TRight, TResult> : OperatorObservableBase<TResult>
  {
      readonly IObservable<TLeft> left;
      readonly IObservable<TRight> right;
      readonly Func<TLeft, TRight, TResult> selector;

      /// <param name="left">First source.</param>
      /// <param name="right">Second source.</param>
      /// <param name="selector">Combines one value from each source into a result.</param>
      public ZipLatestObservable(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, TRight, TResult> selector)
          : base(left.IsRequiredSubscribeOnCurrentThread() || right.IsRequiredSubscribeOnCurrentThread())
      {
          this.selector = selector;
          this.right = right;
          this.left = left;
      }

      protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(this, observer, cancel);
          return zip.Run();
      }

      /// <summary>
      /// Per-subscription state. All mutation happens under <c>gate</c>, which the two
      /// inner observers take before touching any field or calling back into this class.
      /// </summary>
      class ZipLatest : OperatorObserverBase<TResult, TResult>
      {
          readonly ZipLatestObservable<TLeft, TRight, TResult> parent;
          readonly object gate = new object();

          // Latest value per side. "Pending" means the side holds a value that has not
          // been emitted yet; "Done" means the side has signalled OnCompleted.
          TLeft leftValue;
          TRight rightValue;
          bool leftPending;
          bool rightPending;
          bool leftDone;
          bool rightDone;

          public ZipLatest(ZipLatestObservable<TLeft, TRight, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Subscribes to the left source, then the right, and returns both subscriptions as one disposable.</summary>
          public IDisposable Run()
          {
              var leftSubscription = parent.left.Subscribe(new LeftObserver(this));
              var rightSubscription = parent.right.Subscribe(new RightObserver(this));
              return StableCompositeDisposable.Create(leftSubscription, rightSubscription);
          }

          /// <summary>
          /// Called (while holding <c>gate</c>) after a side stored a new value. Emits a result
          /// when both sides are pending, and completes when no further pair can be formed.
          /// </summary>
          public void Publish()
          {
              // A side that completed without an unconsumed value can never contribute again.
              var leftExhausted = leftDone && !leftPending;
              var rightExhausted = rightDone && !rightPending;
              if (leftExhausted || rightExhausted)
              {
                  OnCompleted();
                  return;
              }

              // Still waiting for the other side.
              if (!leftPending || !rightPending)
              {
                  return;
              }

              TResult result;
              try
              {
                  result = parent.selector(leftValue, rightValue);
              }
              catch (Exception ex)
              {
                  OnError(ex);
                  return;
              }

              OnNext(result);

              // Both values are consumed; each side must produce again before the next emission.
              leftPending = false;
              rightPending = false;

              if (leftDone || rightDone)
              {
                  OnCompleted();
              }
          }

          public override void OnNext(TResult value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Feeds left-source notifications into the shared state under the gate.</summary>
          class LeftObserver : IObserver<TLeft>
          {
              readonly ZipLatest parent;

              public LeftObserver(ZipLatest parent)
              {
                  this.parent = parent;
              }

              public void OnNext(TLeft value)
              {
                  lock (parent.gate)
                  {
                      parent.leftValue = value;
                      parent.leftPending = true;
                      parent.Publish();
                  }
              }

              public void OnError(Exception error)
              {
                  lock (parent.gate)
                  {
                      parent.OnError(error);
                  }
              }

              public void OnCompleted()
              {
                  lock (parent.gate)
                  {
                      parent.leftDone = true;

                      // Only the second completion ends the sequence here; otherwise
                      // Publish decides when the next value arrives.
                      if (!parent.rightDone)
                      {
                          return;
                      }
                      parent.OnCompleted();
                  }
              }
          }

          /// <summary>Feeds right-source notifications into the shared state under the gate.</summary>
          class RightObserver : IObserver<TRight>
          {
              readonly ZipLatest parent;

              public RightObserver(ZipLatest parent)
              {
                  this.parent = parent;
              }

              public void OnNext(TRight value)
              {
                  lock (parent.gate)
                  {
                      parent.rightValue = value;
                      parent.rightPending = true;
                      parent.Publish();
                  }
              }

              public void OnError(Exception error)
              {
                  lock (parent.gate)
                  {
                      parent.OnError(error);
                  }
              }

              public void OnCompleted()
              {
                  lock (parent.gate)
                  {
                      parent.rightDone = true;

                      // Only the second completion ends the sequence here; otherwise
                      // Publish decides when the next value arrives.
                      if (!parent.leftDone)
                      {
                          return;
                      }
                      parent.OnCompleted();
                  }
              }
          }
      }
  }
  163. // array
  /// <summary>
  /// ZipLatest over an array of same-typed sources. Each time every source has delivered
  /// a value since the previous emission, a snapshot list of the latest values (in source
  /// order) is pushed downstream.
  /// </summary>
  internal class ZipLatestObservable<T> : OperatorObservableBase<IList<T>>
  {
      readonly IObservable<T>[] sources;

      /// <param name="sources">Sequences to combine; their order fixes the order of values in each emitted list.</param>
      public ZipLatestObservable(IObservable<T>[] sources)
          : base(true)
      {
          this.sources = sources;
      }

      protected override IDisposable SubscribeCore(IObserver<IList<T>> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(this, observer, cancel);
          return zip.Run();
      }

      /// <summary>
      /// Per-subscription state. The per-source observers take <c>gate</c> before touching
      /// any of the arrays below or calling back into this class.
      /// </summary>
      class ZipLatest : OperatorObserverBase<IList<T>, IList<T>>
      {
          readonly ZipLatestObservable<T> parent;
          readonly object gate = new object();

          int length;
          T[] values;          // latest value per source
          bool[] isStarted;    // source holds a value that has not been emitted yet
          bool[] isCompleted;  // source has signalled OnCompleted

          public ZipLatest(ZipLatestObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Allocates the per-source slots, subscribes to every source in order and returns the combined subscription.</summary>
          public IDisposable Run()
          {
              var inputs = parent.sources;

              length = inputs.Length;
              values = new T[length];
              isStarted = new bool[length];
              isCompleted = new bool[length];

              var subscriptions = new IDisposable[length];
              for (var slot = 0; slot < length; slot++)
              {
                  subscriptions[slot] = inputs[slot].Subscribe(new ZipLatestObserver(this, slot));
              }

              return StableCompositeDisposable.CreateUnsafe(subscriptions);
          }

          /// <summary>True when every flag in the array is set.</summary>
          static bool AllSet(bool[] flags)
          {
              foreach (var flag in flags)
              {
                  if (!flag)
                  {
                      return false;
                  }
              }
              return true;
          }

          /// <summary>
          /// Called (while holding <c>gate</c>) after source <paramref name="index"/> stored a
          /// new value. Emits a snapshot when every source is pending; completes when some
          /// other source can no longer contribute.
          /// </summary>
          void Publish(int index)
          {
              isStarted[index] = true;

              if (!AllSet(isStarted))
              {
                  // Another source that completed without an unconsumed value means no
                  // further snapshot can ever be assembled.
                  for (var i = 0; i < length; i++)
                  {
                      if (i != index && isCompleted[i] && !isStarted[i])
                      {
                          OnCompleted();
                          return;
                      }
                  }
                  return;
              }

              // Decide before notifying downstream, so the decision reflects the state at
              // the moment the snapshot became complete.
              var anotherSourceCompleted = false;
              for (var i = 0; i < length; i++)
              {
                  if (i != index && isCompleted[i])
                  {
                      anotherSourceCompleted = true;
                      break;
                  }
              }

              OnNext(new List<T>(values));

              if (anotherSourceCompleted)
              {
                  OnCompleted();
              }
              else
              {
                  // Every value is consumed; each source must produce again.
                  Array.Clear(isStarted, 0, length);
              }
          }

          public override void OnNext(IList<T> value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Observer for the source at a fixed index; forwards its notifications into the shared state under the gate.</summary>
          class ZipLatestObserver : IObserver<T>
          {
              readonly ZipLatest parent;
              readonly int index;

              public ZipLatestObserver(ZipLatest parent, int index)
              {
                  this.index = index;
                  this.parent = parent;
              }

              public void OnNext(T value)
              {
                  lock (parent.gate)
                  {
                      parent.values[index] = value;
                      parent.Publish(index);
                  }
              }

              public void OnError(Exception ex)
              {
                  lock (parent.gate)
                  {
                      parent.OnError(ex);
                  }
              }

              public void OnCompleted()
              {
                  lock (parent.gate)
                  {
                      parent.isCompleted[index] = true;

                      // The sequence ends here only once every source has completed.
                      if (AllSet(parent.isCompleted))
                      {
                          parent.OnCompleted();
                      }
                  }
              }
          }
      }
  }
  308. // generated from UniRx.Console.ZipLatestGenerator.tt
  309. #region NTH
  /// <summary>
  /// ZipLatest over three differently-typed sources; results are built by a
  /// <see cref="ZipLatestFunc{T1, T2, T3, TR}"/>.
  /// </summary>
  internal class ZipLatestObservable<T1, T2, T3, TR> : OperatorObservableBase<TR>
  {
      IObservable<T1> source1;
      IObservable<T2> source2;
      IObservable<T3> source3;
      ZipLatestFunc<T1, T2, T3, TR> resultSelector;

      public ZipLatestObservable(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          ZipLatestFunc<T1, T2, T3, TR> resultSelector)
          : base(source1.IsRequiredSubscribeOnCurrentThread()
              || source2.IsRequiredSubscribeOnCurrentThread()
              || source3.IsRequiredSubscribeOnCurrentThread())
      {
          this.resultSelector = resultSelector;
          this.source1 = source1;
          this.source2 = source2;
          this.source3 = source3;
      }

      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(3, this, observer, cancel);
          return zip.Run();
      }

      /// <summary>Per-subscription observer; owns the gate and one value-holding observer per source.</summary>
      class ZipLatest : NthZipLatestObserverBase<TR>
      {
          readonly ZipLatestObservable<T1, T2, T3, TR> parent;
          readonly object gate = new object();
          ZipLatestObserver<T1> c1;
          ZipLatestObserver<T2> c2;
          ZipLatestObserver<T3> c3;

          public ZipLatest(int length, ZipLatestObservable<T1, T2, T3, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(length, observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Creates every per-source observer first, then subscribes to the sources in order.</summary>
          public IDisposable Run()
          {
              c1 = new ZipLatestObserver<T1>(gate, this, 0);
              c2 = new ZipLatestObserver<T2>(gate, this, 1);
              c3 = new ZipLatestObserver<T3>(gate, this, 2);

              // Arguments are evaluated left to right, so subscription order is source1..source3.
              return StableCompositeDisposable.Create(
                  parent.source1.Subscribe(c1),
                  parent.source2.Subscribe(c2),
                  parent.source3.Subscribe(c3));
          }

          /// <summary>Applies the result selector to the value currently held by each per-source observer.</summary>
          public override TR GetResult()
          {
              return parent.resultSelector(c1.Value, c2.Value, c3.Value);
          }

          public override void OnNext(TR value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// ZipLatest over four differently-typed sources; results are built by a
  /// <see cref="ZipLatestFunc{T1, T2, T3, T4, TR}"/>.
  /// </summary>
  internal class ZipLatestObservable<T1, T2, T3, T4, TR> : OperatorObservableBase<TR>
  {
      IObservable<T1> source1;
      IObservable<T2> source2;
      IObservable<T3> source3;
      IObservable<T4> source4;
      ZipLatestFunc<T1, T2, T3, T4, TR> resultSelector;

      public ZipLatestObservable(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          ZipLatestFunc<T1, T2, T3, T4, TR> resultSelector)
          : base(source1.IsRequiredSubscribeOnCurrentThread()
              || source2.IsRequiredSubscribeOnCurrentThread()
              || source3.IsRequiredSubscribeOnCurrentThread()
              || source4.IsRequiredSubscribeOnCurrentThread())
      {
          this.resultSelector = resultSelector;
          this.source1 = source1;
          this.source2 = source2;
          this.source3 = source3;
          this.source4 = source4;
      }

      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(4, this, observer, cancel);
          return zip.Run();
      }

      /// <summary>Per-subscription observer; owns the gate and one value-holding observer per source.</summary>
      class ZipLatest : NthZipLatestObserverBase<TR>
      {
          readonly ZipLatestObservable<T1, T2, T3, T4, TR> parent;
          readonly object gate = new object();
          ZipLatestObserver<T1> c1;
          ZipLatestObserver<T2> c2;
          ZipLatestObserver<T3> c3;
          ZipLatestObserver<T4> c4;

          public ZipLatest(int length, ZipLatestObservable<T1, T2, T3, T4, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(length, observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Creates every per-source observer first, then subscribes to the sources in order.</summary>
          public IDisposable Run()
          {
              c1 = new ZipLatestObserver<T1>(gate, this, 0);
              c2 = new ZipLatestObserver<T2>(gate, this, 1);
              c3 = new ZipLatestObserver<T3>(gate, this, 2);
              c4 = new ZipLatestObserver<T4>(gate, this, 3);

              // Arguments are evaluated left to right, so subscription order is source1..source4.
              return StableCompositeDisposable.Create(
                  parent.source1.Subscribe(c1),
                  parent.source2.Subscribe(c2),
                  parent.source3.Subscribe(c3),
                  parent.source4.Subscribe(c4));
          }

          /// <summary>Applies the result selector to the value currently held by each per-source observer.</summary>
          public override TR GetResult()
          {
              return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value);
          }

          public override void OnNext(TR value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// ZipLatest over five differently-typed sources; results are built by a
  /// <see cref="ZipLatestFunc{T1, T2, T3, T4, T5, TR}"/>.
  /// </summary>
  internal class ZipLatestObservable<T1, T2, T3, T4, T5, TR> : OperatorObservableBase<TR>
  {
      IObservable<T1> source1;
      IObservable<T2> source2;
      IObservable<T3> source3;
      IObservable<T4> source4;
      IObservable<T5> source5;
      ZipLatestFunc<T1, T2, T3, T4, T5, TR> resultSelector;

      public ZipLatestObservable(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          ZipLatestFunc<T1, T2, T3, T4, T5, TR> resultSelector)
          : base(source1.IsRequiredSubscribeOnCurrentThread()
              || source2.IsRequiredSubscribeOnCurrentThread()
              || source3.IsRequiredSubscribeOnCurrentThread()
              || source4.IsRequiredSubscribeOnCurrentThread()
              || source5.IsRequiredSubscribeOnCurrentThread())
      {
          this.resultSelector = resultSelector;
          this.source1 = source1;
          this.source2 = source2;
          this.source3 = source3;
          this.source4 = source4;
          this.source5 = source5;
      }

      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(5, this, observer, cancel);
          return zip.Run();
      }

      /// <summary>Per-subscription observer; owns the gate and one value-holding observer per source.</summary>
      class ZipLatest : NthZipLatestObserverBase<TR>
      {
          readonly ZipLatestObservable<T1, T2, T3, T4, T5, TR> parent;
          readonly object gate = new object();
          ZipLatestObserver<T1> c1;
          ZipLatestObserver<T2> c2;
          ZipLatestObserver<T3> c3;
          ZipLatestObserver<T4> c4;
          ZipLatestObserver<T5> c5;

          public ZipLatest(int length, ZipLatestObservable<T1, T2, T3, T4, T5, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(length, observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Creates every per-source observer first, then subscribes to the sources in order.</summary>
          public IDisposable Run()
          {
              c1 = new ZipLatestObserver<T1>(gate, this, 0);
              c2 = new ZipLatestObserver<T2>(gate, this, 1);
              c3 = new ZipLatestObserver<T3>(gate, this, 2);
              c4 = new ZipLatestObserver<T4>(gate, this, 3);
              c5 = new ZipLatestObserver<T5>(gate, this, 4);

              // Arguments are evaluated left to right, so subscription order is source1..source5.
              return StableCompositeDisposable.Create(
                  parent.source1.Subscribe(c1),
                  parent.source2.Subscribe(c2),
                  parent.source3.Subscribe(c3),
                  parent.source4.Subscribe(c4),
                  parent.source5.Subscribe(c5));
          }

          /// <summary>Applies the result selector to the value currently held by each per-source observer.</summary>
          public override TR GetResult()
          {
              return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value, c5.Value);
          }

          public override void OnNext(TR value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// ZipLatest over six differently-typed sources; results are built by a
  /// <see cref="ZipLatestFunc{T1, T2, T3, T4, T5, T6, TR}"/>.
  /// </summary>
  internal class ZipLatestObservable<T1, T2, T3, T4, T5, T6, TR> : OperatorObservableBase<TR>
  {
      IObservable<T1> source1;
      IObservable<T2> source2;
      IObservable<T3> source3;
      IObservable<T4> source4;
      IObservable<T5> source5;
      IObservable<T6> source6;
      ZipLatestFunc<T1, T2, T3, T4, T5, T6, TR> resultSelector;

      public ZipLatestObservable(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          ZipLatestFunc<T1, T2, T3, T4, T5, T6, TR> resultSelector)
          : base(source1.IsRequiredSubscribeOnCurrentThread()
              || source2.IsRequiredSubscribeOnCurrentThread()
              || source3.IsRequiredSubscribeOnCurrentThread()
              || source4.IsRequiredSubscribeOnCurrentThread()
              || source5.IsRequiredSubscribeOnCurrentThread()
              || source6.IsRequiredSubscribeOnCurrentThread())
      {
          this.resultSelector = resultSelector;
          this.source1 = source1;
          this.source2 = source2;
          this.source3 = source3;
          this.source4 = source4;
          this.source5 = source5;
          this.source6 = source6;
      }

      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(6, this, observer, cancel);
          return zip.Run();
      }

      /// <summary>Per-subscription observer; owns the gate and one value-holding observer per source.</summary>
      class ZipLatest : NthZipLatestObserverBase<TR>
      {
          readonly ZipLatestObservable<T1, T2, T3, T4, T5, T6, TR> parent;
          readonly object gate = new object();
          ZipLatestObserver<T1> c1;
          ZipLatestObserver<T2> c2;
          ZipLatestObserver<T3> c3;
          ZipLatestObserver<T4> c4;
          ZipLatestObserver<T5> c5;
          ZipLatestObserver<T6> c6;

          public ZipLatest(int length, ZipLatestObservable<T1, T2, T3, T4, T5, T6, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(length, observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Creates every per-source observer first, then subscribes to the sources in order.</summary>
          public IDisposable Run()
          {
              c1 = new ZipLatestObserver<T1>(gate, this, 0);
              c2 = new ZipLatestObserver<T2>(gate, this, 1);
              c3 = new ZipLatestObserver<T3>(gate, this, 2);
              c4 = new ZipLatestObserver<T4>(gate, this, 3);
              c5 = new ZipLatestObserver<T5>(gate, this, 4);
              c6 = new ZipLatestObserver<T6>(gate, this, 5);

              // Arguments are evaluated left to right, so subscription order is source1..source6.
              return StableCompositeDisposable.Create(
                  parent.source1.Subscribe(c1),
                  parent.source2.Subscribe(c2),
                  parent.source3.Subscribe(c3),
                  parent.source4.Subscribe(c4),
                  parent.source5.Subscribe(c5),
                  parent.source6.Subscribe(c6));
          }

          /// <summary>Applies the result selector to the value currently held by each per-source observer.</summary>
          public override TR GetResult()
          {
              return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value, c5.Value, c6.Value);
          }

          public override void OnNext(TR value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// ZipLatest over seven differently-typed sources; results are built by a
  /// <see cref="ZipLatestFunc{T1, T2, T3, T4, T5, T6, T7, TR}"/>.
  /// </summary>
  internal class ZipLatestObservable<T1, T2, T3, T4, T5, T6, T7, TR> : OperatorObservableBase<TR>
  {
      IObservable<T1> source1;
      IObservable<T2> source2;
      IObservable<T3> source3;
      IObservable<T4> source4;
      IObservable<T5> source5;
      IObservable<T6> source6;
      IObservable<T7> source7;
      ZipLatestFunc<T1, T2, T3, T4, T5, T6, T7, TR> resultSelector;

      public ZipLatestObservable(
          IObservable<T1> source1,
          IObservable<T2> source2,
          IObservable<T3> source3,
          IObservable<T4> source4,
          IObservable<T5> source5,
          IObservable<T6> source6,
          IObservable<T7> source7,
          ZipLatestFunc<T1, T2, T3, T4, T5, T6, T7, TR> resultSelector)
          : base(source1.IsRequiredSubscribeOnCurrentThread()
              || source2.IsRequiredSubscribeOnCurrentThread()
              || source3.IsRequiredSubscribeOnCurrentThread()
              || source4.IsRequiredSubscribeOnCurrentThread()
              || source5.IsRequiredSubscribeOnCurrentThread()
              || source6.IsRequiredSubscribeOnCurrentThread()
              || source7.IsRequiredSubscribeOnCurrentThread())
      {
          this.resultSelector = resultSelector;
          this.source1 = source1;
          this.source2 = source2;
          this.source3 = source3;
          this.source4 = source4;
          this.source5 = source5;
          this.source6 = source6;
          this.source7 = source7;
      }

      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          var zip = new ZipLatest(7, this, observer, cancel);
          return zip.Run();
      }

      /// <summary>Per-subscription observer; owns the gate and one value-holding observer per source.</summary>
      class ZipLatest : NthZipLatestObserverBase<TR>
      {
          readonly ZipLatestObservable<T1, T2, T3, T4, T5, T6, T7, TR> parent;
          readonly object gate = new object();
          ZipLatestObserver<T1> c1;
          ZipLatestObserver<T2> c2;
          ZipLatestObserver<T3> c3;
          ZipLatestObserver<T4> c4;
          ZipLatestObserver<T5> c5;
          ZipLatestObserver<T6> c6;
          ZipLatestObserver<T7> c7;

          public ZipLatest(int length, ZipLatestObservable<T1, T2, T3, T4, T5, T6, T7, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(length, observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Creates every per-source observer first, then subscribes to the sources in order.</summary>
          public IDisposable Run()
          {
              c1 = new ZipLatestObserver<T1>(gate, this, 0);
              c2 = new ZipLatestObserver<T2>(gate, this, 1);
              c3 = new ZipLatestObserver<T3>(gate, this, 2);
              c4 = new ZipLatestObserver<T4>(gate, this, 3);
              c5 = new ZipLatestObserver<T5>(gate, this, 4);
              c6 = new ZipLatestObserver<T6>(gate, this, 5);
              c7 = new ZipLatestObserver<T7>(gate, this, 6);

              // Arguments are evaluated left to right, so subscription order is source1..source7.
              return StableCompositeDisposable.Create(
                  parent.source1.Subscribe(c1),
                  parent.source2.Subscribe(c2),
                  parent.source3.Subscribe(c3),
                  parent.source4.Subscribe(c4),
                  parent.source5.Subscribe(c5),
                  parent.source6.Subscribe(c6),
                  parent.source7.Subscribe(c7));
          }

          /// <summary>Applies the result selector to the value currently held by each per-source observer.</summary>
          public override TR GetResult()
          {
              return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value, c5.Value, c6.Value, c7.Value);
          }

          public override void OnNext(TR value)
          {
              observer.OnNext(value);
          }

          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  720. #endregion
  721. // Nth infrastructure
  /// <summary>
  /// Callbacks that a <see cref="ZipLatestObserver{T}"/> invokes on its parent while holding
  /// the shared gate; <c>index</c> is the position the observer was constructed with.
  /// </summary>
  722. internal interface IZipLatestObservable
  723. {
  /// <summary>Invoked after the observer at <paramref name="index"/> has stored a new value.</summary>
  724. void Publish(int index);
  /// <summary>Invoked when any observer receives an error from its source.</summary>
  725. void Fail(Exception error);
  /// <summary>Invoked when the source of the observer at <paramref name="index"/> has completed.</summary>
  726. void Done(int index);
  727. }
  /// <summary>
  /// Shared bookkeeping for ZipLatest observers over a fixed number of sources. Tracks, per
  /// source index, whether an unconsumed value is held and whether the source has completed;
  /// derived classes supply the combined result through <see cref="GetResult"/>.
  /// The members below are expected to be called while the caller holds the shared gate.
  /// </summary>
  internal abstract class NthZipLatestObserverBase<T> : OperatorObserverBase<T, T>, IZipLatestObservable
  {
      readonly int length;
      readonly bool[] isStarted;    // source holds a value that has not been emitted yet
      readonly bool[] isCompleted;  // source has signalled completion

      /// <param name="length">Number of sources being combined.</param>
      /// <param name="observer">Downstream observer.</param>
      /// <param name="cancel">Disposable handed to the operator base class.</param>
      public NthZipLatestObserverBase(int length, IObserver<T> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          isStarted = new bool[length];
          isCompleted = new bool[length];
          this.length = length;
      }

      /// <summary>Builds the combined result from the values currently held for every source.</summary>
      public abstract T GetResult();

      /// <summary>True when every flag in the array is set.</summary>
      static bool AllSet(bool[] flags)
      {
          foreach (var flag in flags)
          {
              if (!flag)
              {
                  return false;
              }
          }
          return true;
      }

      /// <summary>Signals completion downstream and always disposes afterwards.</summary>
      void Complete()
      {
          try
          {
              observer.OnCompleted();
          }
          finally
          {
              Dispose();
          }
      }

      /// <summary>
      /// Records that source <paramref name="index"/> has a fresh value. Emits a result when
      /// every source is pending; completes when some other source can no longer contribute.
      /// </summary>
      public void Publish(int index)
      {
          isStarted[index] = true;

          if (!AllSet(isStarted))
          {
              // Another source that completed without an unconsumed value means no
              // further result can ever be assembled.
              for (var i = 0; i < length; i++)
              {
                  if (i != index && isCompleted[i] && !isStarted[i])
                  {
                      Complete();
                      return;
                  }
              }
              return;
          }

          // Decide before running the selector or notifying downstream, so the decision
          // reflects the state at the moment the set of values became complete.
          var anotherSourceCompleted = false;
          for (var i = 0; i < length; i++)
          {
              if (i != index && isCompleted[i])
              {
                  anotherSourceCompleted = true;
                  break;
              }
          }

          T result;
          try
          {
              result = GetResult();
          }
          catch (Exception ex)
          {
              Fail(ex);
              return;
          }

          OnNext(result);

          if (anotherSourceCompleted)
          {
              Complete();
          }
          else
          {
              // Every value is consumed; each source must produce again.
              Array.Clear(isStarted, 0, length);
          }
      }

      /// <summary>Records that source <paramref name="index"/> completed; completes downstream once all sources have.</summary>
      public void Done(int index)
      {
          isCompleted[index] = true;

          if (AllSet(isCompleted))
          {
              Complete();
          }
      }

      /// <summary>Forwards the error downstream and always disposes afterwards.</summary>
      public void Fail(Exception error)
      {
          try
          {
              observer.OnError(error);
          }
          finally
          {
              Dispose();
          }
      }
  }
  820. // Nth
  /// <summary>
  /// Observer for one source of a multi-source ZipLatest. Stores the latest value it
  /// received and relays every notification to its parent while holding the shared gate.
  /// </summary>
  internal class ZipLatestObserver<T> : IObserver<T>
  {
      readonly object gate;
      readonly IZipLatestObservable parent;
      readonly int index;
      T latest;

      /// <summary>The most recent value received, or the default of <typeparamref name="T"/> if none has arrived.</summary>
      public T Value
      {
          get { return latest; }
      }

      /// <param name="gate">Lock object taken around every notification.</param>
      /// <param name="parent">Receiver of the Publish / Fail / Done callbacks.</param>
      /// <param name="index">Position of this observer's source, passed back to the parent.</param>
      public ZipLatestObserver(object gate, IZipLatestObservable parent, int index)
      {
          this.index = index;
          this.parent = parent;
          this.gate = gate;
      }

      public void OnNext(T value)
      {
          lock (gate)
          {
              // Store first so the parent reads the new value through Value.
              latest = value;
              parent.Publish(index);
          }
      }

      public void OnError(Exception error)
      {
          lock (gate)
          {
              parent.Fail(error);
          }
      }

      public void OnCompleted()
      {
          lock (gate)
          {
              parent.Done(index);
          }
      }
  }
  857. }