CombineLatest.cs 33 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. namespace UniRx.Operators
  6. {
  7. public delegate TR CombineLatestFunc<T1, T2, T3, TR>(T1 arg1, T2 arg2, T3 arg3);
  8. public delegate TR CombineLatestFunc<T1, T2, T3, T4, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4);
  9. public delegate TR CombineLatestFunc<T1, T2, T3, T4, T5, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5);
  10. public delegate TR CombineLatestFunc<T1, T2, T3, T4, T5, T6, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6);
  11. public delegate TR CombineLatestFunc<T1, T2, T3, T4, T5, T6, T7, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7);
  12. // binary
  13. internal class CombineLatestObservable<TLeft, TRight, TResult> : OperatorObservableBase<TResult>
  14. {
  15. readonly IObservable<TLeft> left;
  16. readonly IObservable<TRight> right;
  17. readonly Func<TLeft, TRight, TResult> selector;
  18. public CombineLatestObservable(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, TRight, TResult> selector)
  19. : base(left.IsRequiredSubscribeOnCurrentThread() || right.IsRequiredSubscribeOnCurrentThread())
  20. {
  21. this.left = left;
  22. this.right = right;
  23. this.selector = selector;
  24. }
  25. protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel)
  26. {
  27. return new CombineLatest(this, observer, cancel).Run();
  28. }
  29. class CombineLatest : OperatorObserverBase<TResult, TResult>
  30. {
  31. readonly CombineLatestObservable<TLeft, TRight, TResult> parent;
  32. readonly object gate = new object();
  33. TLeft leftValue = default(TLeft);
  34. bool leftStarted = false;
  35. bool leftCompleted = false;
  36. TRight rightValue = default(TRight);
  37. bool rightStarted = false;
  38. bool rightCompleted = false;
  39. public CombineLatest(CombineLatestObservable<TLeft, TRight, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel)
  40. {
  41. this.parent = parent;
  42. }
  43. public IDisposable Run()
  44. {
  45. var l = parent.left.Subscribe(new LeftObserver(this));
  46. var r = parent.right.Subscribe(new RightObserver(this));
  47. return StableCompositeDisposable.Create(l, r);
  48. }
  49. // publish in lock
  50. public void Publish()
  51. {
  52. if ((leftCompleted && !leftStarted) || (rightCompleted && !rightStarted))
  53. {
  54. try { observer.OnCompleted(); }
  55. finally { Dispose(); }
  56. return;
  57. }
  58. else if (!(leftStarted && rightStarted))
  59. {
  60. return;
  61. }
  62. TResult v;
  63. try
  64. {
  65. v = parent.selector(leftValue, rightValue);
  66. }
  67. catch (Exception ex)
  68. {
  69. try { observer.OnError(ex); }
  70. finally { Dispose(); }
  71. return;
  72. }
  73. OnNext(v);
  74. }
  75. public override void OnNext(TResult value)
  76. {
  77. base.observer.OnNext(value);
  78. }
  79. public override void OnError(Exception error)
  80. {
  81. try { observer.OnError(error); }
  82. finally { Dispose(); }
  83. }
  84. public override void OnCompleted()
  85. {
  86. try { observer.OnCompleted(); }
  87. finally { Dispose(); }
  88. }
  89. class LeftObserver : IObserver<TLeft>
  90. {
  91. readonly CombineLatest parent;
  92. public LeftObserver(CombineLatest parent)
  93. {
  94. this.parent = parent;
  95. }
  96. public void OnNext(TLeft value)
  97. {
  98. lock (parent.gate)
  99. {
  100. parent.leftStarted = true;
  101. parent.leftValue = value;
  102. parent.Publish();
  103. }
  104. }
  105. public void OnError(Exception error)
  106. {
  107. lock (parent.gate)
  108. {
  109. parent.OnError(error);
  110. }
  111. }
  112. public void OnCompleted()
  113. {
  114. lock (parent.gate)
  115. {
  116. parent.leftCompleted = true;
  117. if (parent.rightCompleted) parent.OnCompleted();
  118. }
  119. }
  120. }
  121. class RightObserver : IObserver<TRight>
  122. {
  123. readonly CombineLatest parent;
  124. public RightObserver(CombineLatest parent)
  125. {
  126. this.parent = parent;
  127. }
  128. public void OnNext(TRight value)
  129. {
  130. lock (parent.gate)
  131. {
  132. parent.rightStarted = true;
  133. parent.rightValue = value;
  134. parent.Publish();
  135. }
  136. }
  137. public void OnError(Exception error)
  138. {
  139. lock (parent.gate)
  140. {
  141. parent.OnError(error);
  142. }
  143. }
  144. public void OnCompleted()
  145. {
  146. lock (parent.gate)
  147. {
  148. parent.rightCompleted = true;
  149. if (parent.leftCompleted) parent.OnCompleted();
  150. }
  151. }
  152. }
  153. }
  154. }
  155. // array
  156. internal class CombineLatestObservable<T> : OperatorObservableBase<IList<T>>
  157. {
  158. readonly IObservable<T>[] sources;
  159. public CombineLatestObservable(IObservable<T>[] sources)
  160. : base(true)
  161. {
  162. this.sources = sources;
  163. }
  164. protected override IDisposable SubscribeCore(IObserver<IList<T>> observer, IDisposable cancel)
  165. {
  166. return new CombineLatest(this, observer, cancel).Run();
  167. }
  168. class CombineLatest : OperatorObserverBase<IList<T>, IList<T>>
  169. {
  170. readonly CombineLatestObservable<T> parent;
  171. readonly object gate = new object();
  172. int length;
  173. T[] values;
  174. bool[] isStarted;
  175. bool[] isCompleted;
  176. bool isAllValueStarted;
  177. public CombineLatest(CombineLatestObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel)
  178. {
  179. this.parent = parent;
  180. }
  181. public IDisposable Run()
  182. {
  183. length = parent.sources.Length;
  184. values = new T[length];
  185. isStarted = new bool[length];
  186. isCompleted = new bool[length];
  187. isAllValueStarted = false;
  188. var disposables = new IDisposable[length];
  189. for (int i = 0; i < length; i++)
  190. {
  191. var source = parent.sources[i];
  192. disposables[i] = source.Subscribe(new CombineLatestObserver(this, i));
  193. }
  194. return StableCompositeDisposable.CreateUnsafe(disposables);
  195. }
  196. // publish is in the lock
  197. void Publish(int index)
  198. {
  199. isStarted[index] = true;
  200. if (isAllValueStarted)
  201. {
  202. OnNext(new List<T>(values));
  203. return;
  204. }
  205. var allValueStarted = true;
  206. for (int i = 0; i < length; i++)
  207. {
  208. if (!isStarted[i])
  209. {
  210. allValueStarted = false;
  211. break;
  212. }
  213. }
  214. isAllValueStarted = allValueStarted;
  215. if (isAllValueStarted)
  216. {
  217. OnNext(new List<T>(values));
  218. return;
  219. }
  220. else
  221. {
  222. var allCompletedWithoutSelf = true;
  223. for (int i = 0; i < length; i++)
  224. {
  225. if (i == index) continue;
  226. if (!isCompleted[i])
  227. {
  228. allCompletedWithoutSelf = false;
  229. break;
  230. }
  231. }
  232. if (allCompletedWithoutSelf)
  233. {
  234. try { observer.OnCompleted(); }
  235. finally { Dispose(); }
  236. return;
  237. }
  238. else
  239. {
  240. return;
  241. }
  242. }
  243. }
  244. public override void OnNext(IList<T> value)
  245. {
  246. base.observer.OnNext(value);
  247. }
  248. public override void OnError(Exception error)
  249. {
  250. try { observer.OnError(error); }
  251. finally { Dispose(); }
  252. }
  253. public override void OnCompleted()
  254. {
  255. try { observer.OnCompleted(); }
  256. finally { Dispose(); }
  257. }
  258. class CombineLatestObserver : IObserver<T>
  259. {
  260. readonly CombineLatest parent;
  261. readonly int index;
  262. public CombineLatestObserver(CombineLatest parent, int index)
  263. {
  264. this.parent = parent;
  265. this.index = index;
  266. }
  267. public void OnNext(T value)
  268. {
  269. lock (parent.gate)
  270. {
  271. parent.values[index] = value;
  272. parent.Publish(index);
  273. }
  274. }
  275. public void OnError(Exception ex)
  276. {
  277. lock (parent.gate)
  278. {
  279. parent.OnError(ex);
  280. }
  281. }
  282. public void OnCompleted()
  283. {
  284. lock (parent.gate)
  285. {
  286. parent.isCompleted[index] = true;
  287. var allTrue = true;
  288. for (int i = 0; i < parent.length; i++)
  289. {
  290. if (!parent.isCompleted[i])
  291. {
  292. allTrue = false;
  293. break;
  294. }
  295. }
  296. if (allTrue)
  297. {
  298. parent.OnCompleted();
  299. }
  300. }
  301. }
  302. }
  303. }
  304. }
  305. // generated from UniRx.Console.CombineLatestGenerator.tt
  306. #region NTH
  307. internal class CombineLatestObservable<T1, T2, T3, TR> : OperatorObservableBase<TR>
  308. {
  309. IObservable<T1> source1;
  310. IObservable<T2> source2;
  311. IObservable<T3> source3;
  312. CombineLatestFunc<T1, T2, T3, TR> resultSelector;
  313. public CombineLatestObservable(
  314. IObservable<T1> source1,
  315. IObservable<T2> source2,
  316. IObservable<T3> source3,
  317. CombineLatestFunc<T1, T2, T3, TR> resultSelector)
  318. : base(
  319. source1.IsRequiredSubscribeOnCurrentThread() ||
  320. source2.IsRequiredSubscribeOnCurrentThread() ||
  321. source3.IsRequiredSubscribeOnCurrentThread() ||
  322. false)
  323. {
  324. this.source1 = source1;
  325. this.source2 = source2;
  326. this.source3 = source3;
  327. this.resultSelector = resultSelector;
  328. }
  329. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  330. {
  331. return new CombineLatest(3, this, observer, cancel).Run();
  332. }
  333. class CombineLatest : NthCombineLatestObserverBase<TR>
  334. {
  335. readonly CombineLatestObservable<T1, T2, T3, TR> parent;
  336. readonly object gate = new object();
  337. CombineLatestObserver<T1> c1;
  338. CombineLatestObserver<T2> c2;
  339. CombineLatestObserver<T3> c3;
  340. public CombineLatest(int length, CombineLatestObservable<T1, T2, T3, TR> parent, IObserver<TR> observer, IDisposable cancel)
  341. : base(length, observer, cancel)
  342. {
  343. this.parent = parent;
  344. }
  345. public IDisposable Run()
  346. {
  347. c1 = new CombineLatestObserver<T1>(gate, this, 0);
  348. c2 = new CombineLatestObserver<T2>(gate, this, 1);
  349. c3 = new CombineLatestObserver<T3>(gate, this, 2);
  350. var s1 = parent.source1.Subscribe(c1);
  351. var s2 = parent.source2.Subscribe(c2);
  352. var s3 = parent.source3.Subscribe(c3);
  353. return StableCompositeDisposable.Create(s1, s2, s3);
  354. }
  355. public override TR GetResult()
  356. {
  357. return parent.resultSelector(c1.Value, c2.Value, c3.Value);
  358. }
  359. public override void OnNext(TR value)
  360. {
  361. base.observer.OnNext(value);
  362. }
  363. public override void OnError(Exception error)
  364. {
  365. try { observer.OnError(error); }
  366. finally { Dispose(); }
  367. }
  368. public override void OnCompleted()
  369. {
  370. try { observer.OnCompleted(); }
  371. finally { Dispose(); }
  372. }
  373. }
  374. }
  375. internal class CombineLatestObservable<T1, T2, T3, T4, TR> : OperatorObservableBase<TR>
  376. {
  377. IObservable<T1> source1;
  378. IObservable<T2> source2;
  379. IObservable<T3> source3;
  380. IObservable<T4> source4;
  381. CombineLatestFunc<T1, T2, T3, T4, TR> resultSelector;
  382. public CombineLatestObservable(
  383. IObservable<T1> source1,
  384. IObservable<T2> source2,
  385. IObservable<T3> source3,
  386. IObservable<T4> source4,
  387. CombineLatestFunc<T1, T2, T3, T4, TR> resultSelector)
  388. : base(
  389. source1.IsRequiredSubscribeOnCurrentThread() ||
  390. source2.IsRequiredSubscribeOnCurrentThread() ||
  391. source3.IsRequiredSubscribeOnCurrentThread() ||
  392. source4.IsRequiredSubscribeOnCurrentThread() ||
  393. false)
  394. {
  395. this.source1 = source1;
  396. this.source2 = source2;
  397. this.source3 = source3;
  398. this.source4 = source4;
  399. this.resultSelector = resultSelector;
  400. }
  401. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  402. {
  403. return new CombineLatest(4, this, observer, cancel).Run();
  404. }
  405. class CombineLatest : NthCombineLatestObserverBase<TR>
  406. {
  407. readonly CombineLatestObservable<T1, T2, T3, T4, TR> parent;
  408. readonly object gate = new object();
  409. CombineLatestObserver<T1> c1;
  410. CombineLatestObserver<T2> c2;
  411. CombineLatestObserver<T3> c3;
  412. CombineLatestObserver<T4> c4;
  413. public CombineLatest(int length, CombineLatestObservable<T1, T2, T3, T4, TR> parent, IObserver<TR> observer, IDisposable cancel)
  414. : base(length, observer, cancel)
  415. {
  416. this.parent = parent;
  417. }
  418. public IDisposable Run()
  419. {
  420. c1 = new CombineLatestObserver<T1>(gate, this, 0);
  421. c2 = new CombineLatestObserver<T2>(gate, this, 1);
  422. c3 = new CombineLatestObserver<T3>(gate, this, 2);
  423. c4 = new CombineLatestObserver<T4>(gate, this, 3);
  424. var s1 = parent.source1.Subscribe(c1);
  425. var s2 = parent.source2.Subscribe(c2);
  426. var s3 = parent.source3.Subscribe(c3);
  427. var s4 = parent.source4.Subscribe(c4);
  428. return StableCompositeDisposable.Create(s1, s2, s3, s4);
  429. }
  430. public override TR GetResult()
  431. {
  432. return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value);
  433. }
  434. public override void OnNext(TR value)
  435. {
  436. base.observer.OnNext(value);
  437. }
  438. public override void OnError(Exception error)
  439. {
  440. try { observer.OnError(error); }
  441. finally { Dispose(); }
  442. }
  443. public override void OnCompleted()
  444. {
  445. try { observer.OnCompleted(); }
  446. finally { Dispose(); }
  447. }
  448. }
  449. }
  450. internal class CombineLatestObservable<T1, T2, T3, T4, T5, TR> : OperatorObservableBase<TR>
  451. {
  452. IObservable<T1> source1;
  453. IObservable<T2> source2;
  454. IObservable<T3> source3;
  455. IObservable<T4> source4;
  456. IObservable<T5> source5;
  457. CombineLatestFunc<T1, T2, T3, T4, T5, TR> resultSelector;
  458. public CombineLatestObservable(
  459. IObservable<T1> source1,
  460. IObservable<T2> source2,
  461. IObservable<T3> source3,
  462. IObservable<T4> source4,
  463. IObservable<T5> source5,
  464. CombineLatestFunc<T1, T2, T3, T4, T5, TR> resultSelector)
  465. : base(
  466. source1.IsRequiredSubscribeOnCurrentThread() ||
  467. source2.IsRequiredSubscribeOnCurrentThread() ||
  468. source3.IsRequiredSubscribeOnCurrentThread() ||
  469. source4.IsRequiredSubscribeOnCurrentThread() ||
  470. source5.IsRequiredSubscribeOnCurrentThread() ||
  471. false)
  472. {
  473. this.source1 = source1;
  474. this.source2 = source2;
  475. this.source3 = source3;
  476. this.source4 = source4;
  477. this.source5 = source5;
  478. this.resultSelector = resultSelector;
  479. }
  480. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  481. {
  482. return new CombineLatest(5, this, observer, cancel).Run();
  483. }
  484. class CombineLatest : NthCombineLatestObserverBase<TR>
  485. {
  486. readonly CombineLatestObservable<T1, T2, T3, T4, T5, TR> parent;
  487. readonly object gate = new object();
  488. CombineLatestObserver<T1> c1;
  489. CombineLatestObserver<T2> c2;
  490. CombineLatestObserver<T3> c3;
  491. CombineLatestObserver<T4> c4;
  492. CombineLatestObserver<T5> c5;
  493. public CombineLatest(int length, CombineLatestObservable<T1, T2, T3, T4, T5, TR> parent, IObserver<TR> observer, IDisposable cancel)
  494. : base(length, observer, cancel)
  495. {
  496. this.parent = parent;
  497. }
  498. public IDisposable Run()
  499. {
  500. c1 = new CombineLatestObserver<T1>(gate, this, 0);
  501. c2 = new CombineLatestObserver<T2>(gate, this, 1);
  502. c3 = new CombineLatestObserver<T3>(gate, this, 2);
  503. c4 = new CombineLatestObserver<T4>(gate, this, 3);
  504. c5 = new CombineLatestObserver<T5>(gate, this, 4);
  505. var s1 = parent.source1.Subscribe(c1);
  506. var s2 = parent.source2.Subscribe(c2);
  507. var s3 = parent.source3.Subscribe(c3);
  508. var s4 = parent.source4.Subscribe(c4);
  509. var s5 = parent.source5.Subscribe(c5);
  510. return StableCompositeDisposable.Create(s1, s2, s3, s4, s5);
  511. }
  512. public override TR GetResult()
  513. {
  514. return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value, c5.Value);
  515. }
  516. public override void OnNext(TR value)
  517. {
  518. base.observer.OnNext(value);
  519. }
  520. public override void OnError(Exception error)
  521. {
  522. try { observer.OnError(error); }
  523. finally { Dispose(); }
  524. }
  525. public override void OnCompleted()
  526. {
  527. try { observer.OnCompleted(); }
  528. finally { Dispose(); }
  529. }
  530. }
  531. }
  532. internal class CombineLatestObservable<T1, T2, T3, T4, T5, T6, TR> : OperatorObservableBase<TR>
  533. {
  534. IObservable<T1> source1;
  535. IObservable<T2> source2;
  536. IObservable<T3> source3;
  537. IObservable<T4> source4;
  538. IObservable<T5> source5;
  539. IObservable<T6> source6;
  540. CombineLatestFunc<T1, T2, T3, T4, T5, T6, TR> resultSelector;
  541. public CombineLatestObservable(
  542. IObservable<T1> source1,
  543. IObservable<T2> source2,
  544. IObservable<T3> source3,
  545. IObservable<T4> source4,
  546. IObservable<T5> source5,
  547. IObservable<T6> source6,
  548. CombineLatestFunc<T1, T2, T3, T4, T5, T6, TR> resultSelector)
  549. : base(
  550. source1.IsRequiredSubscribeOnCurrentThread() ||
  551. source2.IsRequiredSubscribeOnCurrentThread() ||
  552. source3.IsRequiredSubscribeOnCurrentThread() ||
  553. source4.IsRequiredSubscribeOnCurrentThread() ||
  554. source5.IsRequiredSubscribeOnCurrentThread() ||
  555. source6.IsRequiredSubscribeOnCurrentThread() ||
  556. false)
  557. {
  558. this.source1 = source1;
  559. this.source2 = source2;
  560. this.source3 = source3;
  561. this.source4 = source4;
  562. this.source5 = source5;
  563. this.source6 = source6;
  564. this.resultSelector = resultSelector;
  565. }
  566. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  567. {
  568. return new CombineLatest(6, this, observer, cancel).Run();
  569. }
  570. class CombineLatest : NthCombineLatestObserverBase<TR>
  571. {
  572. readonly CombineLatestObservable<T1, T2, T3, T4, T5, T6, TR> parent;
  573. readonly object gate = new object();
  574. CombineLatestObserver<T1> c1;
  575. CombineLatestObserver<T2> c2;
  576. CombineLatestObserver<T3> c3;
  577. CombineLatestObserver<T4> c4;
  578. CombineLatestObserver<T5> c5;
  579. CombineLatestObserver<T6> c6;
  580. public CombineLatest(int length, CombineLatestObservable<T1, T2, T3, T4, T5, T6, TR> parent, IObserver<TR> observer, IDisposable cancel)
  581. : base(length, observer, cancel)
  582. {
  583. this.parent = parent;
  584. }
  585. public IDisposable Run()
  586. {
  587. c1 = new CombineLatestObserver<T1>(gate, this, 0);
  588. c2 = new CombineLatestObserver<T2>(gate, this, 1);
  589. c3 = new CombineLatestObserver<T3>(gate, this, 2);
  590. c4 = new CombineLatestObserver<T4>(gate, this, 3);
  591. c5 = new CombineLatestObserver<T5>(gate, this, 4);
  592. c6 = new CombineLatestObserver<T6>(gate, this, 5);
  593. var s1 = parent.source1.Subscribe(c1);
  594. var s2 = parent.source2.Subscribe(c2);
  595. var s3 = parent.source3.Subscribe(c3);
  596. var s4 = parent.source4.Subscribe(c4);
  597. var s5 = parent.source5.Subscribe(c5);
  598. var s6 = parent.source6.Subscribe(c6);
  599. return StableCompositeDisposable.Create(s1, s2, s3, s4, s5, s6);
  600. }
  601. public override TR GetResult()
  602. {
  603. return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value, c5.Value, c6.Value);
  604. }
  605. public override void OnNext(TR value)
  606. {
  607. base.observer.OnNext(value);
  608. }
  609. public override void OnError(Exception error)
  610. {
  611. try { observer.OnError(error); }
  612. finally { Dispose(); }
  613. }
  614. public override void OnCompleted()
  615. {
  616. try { observer.OnCompleted(); }
  617. finally { Dispose(); }
  618. }
  619. }
  620. }
  621. internal class CombineLatestObservable<T1, T2, T3, T4, T5, T6, T7, TR> : OperatorObservableBase<TR>
  622. {
  623. IObservable<T1> source1;
  624. IObservable<T2> source2;
  625. IObservable<T3> source3;
  626. IObservable<T4> source4;
  627. IObservable<T5> source5;
  628. IObservable<T6> source6;
  629. IObservable<T7> source7;
  630. CombineLatestFunc<T1, T2, T3, T4, T5, T6, T7, TR> resultSelector;
  631. public CombineLatestObservable(
  632. IObservable<T1> source1,
  633. IObservable<T2> source2,
  634. IObservable<T3> source3,
  635. IObservable<T4> source4,
  636. IObservable<T5> source5,
  637. IObservable<T6> source6,
  638. IObservable<T7> source7,
  639. CombineLatestFunc<T1, T2, T3, T4, T5, T6, T7, TR> resultSelector)
  640. : base(
  641. source1.IsRequiredSubscribeOnCurrentThread() ||
  642. source2.IsRequiredSubscribeOnCurrentThread() ||
  643. source3.IsRequiredSubscribeOnCurrentThread() ||
  644. source4.IsRequiredSubscribeOnCurrentThread() ||
  645. source5.IsRequiredSubscribeOnCurrentThread() ||
  646. source6.IsRequiredSubscribeOnCurrentThread() ||
  647. source7.IsRequiredSubscribeOnCurrentThread() ||
  648. false)
  649. {
  650. this.source1 = source1;
  651. this.source2 = source2;
  652. this.source3 = source3;
  653. this.source4 = source4;
  654. this.source5 = source5;
  655. this.source6 = source6;
  656. this.source7 = source7;
  657. this.resultSelector = resultSelector;
  658. }
  659. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  660. {
  661. return new CombineLatest(7, this, observer, cancel).Run();
  662. }
  663. class CombineLatest : NthCombineLatestObserverBase<TR>
  664. {
  665. readonly CombineLatestObservable<T1, T2, T3, T4, T5, T6, T7, TR> parent;
  666. readonly object gate = new object();
  667. CombineLatestObserver<T1> c1;
  668. CombineLatestObserver<T2> c2;
  669. CombineLatestObserver<T3> c3;
  670. CombineLatestObserver<T4> c4;
  671. CombineLatestObserver<T5> c5;
  672. CombineLatestObserver<T6> c6;
  673. CombineLatestObserver<T7> c7;
  674. public CombineLatest(int length, CombineLatestObservable<T1, T2, T3, T4, T5, T6, T7, TR> parent, IObserver<TR> observer, IDisposable cancel)
  675. : base(length, observer, cancel)
  676. {
  677. this.parent = parent;
  678. }
  679. public IDisposable Run()
  680. {
  681. c1 = new CombineLatestObserver<T1>(gate, this, 0);
  682. c2 = new CombineLatestObserver<T2>(gate, this, 1);
  683. c3 = new CombineLatestObserver<T3>(gate, this, 2);
  684. c4 = new CombineLatestObserver<T4>(gate, this, 3);
  685. c5 = new CombineLatestObserver<T5>(gate, this, 4);
  686. c6 = new CombineLatestObserver<T6>(gate, this, 5);
  687. c7 = new CombineLatestObserver<T7>(gate, this, 6);
  688. var s1 = parent.source1.Subscribe(c1);
  689. var s2 = parent.source2.Subscribe(c2);
  690. var s3 = parent.source3.Subscribe(c3);
  691. var s4 = parent.source4.Subscribe(c4);
  692. var s5 = parent.source5.Subscribe(c5);
  693. var s6 = parent.source6.Subscribe(c6);
  694. var s7 = parent.source7.Subscribe(c7);
  695. return StableCompositeDisposable.Create(s1, s2, s3, s4, s5, s6, s7);
  696. }
  697. public override TR GetResult()
  698. {
  699. return parent.resultSelector(c1.Value, c2.Value, c3.Value, c4.Value, c5.Value, c6.Value, c7.Value);
  700. }
  701. public override void OnNext(TR value)
  702. {
  703. base.observer.OnNext(value);
  704. }
  705. public override void OnError(Exception error)
  706. {
  707. try { observer.OnError(error); }
  708. finally { Dispose(); }
  709. }
  710. public override void OnCompleted()
  711. {
  712. try { observer.OnCompleted(); }
  713. finally { Dispose(); }
  714. }
  715. }
  716. }
  717. #endregion
  718. // Nth infrastructure
  719. internal interface ICombineLatestObservable
  720. {
  721. void Publish(int index);
  722. void Fail(Exception error);
  723. void Done(int index);
  724. }
  725. internal abstract class NthCombineLatestObserverBase<T> : OperatorObserverBase<T, T>, ICombineLatestObservable
  726. {
  727. readonly int length;
  728. bool isAllValueStarted;
  729. readonly bool[] isStarted;
  730. readonly bool[] isCompleted;
  731. public NthCombineLatestObserverBase(int length, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  732. {
  733. this.length = length;
  734. this.isAllValueStarted = false;
  735. this.isStarted = new bool[length];
  736. this.isCompleted = new bool[length];
  737. }
  738. public abstract T GetResult();
  739. // operators in lock
  740. public void Publish(int index)
  741. {
  742. isStarted[index] = true;
  743. if (isAllValueStarted)
  744. {
  745. var result = default(T);
  746. try
  747. {
  748. result = GetResult();
  749. }
  750. catch (Exception ex)
  751. {
  752. try { observer.OnError(ex); }
  753. finally { Dispose(); }
  754. return;
  755. }
  756. OnNext(result);
  757. return;
  758. }
  759. var allValueStarted = true;
  760. for (int i = 0; i < length; i++)
  761. {
  762. if (!isStarted[i])
  763. {
  764. allValueStarted = false;
  765. break;
  766. }
  767. }
  768. isAllValueStarted = allValueStarted;
  769. if (isAllValueStarted)
  770. {
  771. var result = default(T);
  772. try
  773. {
  774. result = GetResult();
  775. }
  776. catch (Exception ex)
  777. {
  778. try { observer.OnError(ex); }
  779. finally { Dispose(); }
  780. return;
  781. }
  782. OnNext(result);
  783. return;
  784. }
  785. else
  786. {
  787. var allCompletedWithoutSelf = true;
  788. for (int i = 0; i < length; i++)
  789. {
  790. if (i == index) continue;
  791. if (!isCompleted[i])
  792. {
  793. allCompletedWithoutSelf = false;
  794. break;
  795. }
  796. }
  797. if (allCompletedWithoutSelf)
  798. {
  799. try { observer.OnCompleted(); }
  800. finally { Dispose(); }
  801. return;
  802. }
  803. else
  804. {
  805. return;
  806. }
  807. }
  808. }
  809. public void Done(int index)
  810. {
  811. isCompleted[index] = true;
  812. var allTrue = true;
  813. for (int i = 0; i < length; i++)
  814. {
  815. if (!isCompleted[i])
  816. {
  817. allTrue = false;
  818. break;
  819. }
  820. }
  821. if (allTrue)
  822. {
  823. try { observer.OnCompleted(); }
  824. finally { Dispose(); }
  825. }
  826. }
  827. public void Fail(Exception error)
  828. {
  829. try { observer.OnError(error); }
  830. finally { Dispose(); }
  831. }
  832. }
  833. // Nth
  834. internal class CombineLatestObserver<T> : IObserver<T>
  835. {
  836. readonly object gate;
  837. readonly ICombineLatestObservable parent;
  838. readonly int index;
  839. T value;
  840. public T Value { get { return value; } }
  841. public CombineLatestObserver(object gate, ICombineLatestObservable parent, int index)
  842. {
  843. this.gate = gate;
  844. this.parent = parent;
  845. this.index = index;
  846. }
  847. public void OnNext(T value)
  848. {
  849. lock (gate)
  850. {
  851. this.value = value;
  852. parent.Publish(index);
  853. }
  854. }
  855. public void OnError(Exception error)
  856. {
  857. lock (gate)
  858. {
  859. parent.Fail(error);
  860. }
  861. }
  862. public void OnCompleted()
  863. {
  864. lock (gate)
  865. {
  866. parent.Done(index);
  867. }
  868. }
  869. }
  870. }