1
0

Observable.Unity.cs 44 KB


  1. #if !(UNITY_4_0 || UNITY_4_1 || UNITY_4_2 || UNITY_4_3 || UNITY_4_4 || UNITY_4_5 || UNITY_4_6 || UNITY_5_0 || UNITY_5_1 || UNITY_5_2)
  2. #define SupportCustomYieldInstruction
  3. #endif
  4. using System;
  5. using System.Collections;
  6. using System.Collections.Generic;
  7. using UniRx.InternalUtil;
  8. using UniRx.Triggers;
  9. using UnityEngine;
  10. using System.Threading;
  11. #if !UniRxLibrary
  12. using SchedulerUnity = UniRx.Scheduler;
  13. #endif
  14. namespace UniRx
  15. {
  16. public enum FrameCountType
  17. {
  18. Update,
  19. FixedUpdate,
  20. EndOfFrame,
  21. }
  22. public enum MainThreadDispatchType
  23. {
  24. /// <summary>yield return null</summary>
  25. Update,
  26. FixedUpdate,
  27. EndOfFrame,
  28. GameObjectUpdate,
  29. LateUpdate,
  30. }
  31. public static class FrameCountTypeExtensions
  32. {
  33. public static YieldInstruction GetYieldInstruction(this FrameCountType frameCountType)
  34. {
  35. switch (frameCountType)
  36. {
  37. case FrameCountType.FixedUpdate:
  38. return YieldInstructionCache.WaitForFixedUpdate;
  39. case FrameCountType.EndOfFrame:
  40. return YieldInstructionCache.WaitForEndOfFrame;
  41. case FrameCountType.Update:
  42. default:
  43. return null;
  44. }
  45. }
  46. }
  47. internal interface ICustomYieldInstructionErrorHandler
  48. {
  49. bool HasError { get; }
  50. Exception Error { get; }
  51. bool IsReThrowOnError { get; }
  52. void ForceDisableRethrowOnError();
  53. void ForceEnableRethrowOnError();
  54. }
  55. public class ObservableYieldInstruction<T> : IEnumerator<T>, ICustomYieldInstructionErrorHandler
  56. {
  57. readonly IDisposable subscription;
  58. readonly CancellationToken cancel;
  59. bool reThrowOnError;
  60. T current;
  61. T result;
  62. bool moveNext;
  63. bool hasResult;
  64. Exception error;
  65. public ObservableYieldInstruction(IObservable<T> source, bool reThrowOnError, CancellationToken cancel)
  66. {
  67. this.moveNext = true;
  68. this.reThrowOnError = reThrowOnError;
  69. this.cancel = cancel;
  70. try
  71. {
  72. this.subscription = source.Subscribe(new ToYieldInstruction(this));
  73. }
  74. catch
  75. {
  76. moveNext = false;
  77. throw;
  78. }
  79. }
  80. public bool HasError
  81. {
  82. get { return error != null; }
  83. }
  84. public bool HasResult
  85. {
  86. get { return hasResult; }
  87. }
  88. public bool IsCanceled
  89. {
  90. get
  91. {
  92. if (hasResult) return false;
  93. if (error != null) return false;
  94. return cancel.IsCancellationRequested;
  95. }
  96. }
  97. /// <summary>
  98. /// HasResult || IsCanceled || HasError
  99. /// </summary>
  100. public bool IsDone
  101. {
  102. get
  103. {
  104. return HasResult || HasError || (cancel.IsCancellationRequested);
  105. }
  106. }
  107. public T Result
  108. {
  109. get { return result; }
  110. }
  111. T IEnumerator<T>.Current
  112. {
  113. get
  114. {
  115. return current;
  116. }
  117. }
  118. object IEnumerator.Current
  119. {
  120. get
  121. {
  122. return current;
  123. }
  124. }
  125. public Exception Error
  126. {
  127. get
  128. {
  129. return error;
  130. }
  131. }
  132. bool IEnumerator.MoveNext()
  133. {
  134. if (!moveNext)
  135. {
  136. if (reThrowOnError && HasError)
  137. {
  138. Error.Throw();
  139. }
  140. return false;
  141. }
  142. if (cancel.IsCancellationRequested)
  143. {
  144. subscription.Dispose();
  145. return false;
  146. }
  147. return true;
  148. }
  149. bool ICustomYieldInstructionErrorHandler.IsReThrowOnError
  150. {
  151. get { return reThrowOnError; }
  152. }
  153. void ICustomYieldInstructionErrorHandler.ForceDisableRethrowOnError()
  154. {
  155. this.reThrowOnError = false;
  156. }
  157. void ICustomYieldInstructionErrorHandler.ForceEnableRethrowOnError()
  158. {
  159. this.reThrowOnError = true;
  160. }
  161. public void Dispose()
  162. {
  163. subscription.Dispose();
  164. }
  165. void IEnumerator.Reset()
  166. {
  167. throw new NotSupportedException();
  168. }
  169. class ToYieldInstruction : IObserver<T>
  170. {
  171. readonly ObservableYieldInstruction<T> parent;
  172. public ToYieldInstruction(ObservableYieldInstruction<T> parent)
  173. {
  174. this.parent = parent;
  175. }
  176. public void OnNext(T value)
  177. {
  178. parent.current = value;
  179. }
  180. public void OnError(Exception error)
  181. {
  182. parent.moveNext = false;
  183. parent.error = error;
  184. }
  185. public void OnCompleted()
  186. {
  187. parent.moveNext = false;
  188. parent.hasResult = true;
  189. parent.result = parent.current;
  190. }
  191. }
  192. }
  193. #if UniRxLibrary
  194. public static partial class ObservableUnity
  195. #else
  196. public static partial class Observable
  197. #endif
  198. {
  199. readonly static HashSet<Type> YieldInstructionTypes = new HashSet<Type>
  200. {
  201. #if UNITY_2018_3_OR_NEWER
  202. #pragma warning disable CS0618
  203. #endif
  204. typeof(WWW),
  205. #if UNITY_2018_3_OR_NEWER
  206. #pragma warning restore CS0618
  207. #endif
  208. typeof(WaitForEndOfFrame),
  209. typeof(WaitForFixedUpdate),
  210. typeof(WaitForSeconds),
  211. typeof(AsyncOperation),
  212. typeof(Coroutine)
  213. };
  214. #if SupportCustomYieldInstruction
  215. class EveryAfterUpdateInvoker : IEnumerator
  216. {
  217. long count = -1;
  218. readonly IObserver<long> observer;
  219. readonly CancellationToken cancellationToken;
  220. public EveryAfterUpdateInvoker(IObserver<long> observer, CancellationToken cancellationToken)
  221. {
  222. this.observer = observer;
  223. this.cancellationToken = cancellationToken;
  224. }
  225. public bool MoveNext()
  226. {
  227. if (!cancellationToken.IsCancellationRequested)
  228. {
  229. if (count != -1) // ignore first/immediate invoke
  230. {
  231. observer.OnNext(count++);
  232. }
  233. else
  234. {
  235. count++;
  236. }
  237. return true;
  238. }
  239. else
  240. {
  241. return false;
  242. }
  243. }
  244. public object Current
  245. {
  246. get
  247. {
  248. return null;
  249. }
  250. }
  251. public void Reset()
  252. {
  253. throw new NotSupportedException();
  254. }
  255. }
  256. #endif
  257. /// <summary>From has no callback coroutine to IObservable. If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.</summary>
  258. public static IObservable<Unit> FromCoroutine(Func<IEnumerator> coroutine, bool publishEveryYield = false)
  259. {
  260. return FromCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(), observer, cancellationToken, publishEveryYield));
  261. }
  262. /// <summary>From has no callback coroutine to IObservable. If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.</summary>
  263. public static IObservable<Unit> FromCoroutine(Func<CancellationToken, IEnumerator> coroutine, bool publishEveryYield = false)
  264. {
  265. return FromCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(cancellationToken), observer, cancellationToken, publishEveryYield));
  266. }
  267. /// <summary>
  268. /// MicroCoroutine is lightweight, fast coroutine dispatcher.
  269. /// IEnumerator supports only yield return null.
  270. /// If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
  271. /// </summary>
  272. public static IObservable<Unit> FromMicroCoroutine(Func<IEnumerator> coroutine, bool publishEveryYield = false, FrameCountType frameCountType = FrameCountType.Update)
  273. {
  274. return FromMicroCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(), observer, cancellationToken, publishEveryYield), frameCountType);
  275. }
  276. /// <summary>
  277. /// MicroCoroutine is lightweight, fast coroutine dispatcher.
  278. /// IEnumerator supports only yield return null.
  279. /// If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
  280. /// </summary>
  281. public static IObservable<Unit> FromMicroCoroutine(Func<CancellationToken, IEnumerator> coroutine, bool publishEveryYield = false, FrameCountType frameCountType = FrameCountType.Update)
  282. {
  283. return FromMicroCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(cancellationToken), observer, cancellationToken, publishEveryYield), frameCountType);
  284. }
  285. static IEnumerator WrapEnumerator(IEnumerator enumerator, IObserver<Unit> observer, CancellationToken cancellationToken, bool publishEveryYield)
  286. {
  287. var hasNext = default(bool);
  288. var raisedError = false;
  289. do
  290. {
  291. try
  292. {
  293. hasNext = enumerator.MoveNext();
  294. }
  295. catch (Exception ex)
  296. {
  297. try
  298. {
  299. raisedError = true;
  300. observer.OnError(ex);
  301. }
  302. finally
  303. {
  304. var d = enumerator as IDisposable;
  305. if (d != null)
  306. {
  307. d.Dispose();
  308. }
  309. }
  310. yield break;
  311. }
  312. if (hasNext && publishEveryYield)
  313. {
  314. try
  315. {
  316. observer.OnNext(Unit.Default);
  317. }
  318. catch
  319. {
  320. var d = enumerator as IDisposable;
  321. if (d != null)
  322. {
  323. d.Dispose();
  324. }
  325. throw;
  326. }
  327. }
  328. if (hasNext)
  329. {
  330. #if SupportCustomYieldInstruction
  331. var current = enumerator.Current;
  332. var customHandler = current as ICustomYieldInstructionErrorHandler;
  333. if (customHandler != null && customHandler.IsReThrowOnError)
  334. {
  335. // If throws exception in Custom YieldInsrtuction, can't handle parent coroutine.
  336. // It is C# limitation.
  337. // so store error info and retrieve from parent.
  338. customHandler.ForceDisableRethrowOnError();
  339. yield return current;
  340. customHandler.ForceEnableRethrowOnError();
  341. if (customHandler.HasError)
  342. {
  343. try
  344. {
  345. raisedError = true;
  346. observer.OnError(customHandler.Error);
  347. }
  348. finally
  349. {
  350. var d = enumerator as IDisposable;
  351. if (d != null)
  352. {
  353. d.Dispose();
  354. }
  355. }
  356. yield break;
  357. }
  358. }
  359. else
  360. {
  361. yield return enumerator.Current; // yield inner YieldInstruction
  362. }
  363. #else
  364. yield return enumerator.Current; // yield inner YieldInstruction
  365. #endif
  366. }
  367. } while (hasNext && !cancellationToken.IsCancellationRequested);
  368. try
  369. {
  370. if (!raisedError && !cancellationToken.IsCancellationRequested)
  371. {
  372. observer.OnNext(Unit.Default); // last one
  373. observer.OnCompleted();
  374. }
  375. }
  376. finally
  377. {
  378. var d = enumerator as IDisposable;
  379. if (d != null)
  380. {
  381. d.Dispose();
  382. }
  383. }
  384. }
  385. /// <summary>Convert coroutine to typed IObservable. If nullAsNextUpdate = true then yield return null when Enumerator.Current and no null publish observer.OnNext.</summary>
  386. public static IObservable<T> FromCoroutineValue<T>(Func<IEnumerator> coroutine, bool nullAsNextUpdate = true)
  387. {
  388. return FromCoroutine<T>((observer, cancellationToken) => WrapEnumeratorYieldValue<T>(coroutine(), observer, cancellationToken, nullAsNextUpdate));
  389. }
  390. /// <summary>Convert coroutine to typed IObservable. If nullAsNextUpdate = true then yield return null when Enumerator.Current and no null publish observer.OnNext.</summary>
  391. public static IObservable<T> FromCoroutineValue<T>(Func<CancellationToken, IEnumerator> coroutine, bool nullAsNextUpdate = true)
  392. {
  393. return FromCoroutine<T>((observer, cancellationToken) => WrapEnumeratorYieldValue<T>(coroutine(cancellationToken), observer, cancellationToken, nullAsNextUpdate));
  394. }
  395. static IEnumerator WrapEnumeratorYieldValue<T>(IEnumerator enumerator, IObserver<T> observer, CancellationToken cancellationToken, bool nullAsNextUpdate)
  396. {
  397. var hasNext = default(bool);
  398. var current = default(object);
  399. var raisedError = false;
  400. do
  401. {
  402. try
  403. {
  404. hasNext = enumerator.MoveNext();
  405. if (hasNext) current = enumerator.Current;
  406. }
  407. catch (Exception ex)
  408. {
  409. try
  410. {
  411. raisedError = true;
  412. observer.OnError(ex);
  413. }
  414. finally
  415. {
  416. var d = enumerator as IDisposable;
  417. if (d != null)
  418. {
  419. d.Dispose();
  420. }
  421. }
  422. yield break;
  423. }
  424. if (hasNext)
  425. {
  426. if (current != null && YieldInstructionTypes.Contains(current.GetType()))
  427. {
  428. yield return current;
  429. }
  430. #if SupportCustomYieldInstruction
  431. else if (current is IEnumerator)
  432. {
  433. var customHandler = current as ICustomYieldInstructionErrorHandler;
  434. if (customHandler != null && customHandler.IsReThrowOnError)
  435. {
  436. // If throws exception in Custom YieldInsrtuction, can't handle parent coroutine.
  437. // It is C# limitation.
  438. // so store error info and retrieve from parent.
  439. customHandler.ForceDisableRethrowOnError();
  440. yield return current;
  441. customHandler.ForceEnableRethrowOnError();
  442. if (customHandler.HasError)
  443. {
  444. try
  445. {
  446. raisedError = true;
  447. observer.OnError(customHandler.Error);
  448. }
  449. finally
  450. {
  451. var d = enumerator as IDisposable;
  452. if (d != null)
  453. {
  454. d.Dispose();
  455. }
  456. }
  457. yield break;
  458. }
  459. }
  460. else
  461. {
  462. yield return current;
  463. }
  464. }
  465. #endif
  466. else if (current == null && nullAsNextUpdate)
  467. {
  468. yield return null;
  469. }
  470. else
  471. {
  472. try
  473. {
  474. observer.OnNext((T)current);
  475. }
  476. catch
  477. {
  478. var d = enumerator as IDisposable;
  479. if (d != null)
  480. {
  481. d.Dispose();
  482. }
  483. throw;
  484. }
  485. }
  486. }
  487. } while (hasNext && !cancellationToken.IsCancellationRequested);
  488. try
  489. {
  490. if (!raisedError && !cancellationToken.IsCancellationRequested)
  491. {
  492. observer.OnCompleted();
  493. }
  494. }
  495. finally
  496. {
  497. var d = enumerator as IDisposable;
  498. if (d != null)
  499. {
  500. d.Dispose();
  501. }
  502. }
  503. }
  504. public static IObservable<T> FromCoroutine<T>(Func<IObserver<T>, IEnumerator> coroutine)
  505. {
  506. return FromCoroutine<T>((observer, cancellationToken) => WrapToCancellableEnumerator(coroutine(observer), observer, cancellationToken));
  507. }
  508. /// <summary>
  509. /// MicroCoroutine is lightweight, fast coroutine dispatcher.
  510. /// IEnumerator supports only yield return null.
  511. /// </summary>
  512. public static IObservable<T> FromMicroCoroutine<T>(Func<IObserver<T>, IEnumerator> coroutine, FrameCountType frameCountType = FrameCountType.Update)
  513. {
  514. return FromMicroCoroutine<T>((observer, cancellationToken) => WrapToCancellableEnumerator(coroutine(observer), observer, cancellationToken), frameCountType);
  515. }
  516. static IEnumerator WrapToCancellableEnumerator<T>(IEnumerator enumerator, IObserver<T> observer, CancellationToken cancellationToken)
  517. {
  518. var hasNext = default(bool);
  519. do
  520. {
  521. try
  522. {
  523. hasNext = enumerator.MoveNext();
  524. }
  525. catch (Exception ex)
  526. {
  527. try
  528. {
  529. observer.OnError(ex);
  530. }
  531. finally
  532. {
  533. var d = enumerator as IDisposable;
  534. if (d != null)
  535. {
  536. d.Dispose();
  537. }
  538. }
  539. yield break;
  540. }
  541. yield return enumerator.Current; // yield inner YieldInstruction
  542. } while (hasNext && !cancellationToken.IsCancellationRequested);
  543. {
  544. var d = enumerator as IDisposable;
  545. if (d != null)
  546. {
  547. d.Dispose();
  548. }
  549. }
  550. }
  551. public static IObservable<T> FromCoroutine<T>(Func<IObserver<T>, CancellationToken, IEnumerator> coroutine)
  552. {
  553. return new UniRx.Operators.FromCoroutineObservable<T>(coroutine);
  554. }
  555. /// <summary>
  556. /// MicroCoroutine is lightweight, fast coroutine dispatcher.
  557. /// IEnumerator supports only yield return null.
  558. /// </summary>
  559. public static IObservable<T> FromMicroCoroutine<T>(Func<IObserver<T>, CancellationToken, IEnumerator> coroutine, FrameCountType frameCountType = FrameCountType.Update)
  560. {
  561. return new UniRx.Operators.FromMicroCoroutineObservable<T>(coroutine, frameCountType);
  562. }
  563. public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, IEnumerator coroutine, bool publishEveryYield = false)
  564. {
  565. return source.SelectMany(FromCoroutine(() => coroutine, publishEveryYield));
  566. }
  567. public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, Func<IEnumerator> selector, bool publishEveryYield = false)
  568. {
  569. return source.SelectMany(FromCoroutine(() => selector(), publishEveryYield));
  570. }
  571. /// <summary>
  572. /// Note: publishEveryYield is always false. If you want to set true, use Observable.FromCoroutine(() => selector(x), true). This is workaround of Unity compiler's bug.
  573. /// </summary>
  574. public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, Func<T, IEnumerator> selector)
  575. {
  576. return source.SelectMany(x => FromCoroutine(() => selector(x), false));
  577. }
  578. public static IObservable<Unit> ToObservable(this IEnumerator coroutine, bool publishEveryYield = false)
  579. {
  580. return FromCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine, observer, cancellationToken, publishEveryYield));
  581. }
  582. #if SupportCustomYieldInstruction
  583. public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine)
  584. {
  585. return ToObservable(coroutine, false).ToYieldInstruction();
  586. }
  587. public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine, bool throwOnError)
  588. {
  589. return ToObservable(coroutine, false).ToYieldInstruction(throwOnError);
  590. }
  591. public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine, CancellationToken cancellationToken)
  592. {
  593. return ToObservable(coroutine, false).ToYieldInstruction(cancellationToken);
  594. }
  595. public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine, bool throwOnError, CancellationToken cancellationToken)
  596. {
  597. return ToObservable(coroutine, false).ToYieldInstruction(throwOnError, cancellationToken);
  598. }
  599. #endif
  600. // variation of FromCoroutine
  601. /// <summary>
  602. /// EveryUpdate calls coroutine's yield return null timing. It is after all Update and before LateUpdate.
  603. /// </summary>
  604. public static IObservable<long> EveryUpdate()
  605. {
  606. return FromMicroCoroutine<long>((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.Update);
  607. }
  608. public static IObservable<long> EveryFixedUpdate()
  609. {
  610. return FromMicroCoroutine<long>((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.FixedUpdate);
  611. }
  612. public static IObservable<long> EveryEndOfFrame()
  613. {
  614. return FromMicroCoroutine<long>((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.EndOfFrame);
  615. }
  616. static IEnumerator EveryCycleCore(IObserver<long> observer, CancellationToken cancellationToken)
  617. {
  618. if (cancellationToken.IsCancellationRequested) yield break;
  619. var count = 0L;
  620. while (true)
  621. {
  622. yield return null;
  623. if (cancellationToken.IsCancellationRequested) yield break;
  624. observer.OnNext(count++);
  625. }
  626. }
  627. /// <summary>
  628. /// EveryGameObjectUpdate calls from MainThreadDispatcher's Update.
  629. /// </summary>
  630. public static IObservable<long> EveryGameObjectUpdate()
  631. {
  632. return MainThreadDispatcher.UpdateAsObservable().Scan(-1L, (x, y) => x + 1);
  633. }
  634. /// <summary>
  635. /// EveryLateUpdate calls from MainThreadDispatcher's OnLateUpdate.
  636. /// </summary>
  637. public static IObservable<long> EveryLateUpdate()
  638. {
  639. return MainThreadDispatcher.LateUpdateAsObservable().Scan(-1L, (x, y) => x + 1);
  640. }
  641. #if SupportCustomYieldInstruction
  642. /// <summary>
  643. /// [Obsolete]Same as EveryUpdate.
  644. /// </summary>
  645. [Obsolete]
  646. public static IObservable<long> EveryAfterUpdate()
  647. {
  648. return FromCoroutine<long>((observer, cancellationToken) => new EveryAfterUpdateInvoker(observer, cancellationToken));
  649. }
  650. #endif
  651. #region Observable.Time Frame Extensions
  652. // Interval, Timer, Delay, Sample, Throttle, Timeout
  653. public static IObservable<Unit> NextFrame(FrameCountType frameCountType = FrameCountType.Update)
  654. {
  655. return FromMicroCoroutine<Unit>((observer, cancellation) => NextFrameCore(observer, cancellation), frameCountType);
  656. }
  657. static IEnumerator NextFrameCore(IObserver<Unit> observer, CancellationToken cancellation)
  658. {
  659. yield return null;
  660. if (!cancellation.IsCancellationRequested)
  661. {
  662. observer.OnNext(Unit.Default);
  663. observer.OnCompleted();
  664. }
  665. }
  666. public static IObservable<long> IntervalFrame(int intervalFrameCount, FrameCountType frameCountType = FrameCountType.Update)
  667. {
  668. return TimerFrame(intervalFrameCount, intervalFrameCount, frameCountType);
  669. }
  670. public static IObservable<long> TimerFrame(int dueTimeFrameCount, FrameCountType frameCountType = FrameCountType.Update)
  671. {
  672. return FromMicroCoroutine<long>((observer, cancellation) => TimerFrameCore(observer, dueTimeFrameCount, cancellation), frameCountType);
  673. }
  674. public static IObservable<long> TimerFrame(int dueTimeFrameCount, int periodFrameCount, FrameCountType frameCountType = FrameCountType.Update)
  675. {
  676. return FromMicroCoroutine<long>((observer, cancellation) => TimerFrameCore(observer, dueTimeFrameCount, periodFrameCount, cancellation), frameCountType);
  677. }
  678. static IEnumerator TimerFrameCore(IObserver<long> observer, int dueTimeFrameCount, CancellationToken cancel)
  679. {
  680. // normalize
  681. if (dueTimeFrameCount <= 0) dueTimeFrameCount = 0;
  682. var currentFrame = 0;
  683. // initial phase
  684. while (!cancel.IsCancellationRequested)
  685. {
  686. if (currentFrame++ == dueTimeFrameCount)
  687. {
  688. observer.OnNext(0);
  689. observer.OnCompleted();
  690. break;
  691. }
  692. yield return null;
  693. }
  694. }
  695. static IEnumerator TimerFrameCore(IObserver<long> observer, int dueTimeFrameCount, int periodFrameCount, CancellationToken cancel)
  696. {
  697. // normalize
  698. if (dueTimeFrameCount <= 0) dueTimeFrameCount = 0;
  699. if (periodFrameCount <= 0) periodFrameCount = 1;
  700. var sendCount = 0L;
  701. var currentFrame = 0;
  702. // initial phase
  703. while (!cancel.IsCancellationRequested)
  704. {
  705. if (currentFrame++ == dueTimeFrameCount)
  706. {
  707. observer.OnNext(sendCount++);
  708. currentFrame = -1;
  709. break;
  710. }
  711. yield return null;
  712. }
  713. // period phase
  714. while (!cancel.IsCancellationRequested)
  715. {
  716. if (++currentFrame == periodFrameCount)
  717. {
  718. observer.OnNext(sendCount++);
  719. currentFrame = 0;
  720. }
  721. yield return null;
  722. }
  723. }
  724. public static IObservable<T> DelayFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
  725. {
  726. if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
  727. return new UniRx.Operators.DelayFrameObservable<T>(source, frameCount, frameCountType);
  728. }
  729. public static IObservable<T> Sample<T, T2>(this IObservable<T> source, IObservable<T2> sampler)
  730. {
  731. return new UniRx.Operators.SampleObservable<T, T2>(source, sampler);
  732. }
  733. public static IObservable<T> SampleFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
  734. {
  735. if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
  736. return new UniRx.Operators.SampleFrameObservable<T>(source, frameCount, frameCountType);
  737. }
  738. public static IObservable<TSource> ThrottleFrame<TSource>(this IObservable<TSource> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
  739. {
  740. if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
  741. return new UniRx.Operators.ThrottleFrameObservable<TSource>(source, frameCount, frameCountType);
  742. }
  743. public static IObservable<TSource> ThrottleFirstFrame<TSource>(this IObservable<TSource> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
  744. {
  745. if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
  746. return new UniRx.Operators.ThrottleFirstFrameObservable<TSource>(source, frameCount, frameCountType);
  747. }
  748. public static IObservable<T> TimeoutFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
  749. {
  750. if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
  751. return new UniRx.Operators.TimeoutFrameObservable<T>(source, frameCount, frameCountType);
  752. }
  753. public static IObservable<T> DelayFrameSubscription<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
  754. {
  755. if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
  756. return new UniRx.Operators.DelayFrameSubscriptionObservable<T>(source, frameCount, frameCountType);
  757. }
  758. #endregion
  759. #if SupportCustomYieldInstruction
  760. /// <summary>
  761. /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
  762. /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
  763. /// This overload throws exception if received OnError events(same as coroutine).
  764. /// </summary>
  765. public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source)
  766. {
  767. return new ObservableYieldInstruction<T>(source, true, CancellationToken.None);
  768. }
  769. /// <summary>
  770. /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
  771. /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
  772. /// This overload throws exception if received OnError events(same as coroutine).
  773. /// </summary>
  774. public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source, CancellationToken cancel)
  775. {
  776. return new ObservableYieldInstruction<T>(source, true, cancel);
  777. }
  778. /// <summary>
  779. /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
  780. /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
  781. /// If throwOnError = false, you can take ObservableYieldInstruction.HasError/Error property.
  782. /// </summary>
  783. public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source, bool throwOnError)
  784. {
  785. return new ObservableYieldInstruction<T>(source, throwOnError, CancellationToken.None);
  786. }
  787. /// <summary>
  788. /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
  789. /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
  790. /// If throwOnError = false, you can take ObservableYieldInstruction.HasError/Error property.
  791. /// </summary>
  792. public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source, bool throwOnError, CancellationToken cancel)
  793. {
  794. return new ObservableYieldInstruction<T>(source, throwOnError, cancel);
  795. }
  796. #endif
  797. /// <summary>Convert to awaitable IEnumerator.</summary>
  798. public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, CancellationToken cancel = default(CancellationToken))
  799. {
  800. return ToAwaitableEnumerator<T>(source, Stubs<T>.Ignore, Stubs.Throw, cancel);
  801. }
  802. /// <summary>Convert to awaitable IEnumerator.</summary>
  803. public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, Action<T> onResult, CancellationToken cancel = default(CancellationToken))
  804. {
  805. return ToAwaitableEnumerator<T>(source, onResult, Stubs.Throw, cancel);
  806. }
  807. /// <summary>Convert to awaitable IEnumerator.</summary>
  808. public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
  809. {
  810. return ToAwaitableEnumerator<T>(source, Stubs<T>.Ignore, onError, cancel);
  811. }
  812. /// <summary>Convert to awaitable IEnumerator.</summary>
  813. public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, Action<T> onResult, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
  814. {
  815. var enumerator = new ObservableYieldInstruction<T>(source, false, cancel);
  816. var e = (IEnumerator<T>)enumerator;
  817. while (e.MoveNext() && !cancel.IsCancellationRequested)
  818. {
  819. yield return null;
  820. }
  821. if (cancel.IsCancellationRequested)
  822. {
  823. enumerator.Dispose();
  824. yield break;
  825. }
  826. if (enumerator.HasResult)
  827. {
  828. onResult(enumerator.Result);
  829. }
  830. else if (enumerator.HasError)
  831. {
  832. onError(enumerator.Error);
  833. }
  834. }
  835. /// <summary>AutoStart observable as coroutine.</summary>
  836. public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, CancellationToken cancel = default(CancellationToken))
  837. {
  838. return StartAsCoroutine<T>(source, Stubs<T>.Ignore, Stubs.Throw, cancel);
  839. }
  840. /// <summary>AutoStart observable as coroutine.</summary>
  841. public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, Action<T> onResult, CancellationToken cancel = default(CancellationToken))
  842. {
  843. return StartAsCoroutine<T>(source, onResult, Stubs.Throw, cancel);
  844. }
  845. /// <summary>AutoStart observable as coroutine.</summary>
  846. public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
  847. {
  848. return StartAsCoroutine<T>(source, Stubs<T>.Ignore, onError, cancel);
  849. }
  850. /// <summary>AutoStart observable as coroutine.</summary>
  851. public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, Action<T> onResult, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
  852. {
  853. return MainThreadDispatcher.StartCoroutine(source.ToAwaitableEnumerator(onResult, onError, cancel));
  854. }
  855. public static IObservable<T> ObserveOnMainThread<T>(this IObservable<T> source)
  856. {
  857. return source.ObserveOn(SchedulerUnity.MainThread);
  858. }
  859. public static IObservable<T> ObserveOnMainThread<T>(this IObservable<T> source, MainThreadDispatchType dispatchType)
  860. {
  861. switch (dispatchType)
  862. {
  863. case MainThreadDispatchType.Update:
  864. return source.ObserveOnMainThread(); // faster path
  865. // others, bit slower
  866. case MainThreadDispatchType.FixedUpdate:
  867. return source.SelectMany(_ => EveryFixedUpdate().Take(1), (x, _) => x);
  868. case MainThreadDispatchType.EndOfFrame:
  869. return source.SelectMany(_ => EveryEndOfFrame().Take(1), (x, _) => x);
  870. case MainThreadDispatchType.GameObjectUpdate:
  871. return source.SelectMany(_ => MainThreadDispatcher.UpdateAsObservable().Take(1), (x, _) => x);
  872. case MainThreadDispatchType.LateUpdate:
  873. return source.SelectMany(_ => MainThreadDispatcher.LateUpdateAsObservable().Take(1), (x, _) => x);
  874. default:
  875. throw new ArgumentException("type is invalid");
  876. }
  877. }
  878. public static IObservable<T> SubscribeOnMainThread<T>(this IObservable<T> source)
  879. {
  880. return source.SubscribeOn(SchedulerUnity.MainThread);
  881. }
  882. // I can't avoid Unity 5.3's uNET weaver bug, pending...
  883. //public static IObservable<T> SubscribeOnMainThread<T>(this IObservable<T> source, MainThreadDispatchType dispatchType)
  884. //{
  885. // switch (dispatchType)
  886. // {
  887. // case MainThreadDispatchType.Update:
  888. // return source.SubscribeOnMainThread(); // faster path
  889. // // others, bit slower
  890. // case MainThreadDispatchType.FixedUpdate:
  891. // return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, EveryFixedUpdate().Take(1));
  892. // case MainThreadDispatchType.EndOfFrame:
  893. // return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, EveryEndOfFrame().Take(1));
  894. // case MainThreadDispatchType.GameObjectUpdate:
  895. // return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, MainThreadDispatcher.UpdateAsObservable().Select(_ => 0L).Take(1));
  896. // case MainThreadDispatchType.LateUpdate:
  897. // return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, MainThreadDispatcher.LateUpdateAsObservable().Select(_ => 0L).Take(1));
  898. // case MainThreadDispatchType.AfterUpdate:
  899. // return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, EveryAfterUpdate().Take(1));
  900. // default:
  901. // throw new ArgumentException("type is invalid");
  902. // }
  903. //}
  904. public static IObservable<bool> EveryApplicationPause()
  905. {
  906. return MainThreadDispatcher.OnApplicationPauseAsObservable().AsObservable();
  907. }
  908. public static IObservable<bool> EveryApplicationFocus()
  909. {
  910. return MainThreadDispatcher.OnApplicationFocusAsObservable().AsObservable();
  911. }
  912. /// <summary>publish OnNext(Unit) and OnCompleted() on application quit.</summary>
  913. public static IObservable<Unit> OnceApplicationQuit()
  914. {
  915. return MainThreadDispatcher.OnApplicationQuitAsObservable().Take(1);
  916. }
  917. public static IObservable<T> TakeUntilDestroy<T>(this IObservable<T> source, Component target)
  918. {
  919. return source.TakeUntil(target.OnDestroyAsObservable());
  920. }
  921. public static IObservable<T> TakeUntilDestroy<T>(this IObservable<T> source, GameObject target)
  922. {
  923. return source.TakeUntil(target.OnDestroyAsObservable());
  924. }
  925. public static IObservable<T> TakeUntilDisable<T>(this IObservable<T> source, Component target)
  926. {
  927. return source.TakeUntil(target.OnDisableAsObservable());
  928. }
  929. public static IObservable<T> TakeUntilDisable<T>(this IObservable<T> source, GameObject target)
  930. {
  931. return source.TakeUntil(target.OnDisableAsObservable());
  932. }
  933. public static IObservable<T> RepeatUntilDestroy<T>(this IObservable<T> source, GameObject target)
  934. {
  935. return RepeatUntilCore(RepeatInfinite(source), target.OnDestroyAsObservable(), target);
  936. }
  937. public static IObservable<T> RepeatUntilDestroy<T>(this IObservable<T> source, Component target)
  938. {
  939. return RepeatUntilCore(RepeatInfinite(source), target.OnDestroyAsObservable(), (target != null) ? target.gameObject : null);
  940. }
  941. public static IObservable<T> RepeatUntilDisable<T>(this IObservable<T> source, GameObject target)
  942. {
  943. return RepeatUntilCore(RepeatInfinite(source), target.OnDisableAsObservable(), target);
  944. }
  945. public static IObservable<T> RepeatUntilDisable<T>(this IObservable<T> source, Component target)
  946. {
  947. return RepeatUntilCore(RepeatInfinite(source), target.OnDisableAsObservable(), (target != null) ? target.gameObject : null);
  948. }
  949. static IObservable<T> RepeatUntilCore<T>(this IEnumerable<IObservable<T>> sources, IObservable<Unit> trigger, GameObject lifeTimeChecker)
  950. {
  951. return new UniRx.Operators.RepeatUntilObservable<T>(sources, trigger, lifeTimeChecker);
  952. }
  953. public static IObservable<UniRx.FrameInterval<T>> FrameInterval<T>(this IObservable<T> source)
  954. {
  955. return new UniRx.Operators.FrameIntervalObservable<T>(source);
  956. }
  957. public static IObservable<UniRx.TimeInterval<T>> FrameTimeInterval<T>(this IObservable<T> source, bool ignoreTimeScale = false)
  958. {
  959. return new UniRx.Operators.FrameTimeIntervalObservable<T>(source, ignoreTimeScale);
  960. }
  961. /// <summary>
  962. /// Buffer elements in during target frame counts. Default raise same frame of end(frameCount = 0, frameCountType = EndOfFrame).
  963. /// </summary>
  964. public static IObservable<IList<T>> BatchFrame<T>(this IObservable<T> source)
  965. {
  966. // if use default argument, comiler errors ambiguous(Unity's limitation)
  967. return BatchFrame<T>(source, 0, FrameCountType.EndOfFrame);
  968. }
  969. /// <summary>
  970. /// Buffer elements in during target frame counts.
  971. /// </summary>
  972. public static IObservable<IList<T>> BatchFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType)
  973. {
  974. if (frameCount < 0) throw new ArgumentException("frameCount must be >= 0, frameCount:" + frameCount);
  975. return new UniRx.Operators.BatchFrameObservable<T>(source, frameCount, frameCountType);
  976. }
  977. /// <summary>
  978. /// Wait command in during target frame counts. Default raise same frame of end(frameCount = 0, frameCountType = EndOfFrame).
  979. /// </summary>
  980. public static IObservable<Unit> BatchFrame(this IObservable<Unit> source)
  981. {
  982. return BatchFrame(source, 0, FrameCountType.EndOfFrame);
  983. }
  984. /// <summary>
  985. /// Wait command in during target frame counts.
  986. /// </summary>
  987. public static IObservable<Unit> BatchFrame(this IObservable<Unit> source, int frameCount, FrameCountType frameCountType)
  988. {
  989. if (frameCount < 0) throw new ArgumentException("frameCount must be >= 0, frameCount:" + frameCount);
  990. return new UniRx.Operators.BatchFrameObservable(source, frameCount, frameCountType);
  991. }
  992. #if UniRxLibrary
  993. static IEnumerable<IObservable<T>> RepeatInfinite<T>(IObservable<T> source)
  994. {
  995. while (true)
  996. {
  997. yield return source;
  998. }
  999. }
  1000. internal static class Stubs
  1001. {
  1002. public static readonly Action Nop = () => { };
  1003. public static readonly Action<Exception> Throw = ex => { ex.Throw(); };
  1004. // Stubs<T>.Ignore can't avoid iOS AOT problem.
  1005. public static void Ignore<T>(T t)
  1006. {
  1007. }
  1008. // marker for CatchIgnore and Catch avoid iOS AOT problem.
  1009. public static IObservable<TSource> CatchIgnore<TSource>(Exception ex)
  1010. {
  1011. return Observable.Empty<TSource>();
  1012. }
  1013. }
  1014. #endif
  1015. }
  1016. }