#if !(UNITY_4_0 || UNITY_4_1 || UNITY_4_2 || UNITY_4_3 || UNITY_4_4 || UNITY_4_5 || UNITY_4_6 || UNITY_5_0 || UNITY_5_1 || UNITY_5_2)
#define SupportCustomYieldInstruction
#endif

using System;
using System.Collections;
using System.Collections.Generic;
using UniRx.InternalUtil;
using UniRx.Triggers;
using UnityEngine;
using System.Threading;

#if !UniRxLibrary
using SchedulerUnity = UniRx.Scheduler;
#endif

namespace UniRx
{
    /// <summary>
    /// Selects which per-frame timing a frame-based operator is driven by.
    /// </summary>
    public enum FrameCountType
    {
        Update,
        FixedUpdate,
        EndOfFrame,
    }

    /// <summary>
    /// Selects the timing used by ObserveOnMainThread(source, dispatchType).
    /// </summary>
    public enum MainThreadDispatchType
    {
        /// <summary>yield return null</summary>
        Update,
        FixedUpdate,
        EndOfFrame,
        GameObjectUpdate,
        LateUpdate,
    }

    public static class FrameCountTypeExtensions
    {
        /// <summary>
        /// Maps a <see cref="FrameCountType"/> to the value a coroutine should yield:
        /// the cached WaitForFixedUpdate / WaitForEndOfFrame instances for those timings,
        /// and null (i.e. "yield return null") for Update and any other value.
        /// </summary>
        public static YieldInstruction GetYieldInstruction(this FrameCountType frameCountType)
        {
            switch (frameCountType)
            {
                case FrameCountType.FixedUpdate:
                    return YieldInstructionCache.WaitForFixedUpdate;
                case FrameCountType.EndOfFrame:
                    return YieldInstructionCache.WaitForEndOfFrame;
                case FrameCountType.Update:
                default:
                    return null;
            }
        }
    }

    /// <summary>
    /// Lets a wrapping coroutine inspect the error of a yielded instruction and
    /// temporarily switch its rethrow behaviour off and on again.
    /// </summary>
    internal interface ICustomYieldInstructionErrorHandler
    {
        bool HasError { get; }
        Exception Error { get; }
        bool IsReThrowOnError { get; }
        void ForceDisableRethrowOnError();
        void ForceEnableRethrowOnError();
    }

    /// <summary>
    /// Yieldable IEnumerator over a subscription: MoveNext keeps returning true until the
    /// source completes, raises an error, or the cancellation token is signalled.
    /// The last OnNext value becomes <see cref="Result"/> when the source completes.
    /// </summary>
    public class ObservableYieldInstruction<T> : IEnumerator<T>, ICustomYieldInstructionErrorHandler
    {
        readonly IDisposable subscription;
        readonly CancellationToken cancel;
        bool reThrowOnError;
        T current;
        T result;
        bool moveNext;
        bool hasResult;
        Exception error;

        /// <summary>
        /// Subscribes to <paramref name="source"/> immediately. If Subscribe itself throws,
        /// the instruction is marked as finished and the exception is rethrown.
        /// </summary>
        public ObservableYieldInstruction(IObservable<T> source, bool reThrowOnError, CancellationToken cancel)
        {
            this.moveNext = true;
            this.reThrowOnError = reThrowOnError;
            this.cancel = cancel;
            try
            {
                this.subscription = source.Subscribe(new ToYieldInstruction(this));
            }
            catch
            {
                moveNext = false;
                throw;
            }
        }

        public bool HasError
        {
            get { return error != null; }
        }

        public bool HasResult
        {
            get { return hasResult; }
        }

        /// <summary>
        /// True only when neither a result nor an error has been recorded and
        /// cancellation has been requested.
        /// </summary>
        public bool IsCanceled
        {
            get
            {
                if (hasResult) return false;
                if (error != null) return false;
                return cancel.IsCancellationRequested;
            }
        }

        /// <summary>
        /// HasResult || IsCanceled || HasError
        /// </summary>
        public bool IsDone
        {
            get
            {
                return HasResult || HasError || (cancel.IsCancellationRequested);
            }
        }

        /// <summary>Value captured when the source completed (the last OnNext value).</summary>
        public T Result
        {
            get { return result; }
        }

        T IEnumerator<T>.Current
        {
            get
            {
                return current;
            }
        }

        object IEnumerator.Current
        {
            get
            {
                return current;
            }
        }

        public Exception Error
        {
            get
            {
                return error;
            }
        }

        bool IEnumerator.MoveNext()
        {
            if (!moveNext)
            {
                // Source finished: surface a stored error here when rethrow is enabled.
                if (reThrowOnError && HasError)
                {
                    Error.Throw();
                }

                return false;
            }

            if (cancel.IsCancellationRequested)
            {
                subscription.Dispose();
                return false;
            }

            return true;
        }

        bool ICustomYieldInstructionErrorHandler.IsReThrowOnError
        {
            get { return reThrowOnError; }
        }

        void ICustomYieldInstructionErrorHandler.ForceDisableRethrowOnError()
        {
            this.reThrowOnError = false;
        }

        void ICustomYieldInstructionErrorHandler.ForceEnableRethrowOnError()
        {
            this.reThrowOnError = true;
        }

        /// <summary>Disposes the underlying subscription.</summary>
        public void Dispose()
        {
            subscription.Dispose();
        }

