WhenAll.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class WhenAllObservable<T> : OperatorObservableBase<T[]>
  6. {
  7. readonly IObservable<T>[] sources;
  8. readonly IEnumerable<IObservable<T>> sourcesEnumerable;
  9. public WhenAllObservable(IObservable<T>[] sources)
  10. : base(false)
  11. {
  12. this.sources = sources;
  13. }
  14. public WhenAllObservable(IEnumerable<IObservable<T>> sources)
  15. : base(false)
  16. {
  17. this.sourcesEnumerable = sources;
  18. }
  19. protected override IDisposable SubscribeCore(IObserver<T[]> observer, IDisposable cancel)
  20. {
  21. if (sources != null)
  22. {
  23. return new WhenAll(this.sources, observer, cancel).Run();
  24. }
  25. else
  26. {
  27. var xs = sourcesEnumerable as IList<IObservable<T>>;
  28. if (xs == null)
  29. {
  30. xs = new List<IObservable<T>>(sourcesEnumerable); // materialize observables
  31. }
  32. return new WhenAll_(xs, observer, cancel).Run();
  33. }
  34. }
  35. class WhenAll : OperatorObserverBase<T[], T[]>
  36. {
  37. readonly IObservable<T>[] sources;
  38. readonly object gate = new object();
  39. int completedCount;
  40. int length;
  41. T[] values;
  42. public WhenAll(IObservable<T>[] sources, IObserver<T[]> observer, IDisposable cancel)
  43. : base(observer, cancel)
  44. {
  45. this.sources = sources;
  46. }
  47. public IDisposable Run()
  48. {
  49. length = sources.Length;
  50. // fail safe...
  51. if (length == 0)
  52. {
  53. OnNext(new T[0]);
  54. try { observer.OnCompleted(); } finally { Dispose(); }
  55. return Disposable.Empty;
  56. }
  57. completedCount = 0;
  58. values = new T[length];
  59. var subscriptions = new IDisposable[length];
  60. for (int index = 0; index < length; index++)
  61. {
  62. var source = sources[index];
  63. var observer = new WhenAllCollectionObserver(this, index);
  64. subscriptions[index] = source.Subscribe(observer);
  65. }
  66. return StableCompositeDisposable.CreateUnsafe(subscriptions);
  67. }
  68. public override void OnNext(T[] value)
  69. {
  70. base.observer.OnNext(value);
  71. }
  72. public override void OnError(Exception error)
  73. {
  74. try { observer.OnError(error); } finally { Dispose(); }
  75. }
  76. public override void OnCompleted()
  77. {
  78. try { observer.OnCompleted(); } finally { Dispose(); }
  79. }
  80. class WhenAllCollectionObserver : IObserver<T>
  81. {
  82. readonly WhenAll parent;
  83. readonly int index;
  84. bool isCompleted = false;
  85. public WhenAllCollectionObserver(WhenAll parent, int index)
  86. {
  87. this.parent = parent;
  88. this.index = index;
  89. }
  90. public void OnNext(T value)
  91. {
  92. lock (parent.gate)
  93. {
  94. if (!isCompleted)
  95. {
  96. parent.values[index] = value;
  97. }
  98. }
  99. }
  100. public void OnError(Exception error)
  101. {
  102. lock (parent.gate)
  103. {
  104. if (!isCompleted)
  105. {
  106. parent.OnError(error);
  107. }
  108. }
  109. }
  110. public void OnCompleted()
  111. {
  112. lock (parent.gate)
  113. {
  114. if (!isCompleted)
  115. {
  116. isCompleted = true;
  117. parent.completedCount++;
  118. if (parent.completedCount == parent.length)
  119. {
  120. parent.OnNext(parent.values);
  121. parent.OnCompleted();
  122. }
  123. }
  124. }
  125. }
  126. }
  127. }
  128. class WhenAll_ : OperatorObserverBase<T[], T[]>
  129. {
  130. readonly IList<IObservable<T>> sources;
  131. readonly object gate = new object();
  132. int completedCount;
  133. int length;
  134. T[] values;
  135. public WhenAll_(IList<IObservable<T>> sources, IObserver<T[]> observer, IDisposable cancel)
  136. : base(observer, cancel)
  137. {
  138. this.sources = sources;
  139. }
  140. public IDisposable Run()
  141. {
  142. length = sources.Count;
  143. // fail safe...
  144. if (length == 0)
  145. {
  146. OnNext(new T[0]);
  147. try { observer.OnCompleted(); } finally { Dispose(); }
  148. return Disposable.Empty;
  149. }
  150. completedCount = 0;
  151. values = new T[length];
  152. var subscriptions = new IDisposable[length];
  153. for (int index = 0; index < length; index++)
  154. {
  155. var source = sources[index];
  156. var observer = new WhenAllCollectionObserver(this, index);
  157. subscriptions[index] = source.Subscribe(observer);
  158. }
  159. return StableCompositeDisposable.CreateUnsafe(subscriptions);
  160. }
  161. public override void OnNext(T[] value)
  162. {
  163. base.observer.OnNext(value);
  164. }
  165. public override void OnError(Exception error)
  166. {
  167. try { observer.OnError(error); } finally { Dispose(); }
  168. }
  169. public override void OnCompleted()
  170. {
  171. try { observer.OnCompleted(); } finally { Dispose(); }
  172. }
  173. class WhenAllCollectionObserver : IObserver<T>
  174. {
  175. readonly WhenAll_ parent;
  176. readonly int index;
  177. bool isCompleted = false;
  178. public WhenAllCollectionObserver(WhenAll_ parent, int index)
  179. {
  180. this.parent = parent;
  181. this.index = index;
  182. }
  183. public void OnNext(T value)
  184. {
  185. lock (parent.gate)
  186. {
  187. if (!isCompleted)
  188. {
  189. parent.values[index] = value;
  190. }
  191. }
  192. }
  193. public void OnError(Exception error)
  194. {
  195. lock (parent.gate)
  196. {
  197. if (!isCompleted)
  198. {
  199. parent.OnError(error);
  200. }
  201. }
  202. }
  203. public void OnCompleted()
  204. {
  205. lock (parent.gate)
  206. {
  207. if (!isCompleted)
  208. {
  209. isCompleted = true;
  210. parent.completedCount++;
  211. if (parent.completedCount == parent.length)
  212. {
  213. parent.OnNext(parent.values);
  214. parent.OnCompleted();
  215. }
  216. }
  217. }
  218. }
  219. }
  220. }
  221. }
  222. internal class WhenAllObservable : OperatorObservableBase<Unit>
  223. {
  224. readonly IObservable<Unit>[] sources;
  225. readonly IEnumerable<IObservable<Unit>> sourcesEnumerable;
  226. public WhenAllObservable(IObservable<Unit>[] sources)
  227. : base(false)
  228. {
  229. this.sources = sources;
  230. }
  231. public WhenAllObservable(IEnumerable<IObservable<Unit>> sources)
  232. : base(false)
  233. {
  234. this.sourcesEnumerable = sources;
  235. }
  236. protected override IDisposable SubscribeCore(IObserver<Unit> observer, IDisposable cancel)
  237. {
  238. if (sources != null)
  239. {
  240. return new WhenAll(this.sources, observer, cancel).Run();
  241. }
  242. else
  243. {
  244. var xs = sourcesEnumerable as IList<IObservable<Unit>>;
  245. if (xs == null)
  246. {
  247. xs = new List<IObservable<Unit>>(sourcesEnumerable); // materialize observables
  248. }
  249. return new WhenAll_(xs, observer, cancel).Run();
  250. }
  251. }
  252. class WhenAll : OperatorObserverBase<Unit, Unit>
  253. {
  254. readonly IObservable<Unit>[] sources;
  255. readonly object gate = new object();
  256. int completedCount;
  257. int length;
  258. public WhenAll(IObservable<Unit>[] sources, IObserver<Unit> observer, IDisposable cancel)
  259. : base(observer, cancel)
  260. {
  261. this.sources = sources;
  262. }
  263. public IDisposable Run()
  264. {
  265. length = sources.Length;
  266. // fail safe...
  267. if (length == 0)
  268. {
  269. OnNext(Unit.Default);
  270. try { observer.OnCompleted(); } finally { Dispose(); }
  271. return Disposable.Empty;
  272. }
  273. completedCount = 0;
  274. var subscriptions = new IDisposable[length];
  275. for (int index = 0; index < sources.Length; index++)
  276. {
  277. var source = sources[index];
  278. var observer = new WhenAllCollectionObserver(this);
  279. subscriptions[index] = source.Subscribe(observer);
  280. }
  281. return StableCompositeDisposable.CreateUnsafe(subscriptions);
  282. }
  283. public override void OnNext(Unit value)
  284. {
  285. base.observer.OnNext(value);
  286. }
  287. public override void OnError(Exception error)
  288. {
  289. try { observer.OnError(error); } finally { Dispose(); }
  290. }
  291. public override void OnCompleted()
  292. {
  293. try { observer.OnCompleted(); } finally { Dispose(); }
  294. }
  295. class WhenAllCollectionObserver : IObserver<Unit>
  296. {
  297. readonly WhenAll parent;
  298. bool isCompleted = false;
  299. public WhenAllCollectionObserver(WhenAll parent)
  300. {
  301. this.parent = parent;
  302. }
  303. public void OnNext(Unit value)
  304. {
  305. }
  306. public void OnError(Exception error)
  307. {
  308. lock (parent.gate)
  309. {
  310. if (!isCompleted)
  311. {
  312. parent.OnError(error);
  313. }
  314. }
  315. }
  316. public void OnCompleted()
  317. {
  318. lock (parent.gate)
  319. {
  320. if (!isCompleted)
  321. {
  322. isCompleted = true;
  323. parent.completedCount++;
  324. if (parent.completedCount == parent.length)
  325. {
  326. parent.OnNext(Unit.Default);
  327. parent.OnCompleted();
  328. }
  329. }
  330. }
  331. }
  332. }
  333. }
  334. class WhenAll_ : OperatorObserverBase<Unit, Unit>
  335. {
  336. readonly IList<IObservable<Unit>> sources;
  337. readonly object gate = new object();
  338. int completedCount;
  339. int length;
  340. public WhenAll_(IList<IObservable<Unit>> sources, IObserver<Unit> observer, IDisposable cancel)
  341. : base(observer, cancel)
  342. {
  343. this.sources = sources;
  344. }
  345. public IDisposable Run()
  346. {
  347. length = sources.Count;
  348. // fail safe...
  349. if (length == 0)
  350. {
  351. OnNext(Unit.Default);
  352. try { observer.OnCompleted(); } finally { Dispose(); }
  353. return Disposable.Empty;
  354. }
  355. completedCount = 0;
  356. var subscriptions = new IDisposable[length];
  357. for (int index = 0; index < length; index++)
  358. {
  359. var source = sources[index];
  360. var observer = new WhenAllCollectionObserver(this);
  361. subscriptions[index] = source.Subscribe(observer);
  362. }
  363. return StableCompositeDisposable.CreateUnsafe(subscriptions);
  364. }
  365. public override void OnNext(Unit value)
  366. {
  367. base.observer.OnNext(value);
  368. }
  369. public override void OnError(Exception error)
  370. {
  371. try { observer.OnError(error); } finally { Dispose(); }
  372. }
  373. public override void OnCompleted()
  374. {
  375. try { observer.OnCompleted(); } finally { Dispose(); }
  376. }
  377. class WhenAllCollectionObserver : IObserver<Unit>
  378. {
  379. readonly WhenAll_ parent;
  380. bool isCompleted = false;
  381. public WhenAllCollectionObserver(WhenAll_ parent)
  382. {
  383. this.parent = parent;
  384. }
  385. public void OnNext(Unit value)
  386. {
  387. }
  388. public void OnError(Exception error)
  389. {
  390. lock (parent.gate)
  391. {
  392. if (!isCompleted)
  393. {
  394. parent.OnError(error);
  395. }
  396. }
  397. }
  398. public void OnCompleted()
  399. {
  400. lock (parent.gate)
  401. {
  402. if (!isCompleted)
  403. {
  404. isCompleted = true;
  405. parent.completedCount++;
  406. if (parent.completedCount == parent.length)
  407. {
  408. parent.OnNext(Unit.Default);
  409. parent.OnCompleted();
  410. }
  411. }
  412. }
  413. }
  414. }
  415. }
  416. }
  417. }