#if !(UNITY_4_0 || UNITY_4_1 || UNITY_4_2 || UNITY_4_3 || UNITY_4_4 || UNITY_4_5 || UNITY_4_6 || UNITY_5_0 || UNITY_5_1 || UNITY_5_2)
#define SupportCustomYieldInstruction
#endif
using System;
using System.Collections;
using System.Collections.Generic;
using UniRx.InternalUtil;
using UniRx.Triggers;
using UnityEngine;
using System.Threading;
#if !UniRxLibrary
using SchedulerUnity = UniRx.Scheduler;
#endif
namespace UniRx
{
public enum FrameCountType
{
Update,
FixedUpdate,
EndOfFrame,
}
public enum MainThreadDispatchType
{
/// yield return null
Update,
FixedUpdate,
EndOfFrame,
GameObjectUpdate,
LateUpdate,
}
public static class FrameCountTypeExtensions
{
public static YieldInstruction GetYieldInstruction(this FrameCountType frameCountType)
{
switch (frameCountType)
{
case FrameCountType.FixedUpdate:
return YieldInstructionCache.WaitForFixedUpdate;
case FrameCountType.EndOfFrame:
return YieldInstructionCache.WaitForEndOfFrame;
case FrameCountType.Update:
default:
return null;
}
}
}
internal interface ICustomYieldInstructionErrorHandler
{
bool HasError { get; }
Exception Error { get; }
bool IsReThrowOnError { get; }
void ForceDisableRethrowOnError();
void ForceEnableRethrowOnError();
}
public class ObservableYieldInstruction : IEnumerator, ICustomYieldInstructionErrorHandler
{
readonly IDisposable subscription;
readonly CancellationToken cancel;
bool reThrowOnError;
T current;
T result;
bool moveNext;
bool hasResult;
Exception error;
public ObservableYieldInstruction(IObservable source, bool reThrowOnError, CancellationToken cancel)
{
this.moveNext = true;
this.reThrowOnError = reThrowOnError;
this.cancel = cancel;
try
{
this.subscription = source.Subscribe(new ToYieldInstruction(this));
}
catch
{
moveNext = false;
throw;
}
}
public bool HasError
{
get { return error != null; }
}
public bool HasResult
{
get { return hasResult; }
}
public bool IsCanceled
{
get
{
if (hasResult) return false;
if (error != null) return false;
return cancel.IsCancellationRequested;
}
}
///
/// HasResult || IsCanceled || HasError
///
public bool IsDone
{
get
{
return HasResult || HasError || (cancel.IsCancellationRequested);
}
}
public T Result
{
get { return result; }
}
T IEnumerator.Current
{
get
{
return current;
}
}
object IEnumerator.Current
{
get
{
return current;
}
}
public Exception Error
{
get
{
return error;
}
}
bool IEnumerator.MoveNext()
{
if (!moveNext)
{
if (reThrowOnError && HasError)
{
Error.Throw();
}
return false;
}
if (cancel.IsCancellationRequested)
{
subscription.Dispose();
return false;
}
return true;
}
bool ICustomYieldInstructionErrorHandler.IsReThrowOnError
{
get { return reThrowOnError; }
}
void ICustomYieldInstructionErrorHandler.ForceDisableRethrowOnError()
{
this.reThrowOnError = false;
}
void ICustomYieldInstructionErrorHandler.ForceEnableRethrowOnError()
{
this.reThrowOnError = true;
}
public void Dispose()
{
subscription.Dispose();
}
void IEnumerator.Reset()
{
throw new NotSupportedException();
}
class ToYieldInstruction : IObserver
{
readonly ObservableYieldInstruction parent;
public ToYieldInstruction(ObservableYieldInstruction parent)
{
this.parent = parent;
}
public void OnNext(T value)
{
parent.current = value;
}
public void OnError(Exception error)
{
parent.moveNext = false;
parent.error = error;
}
public void OnCompleted()
{
parent.moveNext = false;
parent.hasResult = true;
parent.result = parent.current;
}
}
}
#if UniRxLibrary
public static partial class ObservableUnity
#else
public static partial class Observable
#endif
{
readonly static HashSet YieldInstructionTypes = new HashSet
{
#if UNITY_2018_3_OR_NEWER
#pragma warning disable CS0618
#endif
typeof(WWW),
#if UNITY_2018_3_OR_NEWER
#pragma warning restore CS0618
#endif
typeof(WaitForEndOfFrame),
typeof(WaitForFixedUpdate),
typeof(WaitForSeconds),
typeof(AsyncOperation),
typeof(Coroutine)
};
#if SupportCustomYieldInstruction
class EveryAfterUpdateInvoker : IEnumerator
{
long count = -1;
readonly IObserver observer;
readonly CancellationToken cancellationToken;
public EveryAfterUpdateInvoker(IObserver observer, CancellationToken cancellationToken)
{
this.observer = observer;
this.cancellationToken = cancellationToken;
}
public bool MoveNext()
{
if (!cancellationToken.IsCancellationRequested)
{
if (count != -1) // ignore first/immediate invoke
{
observer.OnNext(count++);
}
else
{
count++;
}
return true;
}
else
{
return false;
}
}
public object Current
{
get
{
return null;
}
}
public void Reset()
{
throw new NotSupportedException();
}
}
#endif
/// From has no callback coroutine to IObservable. If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
public static IObservable FromCoroutine(Func coroutine, bool publishEveryYield = false)
{
return FromCoroutine((observer, cancellationToken) => WrapEnumerator(coroutine(), observer, cancellationToken, publishEveryYield));
}
/// From has no callback coroutine to IObservable. If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
public static IObservable FromCoroutine(Func coroutine, bool publishEveryYield = false)
{
return FromCoroutine((observer, cancellationToken) => WrapEnumerator(coroutine(cancellationToken), observer, cancellationToken, publishEveryYield));
}
///
/// MicroCoroutine is lightweight, fast coroutine dispatcher.
/// IEnumerator supports only yield return null.
/// If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
///
public static IObservable FromMicroCoroutine(Func coroutine, bool publishEveryYield = false, FrameCountType frameCountType = FrameCountType.Update)
{
return FromMicroCoroutine((observer, cancellationToken) => WrapEnumerator(coroutine(), observer, cancellationToken, publishEveryYield), frameCountType);
}
///
/// MicroCoroutine is lightweight, fast coroutine dispatcher.
/// IEnumerator supports only yield return null.
/// If publishEveryYield = true then publish OnNext every yield return else return once on enumeration completed.
///
public static IObservable FromMicroCoroutine(Func coroutine, bool publishEveryYield = false, FrameCountType frameCountType = FrameCountType.Update)
{
return FromMicroCoroutine((observer, cancellationToken) => WrapEnumerator(coroutine(cancellationToken), observer, cancellationToken, publishEveryYield), frameCountType);
}
static IEnumerator WrapEnumerator(IEnumerator enumerator, IObserver observer, CancellationToken cancellationToken, bool publishEveryYield)
{
var hasNext = default(bool);
var raisedError = false;
do
{
try
{
hasNext = enumerator.MoveNext();
}
catch (Exception ex)
{
try
{
raisedError = true;
observer.OnError(ex);
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
yield break;
}
if (hasNext && publishEveryYield)
{
try
{
observer.OnNext(Unit.Default);
}
catch
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
throw;
}
}
if (hasNext)
{
#if SupportCustomYieldInstruction
var current = enumerator.Current;
var customHandler = current as ICustomYieldInstructionErrorHandler;
if (customHandler != null && customHandler.IsReThrowOnError)
{
// If throws exception in Custom YieldInsrtuction, can't handle parent coroutine.
// It is C# limitation.
// so store error info and retrieve from parent.
customHandler.ForceDisableRethrowOnError();
yield return current;
customHandler.ForceEnableRethrowOnError();
if (customHandler.HasError)
{
try
{
raisedError = true;
observer.OnError(customHandler.Error);
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
yield break;
}
}
else
{
yield return enumerator.Current; // yield inner YieldInstruction
}
#else
yield return enumerator.Current; // yield inner YieldInstruction
#endif
}
} while (hasNext && !cancellationToken.IsCancellationRequested);
try
{
if (!raisedError && !cancellationToken.IsCancellationRequested)
{
observer.OnNext(Unit.Default); // last one
observer.OnCompleted();
}
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
}
/// Convert coroutine to typed IObservable. If nullAsNextUpdate = true then yield return null when Enumerator.Current and no null publish observer.OnNext.
public static IObservable FromCoroutineValue(Func coroutine, bool nullAsNextUpdate = true)
{
return FromCoroutine((observer, cancellationToken) => WrapEnumeratorYieldValue(coroutine(), observer, cancellationToken, nullAsNextUpdate));
}
/// Convert coroutine to typed IObservable. If nullAsNextUpdate = true then yield return null when Enumerator.Current and no null publish observer.OnNext.
public static IObservable FromCoroutineValue(Func coroutine, bool nullAsNextUpdate = true)
{
return FromCoroutine((observer, cancellationToken) => WrapEnumeratorYieldValue(coroutine(cancellationToken), observer, cancellationToken, nullAsNextUpdate));
}
static IEnumerator WrapEnumeratorYieldValue(IEnumerator enumerator, IObserver observer, CancellationToken cancellationToken, bool nullAsNextUpdate)
{
var hasNext = default(bool);
var current = default(object);
var raisedError = false;
do
{
try
{
hasNext = enumerator.MoveNext();
if (hasNext) current = enumerator.Current;
}
catch (Exception ex)
{
try
{
raisedError = true;
observer.OnError(ex);
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
yield break;
}
if (hasNext)
{
if (current != null && YieldInstructionTypes.Contains(current.GetType()))
{
yield return current;
}
#if SupportCustomYieldInstruction
else if (current is IEnumerator)
{
var customHandler = current as ICustomYieldInstructionErrorHandler;
if (customHandler != null && customHandler.IsReThrowOnError)
{
// If throws exception in Custom YieldInsrtuction, can't handle parent coroutine.
// It is C# limitation.
// so store error info and retrieve from parent.
customHandler.ForceDisableRethrowOnError();
yield return current;
customHandler.ForceEnableRethrowOnError();
if (customHandler.HasError)
{
try
{
raisedError = true;
observer.OnError(customHandler.Error);
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
yield break;
}
}
else
{
yield return current;
}
}
#endif
else if (current == null && nullAsNextUpdate)
{
yield return null;
}
else
{
try
{
observer.OnNext((T)current);
}
catch
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
throw;
}
}
}
} while (hasNext && !cancellationToken.IsCancellationRequested);
try
{
if (!raisedError && !cancellationToken.IsCancellationRequested)
{
observer.OnCompleted();
}
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
}
public static IObservable FromCoroutine(Func, IEnumerator> coroutine)
{
return FromCoroutine((observer, cancellationToken) => WrapToCancellableEnumerator(coroutine(observer), observer, cancellationToken));
}
///
/// MicroCoroutine is lightweight, fast coroutine dispatcher.
/// IEnumerator supports only yield return null.
///
public static IObservable FromMicroCoroutine(Func, IEnumerator> coroutine, FrameCountType frameCountType = FrameCountType.Update)
{
return FromMicroCoroutine((observer, cancellationToken) => WrapToCancellableEnumerator(coroutine(observer), observer, cancellationToken), frameCountType);
}
static IEnumerator WrapToCancellableEnumerator(IEnumerator enumerator, IObserver observer, CancellationToken cancellationToken)
{
var hasNext = default(bool);
do
{
try
{
hasNext = enumerator.MoveNext();
}
catch (Exception ex)
{
try
{
observer.OnError(ex);
}
finally
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
yield break;
}
yield return enumerator.Current; // yield inner YieldInstruction
} while (hasNext && !cancellationToken.IsCancellationRequested);
{
var d = enumerator as IDisposable;
if (d != null)
{
d.Dispose();
}
}
}
public static IObservable FromCoroutine(Func, CancellationToken, IEnumerator> coroutine)
{
return new UniRx.Operators.FromCoroutineObservable(coroutine);
}
///
/// MicroCoroutine is lightweight, fast coroutine dispatcher.
/// IEnumerator supports only yield return null.
///
public static IObservable FromMicroCoroutine(Func, CancellationToken, IEnumerator> coroutine, FrameCountType frameCountType = FrameCountType.Update)
{
return new UniRx.Operators.FromMicroCoroutineObservable(coroutine, frameCountType);
}
public static IObservable SelectMany(this IObservable source, IEnumerator coroutine, bool publishEveryYield = false)
{
return source.SelectMany(FromCoroutine(() => coroutine, publishEveryYield));
}
public static IObservable SelectMany(this IObservable source, Func selector, bool publishEveryYield = false)
{
return source.SelectMany(FromCoroutine(() => selector(), publishEveryYield));
}
///
/// Note: publishEveryYield is always false. If you want to set true, use Observable.FromCoroutine(() => selector(x), true). This is workaround of Unity compiler's bug.
///
public static IObservable SelectMany(this IObservable source, Func selector)
{
return source.SelectMany(x => FromCoroutine(() => selector(x), false));
}
public static IObservable ToObservable(this IEnumerator coroutine, bool publishEveryYield = false)
{
return FromCoroutine((observer, cancellationToken) => WrapEnumerator(coroutine, observer, cancellationToken, publishEveryYield));
}
#if SupportCustomYieldInstruction
public static ObservableYieldInstruction ToYieldInstruction(this IEnumerator coroutine)
{
return ToObservable(coroutine, false).ToYieldInstruction();
}
public static ObservableYieldInstruction ToYieldInstruction(this IEnumerator coroutine, bool throwOnError)
{
return ToObservable(coroutine, false).ToYieldInstruction(throwOnError);
}
public static ObservableYieldInstruction ToYieldInstruction(this IEnumerator coroutine, CancellationToken cancellationToken)
{
return ToObservable(coroutine, false).ToYieldInstruction(cancellationToken);
}
public static ObservableYieldInstruction ToYieldInstruction(this IEnumerator coroutine, bool throwOnError, CancellationToken cancellationToken)
{
return ToObservable(coroutine, false).ToYieldInstruction(throwOnError, cancellationToken);
}
#endif
// variation of FromCoroutine
///
/// EveryUpdate calls coroutine's yield return null timing. It is after all Update and before LateUpdate.
///
public static IObservable EveryUpdate()
{
return FromMicroCoroutine((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.Update);
}
public static IObservable EveryFixedUpdate()
{
return FromMicroCoroutine((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.FixedUpdate);
}
public static IObservable EveryEndOfFrame()
{
return FromMicroCoroutine((observer, cancellationToken) => EveryCycleCore(observer, cancellationToken), FrameCountType.EndOfFrame);
}
static IEnumerator EveryCycleCore(IObserver observer, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) yield break;
var count = 0L;
while (true)
{
yield return null;
if (cancellationToken.IsCancellationRequested) yield break;
observer.OnNext(count++);
}
}
///
/// EveryGameObjectUpdate calls from MainThreadDispatcher's Update.
///
public static IObservable EveryGameObjectUpdate()
{
return MainThreadDispatcher.UpdateAsObservable().Scan(-1L, (x, y) => x + 1);
}
///
/// EveryLateUpdate calls from MainThreadDispatcher's OnLateUpdate.
///
public static IObservable EveryLateUpdate()
{
return MainThreadDispatcher.LateUpdateAsObservable().Scan(-1L, (x, y) => x + 1);
}
#if SupportCustomYieldInstruction
///
/// [Obsolete]Same as EveryUpdate.
///
[Obsolete]
public static IObservable EveryAfterUpdate()
{
return FromCoroutine((observer, cancellationToken) => new EveryAfterUpdateInvoker(observer, cancellationToken));
}
#endif
#region Observable.Time Frame Extensions
// Interval, Timer, Delay, Sample, Throttle, Timeout
public static IObservable NextFrame(FrameCountType frameCountType = FrameCountType.Update)
{
return FromMicroCoroutine((observer, cancellation) => NextFrameCore(observer, cancellation), frameCountType);
}
static IEnumerator NextFrameCore(IObserver observer, CancellationToken cancellation)
{
yield return null;
if (!cancellation.IsCancellationRequested)
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
}
}
public static IObservable IntervalFrame(int intervalFrameCount, FrameCountType frameCountType = FrameCountType.Update)
{
return TimerFrame(intervalFrameCount, intervalFrameCount, frameCountType);
}
public static IObservable TimerFrame(int dueTimeFrameCount, FrameCountType frameCountType = FrameCountType.Update)
{
return FromMicroCoroutine((observer, cancellation) => TimerFrameCore(observer, dueTimeFrameCount, cancellation), frameCountType);
}
public static IObservable TimerFrame(int dueTimeFrameCount, int periodFrameCount, FrameCountType frameCountType = FrameCountType.Update)
{
return FromMicroCoroutine((observer, cancellation) => TimerFrameCore(observer, dueTimeFrameCount, periodFrameCount, cancellation), frameCountType);
}
static IEnumerator TimerFrameCore(IObserver observer, int dueTimeFrameCount, CancellationToken cancel)
{
// normalize
if (dueTimeFrameCount <= 0) dueTimeFrameCount = 0;
var currentFrame = 0;
// initial phase
while (!cancel.IsCancellationRequested)
{
if (currentFrame++ == dueTimeFrameCount)
{
observer.OnNext(0);
observer.OnCompleted();
break;
}
yield return null;
}
}
static IEnumerator TimerFrameCore(IObserver observer, int dueTimeFrameCount, int periodFrameCount, CancellationToken cancel)
{
// normalize
if (dueTimeFrameCount <= 0) dueTimeFrameCount = 0;
if (periodFrameCount <= 0) periodFrameCount = 1;
var sendCount = 0L;
var currentFrame = 0;
// initial phase
while (!cancel.IsCancellationRequested)
{
if (currentFrame++ == dueTimeFrameCount)
{
observer.OnNext(sendCount++);
currentFrame = -1;
break;
}
yield return null;
}
// period phase
while (!cancel.IsCancellationRequested)
{
if (++currentFrame == periodFrameCount)
{
observer.OnNext(sendCount++);
currentFrame = 0;
}
yield return null;
}
}
public static IObservable DelayFrame(this IObservable source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
{
if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
return new UniRx.Operators.DelayFrameObservable(source, frameCount, frameCountType);
}
public static IObservable Sample(this IObservable source, IObservable sampler)
{
return new UniRx.Operators.SampleObservable(source, sampler);
}
public static IObservable SampleFrame(this IObservable source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
{
if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
return new UniRx.Operators.SampleFrameObservable(source, frameCount, frameCountType);
}
public static IObservable ThrottleFrame(this IObservable source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
{
if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
return new UniRx.Operators.ThrottleFrameObservable(source, frameCount, frameCountType);
}
public static IObservable ThrottleFirstFrame(this IObservable source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
{
if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
return new UniRx.Operators.ThrottleFirstFrameObservable(source, frameCount, frameCountType);
}
public static IObservable TimeoutFrame(this IObservable source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
{
if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
return new UniRx.Operators.TimeoutFrameObservable(source, frameCount, frameCountType);
}
public static IObservable DelayFrameSubscription(this IObservable source, int frameCount, FrameCountType frameCountType = FrameCountType.Update)
{
if (frameCount < 0) throw new ArgumentOutOfRangeException("frameCount");
return new UniRx.Operators.DelayFrameSubscriptionObservable(source, frameCount, frameCountType);
}
#endregion
#if SupportCustomYieldInstruction
///
/// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
/// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
/// This overload throws exception if received OnError events(same as coroutine).
///
public static ObservableYieldInstruction ToYieldInstruction(this IObservable source)
{
return new ObservableYieldInstruction(source, true, CancellationToken.None);
}
///
/// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
/// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
/// This overload throws exception if received OnError events(same as coroutine).
///
public static ObservableYieldInstruction ToYieldInstruction(this IObservable source, CancellationToken cancel)
{
return new ObservableYieldInstruction(source, true, cancel);
}
///
/// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
/// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
/// If throwOnError = false, you can take ObservableYieldInstruction.HasError/Error property.
///
public static ObservableYieldInstruction ToYieldInstruction(this IObservable source, bool throwOnError)
{
return new ObservableYieldInstruction(source, throwOnError, CancellationToken.None);
}
///
/// Convert to yieldable IEnumerator. e.g. yield return source.ToYieldInstruction();.
/// If needs last result, you can take ObservableYieldInstruction.HasResult/Result property.
/// If throwOnError = false, you can take ObservableYieldInstruction.HasError/Error property.
///
public static ObservableYieldInstruction ToYieldInstruction(this IObservable source, bool throwOnError, CancellationToken cancel)
{
return new ObservableYieldInstruction(source, throwOnError, cancel);
}
#endif
/// Convert to awaitable IEnumerator.
public static IEnumerator ToAwaitableEnumerator(this IObservable source, CancellationToken cancel = default(CancellationToken))
{
return ToAwaitableEnumerator(source, Stubs.Ignore, Stubs.Throw, cancel);
}
/// Convert to awaitable IEnumerator.
public static IEnumerator ToAwaitableEnumerator(this IObservable source, Action onResult, CancellationToken cancel = default(CancellationToken))
{
return ToAwaitableEnumerator(source, onResult, Stubs.Throw, cancel);
}
/// Convert to awaitable IEnumerator.
public static IEnumerator ToAwaitableEnumerator(this IObservable source, Action onError, CancellationToken cancel = default(CancellationToken))
{
return ToAwaitableEnumerator(source, Stubs.Ignore, onError, cancel);
}
/// Convert to awaitable IEnumerator.
public static IEnumerator ToAwaitableEnumerator(this IObservable source, Action onResult, Action onError, CancellationToken cancel = default(CancellationToken))
{
var enumerator = new ObservableYieldInstruction(source, false, cancel);
var e = (IEnumerator)enumerator;
while (e.MoveNext() && !cancel.IsCancellationRequested)
{
yield return null;
}
if (cancel.IsCancellationRequested)
{
enumerator.Dispose();
yield break;
}
if (enumerator.HasResult)
{
onResult(enumerator.Result);
}
else if (enumerator.HasError)
{
onError(enumerator.Error);
}
}
/// AutoStart observable as coroutine.
public static Coroutine StartAsCoroutine(this IObservable source, CancellationToken cancel = default(CancellationToken))
{
return StartAsCoroutine(source, Stubs.Ignore, Stubs.Throw, cancel);
}
/// AutoStart observable as coroutine.
public static Coroutine StartAsCoroutine(this IObservable source, Action onResult, CancellationToken cancel = default(CancellationToken))
{
return StartAsCoroutine(source, onResult, Stubs.Throw, cancel);
}
/// AutoStart observable as coroutine.
public static Coroutine StartAsCoroutine(this IObservable source, Action onError, CancellationToken cancel = default(CancellationToken))
{
return StartAsCoroutine(source, Stubs.Ignore, onError, cancel);
}
/// AutoStart observable as coroutine.
public static Coroutine StartAsCoroutine(this IObservable source, Action onResult, Action onError, CancellationToken cancel = default(CancellationToken))
{
return MainThreadDispatcher.StartCoroutine(source.ToAwaitableEnumerator(onResult, onError, cancel));
}
public static IObservable ObserveOnMainThread(this IObservable source)
{
return source.ObserveOn(SchedulerUnity.MainThread);
}
public static IObservable ObserveOnMainThread(this IObservable source, MainThreadDispatchType dispatchType)
{
switch (dispatchType)
{
case MainThreadDispatchType.Update:
return source.ObserveOnMainThread(); // faster path
// others, bit slower
case MainThreadDispatchType.FixedUpdate:
return source.SelectMany(_ => EveryFixedUpdate().Take(1), (x, _) => x);
case MainThreadDispatchType.EndOfFrame:
return source.SelectMany(_ => EveryEndOfFrame().Take(1), (x, _) => x);
case MainThreadDispatchType.GameObjectUpdate:
return source.SelectMany(_ => MainThreadDispatcher.UpdateAsObservable().Take(1), (x, _) => x);
case MainThreadDispatchType.LateUpdate:
return source.SelectMany(_ => MainThreadDispatcher.LateUpdateAsObservable().Take(1), (x, _) => x);
default:
throw new ArgumentException("type is invalid");
}
}
public static IObservable SubscribeOnMainThread(this IObservable source)
{
return source.SubscribeOn(SchedulerUnity.MainThread);
}
// I can't avoid Unity 5.3's uNET weaver bug, pending...
//public static IObservable SubscribeOnMainThread(this IObservable source, MainThreadDispatchType dispatchType)
//{
// switch (dispatchType)
// {
// case MainThreadDispatchType.Update:
// return source.SubscribeOnMainThread(); // faster path
// // others, bit slower
// case MainThreadDispatchType.FixedUpdate:
// return new UniRx.Operators.SubscribeOnMainThreadObservable(source, EveryFixedUpdate().Take(1));
// case MainThreadDispatchType.EndOfFrame:
// return new UniRx.Operators.SubscribeOnMainThreadObservable(source, EveryEndOfFrame().Take(1));
// case MainThreadDispatchType.GameObjectUpdate:
// return new UniRx.Operators.SubscribeOnMainThreadObservable(source, MainThreadDispatcher.UpdateAsObservable().Select(_ => 0L).Take(1));
// case MainThreadDispatchType.LateUpdate:
// return new UniRx.Operators.SubscribeOnMainThreadObservable(source, MainThreadDispatcher.LateUpdateAsObservable().Select(_ => 0L).Take(1));
// case MainThreadDispatchType.AfterUpdate:
// return new UniRx.Operators.SubscribeOnMainThreadObservable(source, EveryAfterUpdate().Take(1));
// default:
// throw new ArgumentException("type is invalid");
// }
//}
public static IObservable EveryApplicationPause()
{
return MainThreadDispatcher.OnApplicationPauseAsObservable().AsObservable();
}
public static IObservable EveryApplicationFocus()
{
return MainThreadDispatcher.OnApplicationFocusAsObservable().AsObservable();
}
/// publish OnNext(Unit) and OnCompleted() on application quit.
public static IObservable OnceApplicationQuit()
{
return MainThreadDispatcher.OnApplicationQuitAsObservable().Take(1);
}
public static IObservable TakeUntilDestroy(this IObservable source, Component target)
{
return source.TakeUntil(target.OnDestroyAsObservable());
}
public static IObservable TakeUntilDestroy(this IObservable source, GameObject target)
{
return source.TakeUntil(target.OnDestroyAsObservable());
}
public static IObservable TakeUntilDisable(this IObservable source, Component target)
{
return source.TakeUntil(target.OnDisableAsObservable());
}
public static IObservable TakeUntilDisable(this IObservable source, GameObject target)
{
return source.TakeUntil(target.OnDisableAsObservable());
}
public static IObservable RepeatUntilDestroy(this IObservable source, GameObject target)
{
return RepeatUntilCore(RepeatInfinite(source), target.OnDestroyAsObservable(), target);
}
public static IObservable RepeatUntilDestroy(this IObservable source, Component target)
{
return RepeatUntilCore(RepeatInfinite(source), target.OnDestroyAsObservable(), (target != null) ? target.gameObject : null);
}
public static IObservable RepeatUntilDisable(this IObservable source, GameObject target)
{
return RepeatUntilCore(RepeatInfinite(source), target.OnDisableAsObservable(), target);
}
public static IObservable RepeatUntilDisable(this IObservable source, Component target)
{
return RepeatUntilCore(RepeatInfinite(source), target.OnDisableAsObservable(), (target != null) ? target.gameObject : null);
}
static IObservable RepeatUntilCore(this IEnumerable> sources, IObservable trigger, GameObject lifeTimeChecker)
{
return new UniRx.Operators.RepeatUntilObservable(sources, trigger, lifeTimeChecker);
}
public static IObservable> FrameInterval(this IObservable source)
{
return new UniRx.Operators.FrameIntervalObservable(source);
}
public static IObservable> FrameTimeInterval(this IObservable source, bool ignoreTimeScale = false)
{
return new UniRx.Operators.FrameTimeIntervalObservable(source, ignoreTimeScale);
}
///
/// Buffer elements in during target frame counts. Default raise same frame of end(frameCount = 0, frameCountType = EndOfFrame).
///
public static IObservable> BatchFrame(this IObservable source)
{
// if use default argument, comiler errors ambiguous(Unity's limitation)
return BatchFrame(source, 0, FrameCountType.EndOfFrame);
}
///
/// Buffer elements in during target frame counts.
///
public static IObservable> BatchFrame(this IObservable source, int frameCount, FrameCountType frameCountType)
{
if (frameCount < 0) throw new ArgumentException("frameCount must be >= 0, frameCount:" + frameCount);
return new UniRx.Operators.BatchFrameObservable(source, frameCount, frameCountType);
}
///
/// Wait command in during target frame counts. Default raise same frame of end(frameCount = 0, frameCountType = EndOfFrame).
///
public static IObservable BatchFrame(this IObservable source)
{
return BatchFrame(source, 0, FrameCountType.EndOfFrame);
}
///
/// Wait command in during target frame counts.
///
public static IObservable BatchFrame(this IObservable source, int frameCount, FrameCountType frameCountType)
{
if (frameCount < 0) throw new ArgumentException("frameCount must be >= 0, frameCount:" + frameCount);
return new UniRx.Operators.BatchFrameObservable(source, frameCount, frameCountType);
}
#if UniRxLibrary
static IEnumerable> RepeatInfinite(IObservable source)
{
while (true)
{
yield return source;
}
}
internal static class Stubs
{
public static readonly Action Nop = () => { };
public static readonly Action Throw = ex => { ex.Throw(); };
// Stubs.Ignore can't avoid iOS AOT problem.
public static void Ignore(T t)
{
}
// marker for CatchIgnore and Catch avoid iOS AOT problem.
public static IObservable CatchIgnore(Exception ex)
{
return Observable.Empty();
}
}
#endif
}
}