        void IEnumerator.Reset()
        {
            throw new NotSupportedException();
        }

        // Observer that writes the source's notifications into the parent's fields.
        class ToYieldInstruction : IObserver<T>
        {
            readonly ObservableYieldInstruction<T> parent;

            public ToYieldInstruction(ObservableYieldInstruction<T> parent)
            {
                this.parent = parent;
            }

            public void OnNext(T value)
            {
                parent.current = value;
            }

            public void OnError(Exception error)
            {
                parent.moveNext = false;
                parent.error = error;
            }

            public void OnCompleted()
            {
                parent.moveNext = false;
                parent.hasResult = true;
                parent.result = parent.current;
            }
        }
    }

#if UniRxLibrary
    public static partial class ObservableUnity
#else
    public static partial class Observable
#endif
    {
        // Exact runtime types that WrapEnumeratorYieldValue passes through to the
        // coroutine runner instead of publishing as values.
        readonly static HashSet<Type> YieldInstructionTypes = new HashSet<Type>
        {
#if UNITY_2018_3_OR_NEWER
#pragma warning disable CS0618
#endif
            typeof(WWW),
#if UNITY_2018_3_OR_NEWER
#pragma warning restore CS0618
#endif
            typeof(WaitForEndOfFrame),
            typeof(WaitForFixedUpdate),
            typeof(WaitForSeconds),
            typeof(AsyncOperation),
            typeof(Coroutine)
        };

#if SupportCustomYieldInstruction

        // Enumerator used by EveryAfterUpdate: publishes 0, 1, 2, ... on every MoveNext
        // after the first, until cancellation is requested.
        class EveryAfterUpdateInvoker : IEnumerator
        {
            long count = -1;
            readonly IObserver<long> observer;
            readonly CancellationToken cancellationToken;

            public EveryAfterUpdateInvoker(IObserver<long> observer, CancellationToken cancellationToken)
            {
                this.observer = observer;
                this.cancellationToken = cancellationToken;
            }

            public bool MoveNext()
            {
                if (!cancellationToken.IsCancellationRequested)
                {
                    if (count != -1) // ignore first/immediate invoke
                    {
                        observer.OnNext(count++);
                    }
                    else
                    {
                        count++;
                    }

                    return true;
                }
                else
                {
                    return false;
                }
            }

            public object Current
            {
                get
                {
                    return null;
                }
            }

            public void Reset()
            {
                throw new NotSupportedException();
            }
        }

#endif

        /// <summary>
        /// Converts a coroutine that has no result callback into an observable of Unit.
        /// If publishEveryYield = true, OnNext is also published each time the coroutine yields;
        /// in both cases a final OnNext and OnCompleted are published when enumeration finishes.
        /// </summary>
        public static IObservable<Unit> FromCoroutine(Func<IEnumerator> coroutine, bool publishEveryYield = false)
        {
            return FromCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(), observer, cancellationToken, publishEveryYield));
        }

        /// <summary>
        /// Same as the parameterless-factory overload, but the factory receives the
        /// subscription's CancellationToken.
        /// If publishEveryYield = true, OnNext is also published each time the coroutine yields.
        /// </summary>
        public static IObservable<Unit> FromCoroutine(Func<CancellationToken, IEnumerator> coroutine, bool publishEveryYield = false)
        {
            return FromCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(cancellationToken), observer, cancellationToken, publishEveryYield));
        }

        /// <summary>
        /// MicroCoroutine is lightweight, fast coroutine dispatcher.
        /// IEnumerator supports only yield return null.
        /// If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
        /// </summary>
        public static IObservable<Unit> FromMicroCoroutine(Func<IEnumerator> coroutine, bool publishEveryYield = false, FrameCountType frameCountType = FrameCountType.Update)
        {
            return FromMicroCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(), observer, cancellationToken, publishEveryYield), frameCountType);
        }

        /// <summary>
        /// MicroCoroutine is lightweight, fast coroutine dispatcher.
        /// IEnumerator supports only yield return null.
        /// If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
        /// </summary>
        public static IObservable<Unit> FromMicroCoroutine(Func<CancellationToken, IEnumerator> coroutine, bool publishEveryYield = false, FrameCountType frameCountType = FrameCountType.Update)
        {
            return FromMicroCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine(cancellationToken), observer, cancellationToken, publishEveryYield), frameCountType);
        }

        // Drives 'enumerator', forwarding each yielded value to the outer coroutine runner.
        // Exceptions from MoveNext are routed to observer.OnError; the enumerator is disposed
        // (when it is IDisposable) on every exit path.
        static IEnumerator WrapEnumerator(IEnumerator enumerator, IObserver<Unit> observer, CancellationToken cancellationToken, bool publishEveryYield)
        {
            var hasNext = default(bool);
            var raisedError = false;
            do
            {
                try
                {
                    hasNext = enumerator.MoveNext();
                }
                catch (Exception ex)
                {
                    try
                    {
                        raisedError = true;
                        observer.OnError(ex);
                    }
                    finally
                    {
                        var d = enumerator as IDisposable;
                        if (d != null)
                        {
                            d.Dispose();
                        }
                    }
                    yield break;
                }

                if (hasNext && publishEveryYield)
                {
                    try
                    {
                        observer.OnNext(Unit.Default);
                    }
                    catch
                    {
                        var d = enumerator as IDisposable;
                        if (d != null)
                        {
                            d.Dispose();
                        }
                        throw;
                    }
                }

                if (hasNext)
                {
#if SupportCustomYieldInstruction
                    var current = enumerator.Current;
                    var customHandler = current as ICustomYieldInstructionErrorHandler;
                    if (customHandler != null && customHandler.IsReThrowOnError)
                    {
                        // If throws exception in Custom YieldInstruction, can't handle parent coroutine.
                        // It is C# limitation.
                        // so store error info and retrieve from parent.
                        customHandler.ForceDisableRethrowOnError();
                        yield return current;
                        customHandler.ForceEnableRethrowOnError();

                        if (customHandler.HasError)
                        {
                            try
                            {
                                raisedError = true;
                                observer.OnError(customHandler.Error);
                            }
                            finally
                            {
                                var d = enumerator as IDisposable;
                                if (d != null)
                                {
                                    d.Dispose();
                                }
                            }
                            yield break;
                        }
                    }
                    else
                    {
                        yield return enumerator.Current; // yield inner YieldInstruction
                    }
#else
                    yield return enumerator.Current; // yield inner YieldInstruction
#endif
                }
            } while (hasNext && !cancellationToken.IsCancellationRequested);

            try
            {
                if (!raisedError && !cancellationToken.IsCancellationRequested)
                {
                    observer.OnNext(Unit.Default); // last one
                    observer.OnCompleted();
                }
            }
            finally
            {
                var d = enumerator as IDisposable;
                if (d != null)
                {
                    d.Dispose();
                }
            }
        }

        /// <summary>
        /// Converts a coroutine into a typed observable: every yielded value that is not treated
        /// as a yield instruction is cast to T and published through OnNext.
        /// If nullAsNextUpdate = true, a yielded null is passed through as "yield return null"
        /// instead of being published.
        /// </summary>
        public static IObservable<T> FromCoroutineValue<T>(Func<IEnumerator> coroutine, bool nullAsNextUpdate = true)
        {
            return FromCoroutine<T>((observer, cancellationToken) => WrapEnumeratorYieldValue<T>(coroutine(), observer, cancellationToken, nullAsNextUpdate));
        }

        /// <summary>
        /// Same as the parameterless-factory overload, but the factory receives the
        /// subscription's CancellationToken.
        /// If nullAsNextUpdate = true, a yielded null is passed through as "yield return null"
        /// instead of being published.
        /// </summary>
        public static IObservable<T> FromCoroutineValue<T>(Func<CancellationToken, IEnumerator> coroutine, bool nullAsNextUpdate = true)
        {
            return FromCoroutine<T>((observer, cancellationToken) => WrapEnumeratorYieldValue<T>(coroutine(cancellationToken), observer, cancellationToken, nullAsNextUpdate));
        }

        // Drives 'enumerator' and decides, per yielded value, whether to hand it to the
        // coroutine runner (known yield-instruction types, nested IEnumerators, optionally null)
        // or to publish it as a T. Publishes OnCompleted at the end unless an error was raised
        // or cancellation was requested.
        static IEnumerator WrapEnumeratorYieldValue<T>(IEnumerator enumerator, IObserver<T> observer, CancellationToken cancellationToken, bool nullAsNextUpdate)
        {
            var hasNext = default(bool);
            var current = default(object);
            var raisedError = false;
            do
            {
                try
                {
                    hasNext = enumerator.MoveNext();
                    if (hasNext) current = enumerator.Current;
                }
                catch (Exception ex)
                {
                    try
                    {
                        raisedError = true;
                        observer.OnError(ex);
                    }
                    finally
                    {
                        var d = enumerator as IDisposable;
                        if (d != null)
                        {
                            d.Dispose();
                        }
                    }
                    yield break;
                }

                if (hasNext)
                {
                    if (current != null && YieldInstructionTypes.Contains(current.GetType()))
                    {
                        yield return current;
                    }
#if SupportCustomYieldInstruction
                    else if (current is IEnumerator)
                    {
                        var customHandler = current as ICustomYieldInstructionErrorHandler;
                        if (customHandler != null && customHandler.IsReThrowOnError)
                        {
                            // If throws exception in Custom YieldInstruction, can't handle parent coroutine.
                            // It is C# limitation.
                            // so store error info and retrieve from parent.
                            customHandler.ForceDisableRethrowOnError();
                            yield return current;
                            customHandler.ForceEnableRethrowOnError();

                            if (customHandler.HasError)
                            {
                                try
                                {
                                    raisedError = true;
                                    observer.OnError(customHandler.Error);
                                }
                                finally
                                {
                                    var d = enumerator as IDisposable;
                                    if (d != null)
                                    {
                                        d.Dispose();
                                    }
                                }
                                yield break;
                            }
                        }
                        else
                        {
                            yield return current;
                        }
                    }
#endif
                    else if (current == null && nullAsNextUpdate)
                    {
                        yield return null;
                    }
                    else
                    {
                        try
                        {
                            observer.OnNext((T)current);
                        }
                        catch
                        {
                            var d = enumerator as IDisposable;
                            if (d != null)
                            {
                                d.Dispose();
                            }
                            throw;
                        }
                    }
                }
            } while (hasNext && !cancellationToken.IsCancellationRequested);

            try
            {
                if (!raisedError && !cancellationToken.IsCancellationRequested)
                {
                    observer.OnCompleted();
                }
            }
            finally
            {
                var d = enumerator as IDisposable;
                if (d != null)
                {
                    d.Dispose();
                }
            }
        }

        /// <summary>
        /// Creates an observable from a coroutine that publishes through the supplied observer itself.
        /// The coroutine is wrapped so that it stops being advanced once cancellation is requested.
        /// </summary>
        public static IObservable<T> FromCoroutine<T>(Func<IObserver<T>, IEnumerator> coroutine)
        {
            return FromCoroutine<T>((observer, cancellationToken) => WrapToCancellableEnumerator(coroutine(observer), observer, cancellationToken));
        }

        /// <summary>
        /// MicroCoroutine is lightweight, fast coroutine dispatcher.
        /// IEnumerator supports only yield return null.
        /// </summary>
        public static IObservable<T> FromMicroCoroutine<T>(Func<IObserver<T>, IEnumerator> coroutine, FrameCountType frameCountType = FrameCountType.Update)
        {
            return FromMicroCoroutine<T>((observer, cancellationToken) => WrapToCancellableEnumerator(coroutine(observer), observer, cancellationToken), frameCountType);
        }

        // Advances 'enumerator' until it ends or cancellation is requested, forwarding each
        // yielded value. Exceptions from MoveNext go to observer.OnError.
        static IEnumerator WrapToCancellableEnumerator<T>(IEnumerator enumerator, IObserver<T> observer, CancellationToken cancellationToken)
        {
            var hasNext = default(bool);
            do
            {
                try
                {
                    hasNext = enumerator.MoveNext();
                }
                catch (Exception ex)
                {
                    try
                    {
                        observer.OnError(ex);
                    }
                    finally
                    {
                        var d = enumerator as IDisposable;
                        if (d != null)
                        {
                            d.Dispose();
                        }
                    }
                    yield break;
                }

                // Only forward Current while the enumerator is still running; after MoveNext
                // returns false, Current is stale and yielding it would wait on the previous
                // instruction a second time (same guard as WrapEnumerator).
                if (hasNext)
                {
                    yield return enumerator.Current; // yield inner YieldInstruction
                }
            } while (hasNext && !cancellationToken.IsCancellationRequested);

            {
                var d = enumerator as IDisposable;
                if (d != null)
                {
                    d.Dispose();
                }
            }
        }

        /// <summary>
        /// Creates an observable from a coroutine factory that receives both the observer and
        /// the subscription's CancellationToken.
        /// </summary>
        public static IObservable<T> FromCoroutine<T>(Func<IObserver<T>, CancellationToken, IEnumerator> coroutine)
        {
            return new UniRx.Operators.FromCoroutineObservable<T>(coroutine);
        }

        /// <summary>
        /// MicroCoroutine is lightweight, fast coroutine dispatcher.
        /// IEnumerator supports only yield return null.
        /// </summary>
        public static IObservable<T> FromMicroCoroutine<T>(Func<IObserver<T>, CancellationToken, IEnumerator> coroutine, FrameCountType frameCountType = FrameCountType.Update)
        {
            return new UniRx.Operators.FromMicroCoroutineObservable<T>(coroutine, frameCountType);
        }

        /// <summary>
        /// Projects every element of the source onto an observable built from the given coroutine instance.
        /// </summary>
        public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, IEnumerator coroutine, bool publishEveryYield = false)
        {
            return source.SelectMany(FromCoroutine(() => coroutine, publishEveryYield));
        }

        /// <summary>
        /// Projects every element of the source onto an observable built from the coroutine returned by selector.
        /// </summary>
        public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, Func<IEnumerator> selector, bool publishEveryYield = false)
        {
            return source.SelectMany(FromCoroutine(() => selector(), publishEveryYield));
        }

        /// <summary>
        /// Note: publishEveryYield is always false. If you want to set true, use Observable.FromCoroutine(() => selector(x), true). This is workaround of Unity compiler's bug.
        /// </summary>
        public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, Func<T, IEnumerator> selector)
        {
            return source.SelectMany(x => FromCoroutine(() => selector(x), false));
        }

        /// <summary>
        /// Wraps an existing coroutine instance as an observable of Unit.
        /// If publishEveryYield = true, OnNext is also published each time the coroutine yields.
        /// </summary>
        public static IObservable<Unit> ToObservable(this IEnumerator coroutine, bool publishEveryYield = false)
        {
            return FromCoroutine<Unit>((observer, cancellationToken) => WrapEnumerator(coroutine, observer, cancellationToken, publishEveryYield));
        }

#if SupportCustomYieldInstruction

        /// <summary>Converts the coroutine to an observable and then to a yieldable instruction.</summary>
        public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine)
        {
            return ToObservable(coroutine, false).ToYieldInstruction();
        }

        /// <summary>Converts the coroutine to an observable and then to a yieldable instruction.</summary>
        public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine, bool throwOnError)
        {
            return ToObservable(coroutine, false).ToYieldInstruction(throwOnError);
        }

        /// <summary>Converts the coroutine to an observable and then to a yieldable instruction.</summary>
        public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine, CancellationToken cancellationToken)
        {
            return ToObservable(coroutine, false).ToYieldInstruction(cancellationToken);
        }

        /// <summary>Converts the coroutine to an observable and then to a yieldable instruction.</summary>
        public static ObservableYieldInstruction<Unit> ToYieldInstruction(this IEnumerator coroutine, bool throwOnError, CancellationToken cancellationToken)
        {
            return ToObservable(coroutine, false).ToYieldInstruction(throwOnError, cancellationToken);
        }

#endif

        // variation of FromCoroutine

        /// <summary>
        /// EveryUpdate calls coroutine's yield return null timing. It is after all Update and before LateUpdate.
        /// </summary>
        public static IObservable<long> EveryUpdate()
        {
            return FromMicroCoroutine<long>((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.Update);
        }

        /// <summary>Publishes an incrementing counter, driven with FrameCountType.FixedUpdate.</summary>
        public static IObservable<long> EveryFixedUpdate()
        {
            return FromMicroCoroutine<long>((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.FixedUpdate);
        }

        /// <summary>Publishes an incrementing counter, driven with FrameCountType.EndOfFrame.</summary>
        public static IObservable<long> EveryEndOfFrame()
        {
            return FromMicroCoroutine<long>((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.EndOfFrame);
        }

        // Yields once, then publishes 0, 1, 2, ... after each subsequent yield until cancelled.
        static IEnumerator EveryCycleCore(IObserver<long> observer, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested) yield break;
            var count = 0L;
            while (true)
            {
                yield return null;
                if (cancellationToken.IsCancellationRequested) yield break;

                observer.OnNext(count++);
            }
        }

        /// <summary>
        /// EveryGameObjectUpdate calls from MainThreadDispatcher's Update.
        /// </summary>
        public static IObservable<long> EveryGameObjectUpdate()
        {
            return MainThreadDispatcher.UpdateAsObservable().Scan(-1L, (x, y) => x + 1);
        }

        /// <summary>
        /// EveryLateUpdate calls from MainThreadDispatcher's OnLateUpdate.
        /// </summary>
        public static IObservable<long> EveryLateUpdate()
        {
            return MainThreadDispatcher.LateUpdateAsObservable().Scan(-1L, (x, y) => x + 1);
        }

#if SupportCustomYieldInstruction

        /// <summary>
        /// [Obsolete]Same as EveryUpdate.
        /// </summary>
        [Obsolete]
        public static IObservable<long> EveryAfterUpdate()
        {
            return FromCoroutine<long>((observer, cancellationToken) => new EveryAfterUpdateInvoker(observer, cancellationToken));
        }

#endif

        #region Observable.Time Frame Extensions

        // Interval, Timer, Delay, Sample, Throttle, Timeout

        /// <summary>
        /// Publishes a single Unit and completes after one yield of the selected frame timing.
        /// </summary>
        public static IObservable<Unit> NextFrame(FrameCountType frameCountType = FrameCountType.Update)
        {
            return FromMicroCoroutine<Unit>((observer, cancellation) => NextFrameCore(observer, cancellation), frameCountType);
        }

        static IEnumerator NextFrameCore(IObserver<Unit> observer, CancellationToken cancellation)
        {
            yield return null;

            if (!cancellation.IsCancellationRequested)
            {
                observer.OnNext(Unit.Default);
                observer.OnCompleted();
            }
        }

        /// <summary>
        /// Equivalent to TimerFrame(intervalFrameCount, intervalFrameCount, frameCountType).
        /// </summary>
        public static IObservable<long> IntervalFrame(int intervalFrameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            return TimerFrame(intervalFrameCount, intervalFrameCount, frameCountType);
        }

        /// <summary>
        /// Publishes 0 and completes once dueTimeFrameCount frames have elapsed
        /// (values of 0 or less fire on the first step).
        /// </summary>
        public static IObservable<long> TimerFrame(int dueTimeFrameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            return FromMicroCoroutine<long>((observer, cancellation) => TimerFrameCore(observer, dueTimeFrameCount, cancellation), frameCountType);
        }

        /// <summary>
        /// Publishes an incrementing counter: first after dueTimeFrameCount frames, then every
        /// periodFrameCount frames (a period of 0 or less is treated as 1). Never completes by itself.
        /// </summary>
        public static IObservable<long> TimerFrame(int dueTimeFrameCount, int periodFrameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            return FromMicroCoroutine<long>((observer, cancellation) => TimerFrameCore(observer, dueTimeFrameCount, periodFrameCount, cancellation), frameCountType);
        }

        static IEnumerator TimerFrameCore(IObserver<long> observer, int dueTimeFrameCount, CancellationToken cancel)
        {
            // normalize
            if (dueTimeFrameCount <= 0) dueTimeFrameCount = 0;

            var currentFrame = 0;

            // initial phase
            while (!cancel.IsCancellationRequested)
            {
                if (currentFrame++ == dueTimeFrameCount)
                {
                    observer.OnNext(0);
                    observer.OnCompleted();
                    break;
                }
                yield return null;
            }
        }

        static IEnumerator TimerFrameCore(IObserver<long> observer, int dueTimeFrameCount, int periodFrameCount, CancellationToken cancel)
        {
            // normalize
            if (dueTimeFrameCount <= 0) dueTimeFrameCount = 0;
            if (periodFrameCount <= 0) periodFrameCount = 1;

            var sendCount = 0L;
            var currentFrame = 0;

            // initial phase
            while (!cancel.IsCancellationRequested)
            {
                if (currentFrame++ == dueTimeFrameCount)
                {
                    observer.OnNext(sendCount++);
                    currentFrame = -1;
                    break;
                }
                yield return null;
            }

            // period phase
            while (!cancel.IsCancellationRequested)
            {
                if (++currentFrame == periodFrameCount)
                {
                    observer.OnNext(sendCount++);
                    currentFrame = 0;
                }
                yield return null;
            }
        }

        /// <summary>Frame-count based delay operator.</summary>
        /// <exception cref="ArgumentOutOfRangeException">frameCount is negative.</exception>
        public static IObservable<T> DelayFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
            return new UniRx.Operators.DelayFrameObservable<T>(source, frameCount, frameCountType);
        }

        /// <summary>Sample operator driven by another observable (sampler).</summary>
        public static IObservable<T> Sample<T, T2>(this IObservable<T> source, IObservable<T2> sampler)
        {
            return new UniRx.Operators.SampleObservable<T, T2>(source, sampler);
        }

        /// <summary>Frame-count based sample operator.</summary>
        /// <exception cref="ArgumentOutOfRangeException">frameCount is negative.</exception>
        public static IObservable<T> SampleFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
            return new UniRx.Operators.SampleFrameObservable<T>(source, frameCount, frameCountType);
        }

        /// <summary>Frame-count based throttle operator.</summary>
        /// <exception cref="ArgumentOutOfRangeException">frameCount is negative.</exception>
        public static IObservable<T> ThrottleFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
            return new UniRx.Operators.ThrottleFrameObservable<T>(source, frameCount, frameCountType);
        }

        /// <summary>Frame-count based throttle-first operator.</summary>
        /// <exception cref="ArgumentOutOfRangeException">frameCount is negative.</exception>
        public static IObservable<T> ThrottleFirstFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
            return new UniRx.Operators.ThrottleFirstFrameObservable<T>(source, frameCount, frameCountType);
        }

        /// <summary>Frame-count based timeout operator.</summary>
        /// <exception cref="ArgumentOutOfRangeException">frameCount is negative.</exception>
        public static IObservable<T> TimeoutFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
            return new UniRx.Operators.TimeoutFrameObservable<T>(source, frameCount, frameCountType);
        }

        /// <summary>Frame-count based delayed-subscription operator.</summary>
        /// <exception cref="ArgumentOutOfRangeException">frameCount is negative.</exception>
        public static IObservable<T> DelayFrameSubscription<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
        {
            if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
            return new UniRx.Operators.DelayFrameSubscriptionObservable<T>(source, frameCount, frameCountType);
        }

        #endregion

#if SupportCustomYieldInstruction

        /// <summary>
        /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
        /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
        /// This overload throws exception if received OnError events(same as coroutine).
        /// </summary>
        public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source)
        {
            return new ObservableYieldInstruction<T>(source, true, CancellationToken.None);
        }

        /// <summary>
        /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
        /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
        /// This overload throws exception if received OnError events(same as coroutine).
        /// </summary>
        public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source, CancellationToken cancel)
        {
            return new ObservableYieldInstruction<T>(source, true, cancel);
        }

        /// <summary>
        /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
        /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
        /// If throwOnError = false, you can take ObservableYieldInstruction.HasError/Error property.
        /// </summary>
        public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source, bool throwOnError)
        {
            return new ObservableYieldInstruction<T>(source, throwOnError, CancellationToken.None);
        }

        /// <summary>
        /// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
        /// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
        /// If throwOnError = false, you can take ObservableYieldInstruction.HasError/Error property.
        /// </summary>
        public static ObservableYieldInstruction<T> ToYieldInstruction<T>(this IObservable<T> source, bool throwOnError, CancellationToken cancel)
        {
            return new ObservableYieldInstruction<T>(source, throwOnError, cancel);
        }

#endif

        /// <summary>Convert to awaitable IEnumerator.</summary>
        public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, CancellationToken cancel = default(CancellationToken))
        {
            return ToAwaitableEnumerator<T>(source, Stubs<T>.Ignore, Stubs.Throw, cancel);
        }

        /// <summary>Convert to awaitable IEnumerator.</summary>
        public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, Action<T> onResult, CancellationToken cancel = default(CancellationToken))
        {
            return ToAwaitableEnumerator<T>(source, onResult, Stubs.Throw, cancel);
        }

        /// <summary>Convert to awaitable IEnumerator.</summary>
        public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
        {
            return ToAwaitableEnumerator<T>(source, Stubs<T>.Ignore, onError, cancel);
        }

        /// <summary>
        /// Convert to awaitable IEnumerator. Yields null until the source finishes or cancellation
        /// is requested; then invokes onResult with the result, or onError with the error.
        /// On cancellation the subscription is disposed and neither callback is invoked.
        /// </summary>
        public static IEnumerator ToAwaitableEnumerator<T>(this IObservable<T> source, Action<T> onResult, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
        {
            // rethrow is disabled here: errors are delivered through onError instead.
            var enumerator = new ObservableYieldInstruction<T>(source, false, cancel);
            var e = (IEnumerator)enumerator;
            while (e.MoveNext() && !cancel.IsCancellationRequested)
            {
                yield return null;
            }

            if (cancel.IsCancellationRequested)
            {
                enumerator.Dispose();
                yield break;
            }

            if (enumerator.HasResult)
            {
                onResult(enumerator.Result);
            }
            else if (enumerator.HasError)
            {
                onError(enumerator.Error);
            }
        }

        /// <summary>AutoStart observable as coroutine.</summary>
        public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, CancellationToken cancel = default(CancellationToken))
        {
            return StartAsCoroutine<T>(source, Stubs<T>.Ignore, Stubs.Throw, cancel);
        }

        /// <summary>AutoStart observable as coroutine.</summary>
        public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, Action<T> onResult, CancellationToken cancel = default(CancellationToken))
        {
            return StartAsCoroutine<T>(source, onResult, Stubs.Throw, cancel);
        }

        /// <summary>AutoStart observable as coroutine.</summary>
        public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
        {
            return StartAsCoroutine<T>(source, Stubs<T>.Ignore, onError, cancel);
        }

        /// <summary>AutoStart observable as coroutine.</summary>
        public static Coroutine StartAsCoroutine<T>(this IObservable<T> source, Action<T> onResult, Action<Exception> onError, CancellationToken cancel = default(CancellationToken))
        {
            return MainThreadDispatcher.StartCoroutine(source.ToAwaitableEnumerator(onResult, onError, cancel));
        }

        /// <summary>Observes the source on SchedulerUnity.MainThread.</summary>
        public static IObservable<T> ObserveOnMainThread<T>(this IObservable<T> source)
        {
            return source.ObserveOn(SchedulerUnity.MainThread);
        }

        /// <summary>
        /// Re-publishes each element at the timing selected by dispatchType.
        /// </summary>
        /// <exception cref="ArgumentException">dispatchType is not a defined value.</exception>
        public static IObservable<T> ObserveOnMainThread<T>(this IObservable<T> source, MainThreadDispatchType dispatchType)
        {
            switch (dispatchType)
            {
                case MainThreadDispatchType.Update:
                    return source.ObserveOnMainThread(); // faster path

                // others, bit slower

                case MainThreadDispatchType.FixedUpdate:
                    return source.SelectMany(_ => EveryFixedUpdate().Take(1), (x, _) => x);
                case MainThreadDispatchType.EndOfFrame:
                    return source.SelectMany(_ => EveryEndOfFrame().Take(1), (x, _) => x);
                case MainThreadDispatchType.GameObjectUpdate:
                    return source.SelectMany(_ => MainThreadDispatcher.UpdateAsObservable().Take(1), (x, _) => x);
                case MainThreadDispatchType.LateUpdate:
                    return source.SelectMany(_ => MainThreadDispatcher.LateUpdateAsObservable().Take(1), (x, _) => x);
                default:
                    throw new ArgumentException("type is invalid");
            }
        }

        /// <summary>Subscribes to the source on SchedulerUnity.MainThread.</summary>
        public static IObservable<T> SubscribeOnMainThread<T>(this IObservable<T> source)
        {
            return source.SubscribeOn(SchedulerUnity.MainThread);
        }

        // I can't avoid Unity 5.3's uNET weaver bug, pending...

        //public static IObservable<T> SubscribeOnMainThread<T>(this IObservable<T> source, MainThreadDispatchType dispatchType)
        //{
        //    switch (dispatchType)
        //    {
        //        case MainThreadDispatchType.Update:
        //            return source.SubscribeOnMainThread(); // faster path

        //        // others, bit slower

        //        case MainThreadDispatchType.FixedUpdate:
        //            return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, EveryFixedUpdate().Take(1));
        //        case MainThreadDispatchType.EndOfFrame:
        //            return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, EveryEndOfFrame().Take(1));
        //        case MainThreadDispatchType.GameObjectUpdate:
        //            return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, MainThreadDispatcher.UpdateAsObservable().Select(_ => 0L).Take(1));
        //        case MainThreadDispatchType.LateUpdate:
        //            return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, MainThreadDispatcher.LateUpdateAsObservable().Select(_ => 0L).Take(1));
        //        case MainThreadDispatchType.AfterUpdate:
        //            return new UniRx.Operators.SubscribeOnMainThreadObservable<T>(source, EveryAfterUpdate().Take(1));
        //        default:
        //            throw new ArgumentException("type is invalid");
        //    }
        //}

        /// <summary>Stream of MainThreadDispatcher's application-pause notifications.</summary>
        public static IObservable<bool> EveryApplicationPause()
        {
            return MainThreadDispatcher.OnApplicationPauseAsObservable().AsObservable();
        }

        /// <summary>Stream of MainThreadDispatcher's application-focus notifications.</summary>
        public static IObservable<bool> EveryApplicationFocus()
        {
            return MainThreadDispatcher.OnApplicationFocusAsObservable().AsObservable();
        }

        /// <summary>publish OnNext(Unit) and OnCompleted() on application quit.</summary>
        public static IObservable<Unit> OnceApplicationQuit()
        {
            return MainThreadDispatcher.OnApplicationQuitAsObservable().Take(1);
        }

        /// <summary>Takes elements until target's OnDestroyAsObservable publishes.</summary>
        public static IObservable<T> TakeUntilDestroy<T>(this IObservable<T> source, Component target)
        {
            return source.TakeUntil(target.OnDestroyAsObservable());
        }

        /// <summary>Takes elements until target's OnDestroyAsObservable publishes.</summary>
        public static IObservable<T> TakeUntilDestroy<T>(this IObservable<T> source, GameObject target)
        {
            return source.TakeUntil(target.OnDestroyAsObservable());
        }

        /// <summary>Takes elements until target's OnDisableAsObservable publishes.</summary>
        public static IObservable<T> TakeUntilDisable<T>(this IObservable<T> source, Component target)
        {
            return source.TakeUntil(target.OnDisableAsObservable());
        }

        /// <summary>Takes elements until target's OnDisableAsObservable publishes.</summary>
        public static IObservable<T> TakeUntilDisable<T>(this IObservable<T> source, GameObject target)
        {
            return source.TakeUntil(target.OnDisableAsObservable());
        }

        /// <summary>Repeats the source, using target's OnDestroyAsObservable as the stop trigger.</summary>
        public static IObservable<T> RepeatUntilDestroy<T>(this IObservable<T> source, GameObject target)
        {
            return RepeatUntilCore(RepeatInfinite(source), target.OnDestroyAsObservable(), target);
        }

        /// <summary>Repeats the source, using target's OnDestroyAsObservable as the stop trigger.</summary>
        public static IObservable<T> RepeatUntilDestroy<T>(this IObservable<T> source, Component target)
        {
            return RepeatUntilCore(RepeatInfinite(source), target.OnDestroyAsObservable(), (target != null) ? target.gameObject : null);
        }

        /// <summary>Repeats the source, using target's OnDisableAsObservable as the stop trigger.</summary>
        public static IObservable<T> RepeatUntilDisable<T>(this IObservable<T> source, GameObject target)
        {
            return RepeatUntilCore(RepeatInfinite(source), target.OnDisableAsObservable(), target);
        }

        /// <summary>Repeats the source, using target's OnDisableAsObservable as the stop trigger.</summary>
        public static IObservable<T> RepeatUntilDisable<T>(this IObservable<T> source, Component target)
        {
            return RepeatUntilCore(RepeatInfinite(source), target.OnDisableAsObservable(), (target != null) ? target.gameObject : null);
        }

        static IObservable<T> RepeatUntilCore<T>(this IEnumerable<IObservable<T>> sources, IObservable<Unit> trigger, GameObject lifeTimeChecker)
        {
            return new UniRx.Operators.RepeatUntilObservable<T>(sources, trigger, lifeTimeChecker);
        }

        /// <summary>Wraps the source in a FrameIntervalObservable.</summary>
        public static IObservable<UniRx.FrameInterval<T>> FrameInterval<T>(this IObservable<T> source)
        {
            return new UniRx.Operators.FrameIntervalObservable<T>(source);
        }

        /// <summary>Wraps the source in a FrameTimeIntervalObservable.</summary>
        public static IObservable<UniRx.TimeInterval<T>> FrameTimeInterval<T>(this IObservable<T> source, bool ignoreTimeScale = false)
        {
            return new UniRx.Operators.FrameTimeIntervalObservable<T>(source, ignoreTimeScale);
        }

        /// <summary>
        /// Buffer elements in during target frame counts. Default raise same frame of end(frameCount = 0, frameCountType = EndOfFrame).
        /// </summary>
        public static IObservable<IList<T>> BatchFrame<T>(this IObservable<T> source)
        {
            // if use default argument, compiler errors ambiguous(Unity's limitation)
            return BatchFrame<T>(source, 0, FrameCountType.EndOfFrame);
        }

        /// <summary>
        /// Buffer elements in during target frame counts.
        /// </summary>
        /// <exception cref="ArgumentException">frameCount is negative.</exception>
        public static IObservable<IList<T>> BatchFrame<T>(this IObservable<T> source, int frameCount, FrameCountType frameCountType)
        {
            if (frameCount < 0) throw new ArgumentException("frameCount must be >= 0, frameCount:" + frameCount);
            return new UniRx.Operators.BatchFrameObservable<T>(source, frameCount, frameCountType);
        }

        /// <summary>
        /// Wait command in during target frame counts. Default raise same frame of end(frameCount = 0, frameCountType = EndOfFrame).
        /// </summary>
        public static IObservable<Unit> BatchFrame(this IObservable<Unit> source)
        {
            return BatchFrame(source, 0, FrameCountType.EndOfFrame);
        }

        /// <summary>
        /// Wait command in during target frame counts.
        /// </summary>
        /// <exception cref="ArgumentException">frameCount is negative.</exception>
        public static IObservable<Unit> BatchFrame(this IObservable<Unit> source, int frameCount, FrameCountType frameCountType)
        {
            if (frameCount < 0) throw new ArgumentException("frameCount must be >= 0, frameCount:" + frameCount);
            return new UniRx.Operators.BatchFrameObservable(source, frameCount, frameCountType);
        }

#if UniRxLibrary

        // Endless sequence that yields the same source over and over.
        static IEnumerable<IObservable<T>> RepeatInfinite<T>(IObservable<T> source)
        {
            while (true)
            {
                yield return source;
            }
        }

        internal static class Stubs
        {
            public static readonly Action Nop = () => { };
            public static readonly Action<Exception> Throw = ex => { ex.Throw(); };

            // Stubs<T>.Ignore can't avoid iOS AOT problem.
            public static void Ignore<T>(T t)
            {
            }

            // marker for CatchIgnore and Catch avoid iOS AOT problem.
            public static IObservable<TSource> CatchIgnore<TSource>(Exception ex)
            {
                return Observable.Empty<TSource>();
            }
        }

#endif
    }
}