using System; using System.Collections; using System.Collections.Generic; using System.Text; using System.Threading; using UnityEngine; namespace UniRx { #if UniRxLibrary public static partial class SchedulerUnity { #else public static partial class Scheduler { public static void SetDefaultForUnity() { Scheduler.DefaultSchedulers.ConstantTimeOperations = Scheduler.Immediate; Scheduler.DefaultSchedulers.TailRecursion = Scheduler.Immediate; Scheduler.DefaultSchedulers.Iteration = Scheduler.CurrentThread; Scheduler.DefaultSchedulers.TimeBasedOperations = MainThread; Scheduler.DefaultSchedulers.AsyncConversions = Scheduler.ThreadPool; } #endif static IScheduler mainThread; /// /// Unity native MainThread Queue Scheduler. Run on mainthread and delayed on coroutine update loop, elapsed time is calculated based on Time.time. /// public static IScheduler MainThread { get { return mainThread ?? (mainThread = new MainThreadScheduler()); } } static IScheduler mainThreadIgnoreTimeScale; /// /// Another MainThread scheduler, delay elapsed time is calculated based on Time.unscaledDeltaTime. /// public static IScheduler MainThreadIgnoreTimeScale { get { return mainThreadIgnoreTimeScale ?? (mainThreadIgnoreTimeScale = new IgnoreTimeScaleMainThreadScheduler()); } } static IScheduler mainThreadFixedUpdate; /// /// Run on fixed update mainthread, delay elapsed time is calculated based on Time.fixedTime. /// public static IScheduler MainThreadFixedUpdate { get { return mainThreadFixedUpdate ?? (mainThreadFixedUpdate = new FixedUpdateMainThreadScheduler()); } } static IScheduler mainThreadEndOfFrame; /// /// Run on end of frame mainthread, delay elapsed time is calculated based on Time.deltaTime. /// public static IScheduler MainThreadEndOfFrame { get { return mainThreadEndOfFrame ?? (mainThreadEndOfFrame = new EndOfFrameMainThreadScheduler()); } } class MainThreadScheduler : IScheduler, ISchedulerPeriodic, ISchedulerQueueing { readonly Action scheduleAction; public MainThreadScheduler() { MainThreadDispatcher.Initialize(); scheduleAction = new Action(Schedule); } // delay action is run in StartCoroutine // Okay to action run synchronous and guaranteed run on MainThread IEnumerator DelayAction(TimeSpan dueTime, Action action, ICancelable cancellation) { // zero == every frame if (dueTime == TimeSpan.Zero) { yield return null; // not immediately, run next frame } else { yield return new WaitForSeconds((float)dueTime.TotalSeconds); } if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } IEnumerator PeriodicAction(TimeSpan period, Action action, ICancelable cancellation) { // zero == every frame if (period == TimeSpan.Zero) { while (true) { yield return null; // not immediately, run next frame if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } } else { var seconds = (float)(period.TotalMilliseconds / 1000.0); var yieldInstruction = new WaitForSeconds(seconds); // cache single instruction object while (true) { yield return yieldInstruction; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } } } public DateTimeOffset Now { get { return Scheduler.Now; } } void Schedule(object state) { var t = (Tuple)state; if (!t.Item1.IsDisposed) { t.Item2(); } } public IDisposable Schedule(Action action) { var d = new BooleanDisposable(); MainThreadDispatcher.Post(scheduleAction, Tuple.Create(d, action)); return d; } public IDisposable Schedule(DateTimeOffset dueTime, Action action) { return Schedule(dueTime - Now, action); } public IDisposable Schedule(TimeSpan dueTime, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(dueTime); MainThreadDispatcher.SendStartCoroutine(DelayAction(time, action, d)); return d; } public IDisposable SchedulePeriodic(TimeSpan period, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(period); MainThreadDispatcher.SendStartCoroutine(PeriodicAction(time, action, d)); return d; } void ScheduleQueueing(object state) { var t = (Tuple>)state; if (!t.Item1.IsDisposed) { t.Item3(t.Item2); } } public void ScheduleQueueing(ICancelable cancel, T state, Action action) { MainThreadDispatcher.Post(QueuedAction.Instance, Tuple.Create(cancel, state, action)); } static class QueuedAction { public static readonly Action Instance = new Action(Invoke); public static void Invoke(object state) { var t = (Tuple>)state; if (!t.Item1.IsDisposed) { t.Item3(t.Item2); } } } } class IgnoreTimeScaleMainThreadScheduler : IScheduler, ISchedulerPeriodic, ISchedulerQueueing { readonly Action scheduleAction; public IgnoreTimeScaleMainThreadScheduler() { MainThreadDispatcher.Initialize(); scheduleAction = new Action(Schedule); } IEnumerator DelayAction(TimeSpan dueTime, Action action, ICancelable cancellation) { if (dueTime == TimeSpan.Zero) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } else { var elapsed = 0f; var dt = (float)dueTime.TotalSeconds; while (true) { yield return null; if (cancellation.IsDisposed) break; elapsed += Time.unscaledDeltaTime; if (elapsed >= dt) { MainThreadDispatcher.UnsafeSend(action); break; } } } } IEnumerator PeriodicAction(TimeSpan period, Action action, ICancelable cancellation) { // zero == every frame if (period == TimeSpan.Zero) { while (true) { yield return null; // not immediately, run next frame if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } } else { var elapsed = 0f; var dt = (float)period.TotalSeconds; while (true) { yield return null; if (cancellation.IsDisposed) break; elapsed += Time.unscaledDeltaTime; if (elapsed >= dt) { MainThreadDispatcher.UnsafeSend(action); elapsed = 0; } } } } public DateTimeOffset Now { get { return Scheduler.Now; } } void Schedule(object state) { var t = (Tuple)state; if (!t.Item1.IsDisposed) { t.Item2(); } } public IDisposable Schedule(Action action) { var d = new BooleanDisposable(); MainThreadDispatcher.Post(scheduleAction, Tuple.Create(d, action)); return d; } public IDisposable Schedule(DateTimeOffset dueTime, Action action) { return Schedule(dueTime - Now, action); } public IDisposable Schedule(TimeSpan dueTime, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(dueTime); MainThreadDispatcher.SendStartCoroutine(DelayAction(time, action, d)); return d; } public IDisposable SchedulePeriodic(TimeSpan period, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(period); MainThreadDispatcher.SendStartCoroutine(PeriodicAction(time, action, d)); return d; } public void ScheduleQueueing(ICancelable cancel, T state, Action action) { MainThreadDispatcher.Post(QueuedAction.Instance, Tuple.Create(cancel, state, action)); } static class QueuedAction { public static readonly Action Instance = new Action(Invoke); public static void Invoke(object state) { var t = (Tuple>)state; if (!t.Item1.IsDisposed) { t.Item3(t.Item2); } } } } class FixedUpdateMainThreadScheduler : IScheduler, ISchedulerPeriodic, ISchedulerQueueing { public FixedUpdateMainThreadScheduler() { MainThreadDispatcher.Initialize(); } IEnumerator ImmediateAction(T state, Action action, ICancelable cancellation) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action, state); } IEnumerator DelayAction(TimeSpan dueTime, Action action, ICancelable cancellation) { if (dueTime == TimeSpan.Zero) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } else { var startTime = Time.fixedTime; var dt = (float)dueTime.TotalSeconds; while (true) { yield return null; if (cancellation.IsDisposed) break; var elapsed = Time.fixedTime - startTime; if (elapsed >= dt) { MainThreadDispatcher.UnsafeSend(action); break; } } } } IEnumerator PeriodicAction(TimeSpan period, Action action, ICancelable cancellation) { // zero == every frame if (period == TimeSpan.Zero) { while (true) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } } else { var startTime = Time.fixedTime; var dt = (float)period.TotalSeconds; while (true) { yield return null; if (cancellation.IsDisposed) break; var ft = Time.fixedTime; var elapsed = ft - startTime; if (elapsed >= dt) { MainThreadDispatcher.UnsafeSend(action); startTime = ft; } } } } public DateTimeOffset Now { get { return Scheduler.Now; } } public IDisposable Schedule(Action action) { return Schedule(TimeSpan.Zero, action); } public IDisposable Schedule(DateTimeOffset dueTime, Action action) { return Schedule(dueTime - Now, action); } public IDisposable Schedule(TimeSpan dueTime, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(dueTime); MainThreadDispatcher.StartFixedUpdateMicroCoroutine(DelayAction(time, action, d)); return d; } public IDisposable SchedulePeriodic(TimeSpan period, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(period); MainThreadDispatcher.StartFixedUpdateMicroCoroutine(PeriodicAction(time, action, d)); return d; } public void ScheduleQueueing(ICancelable cancel, T state, Action action) { MainThreadDispatcher.StartFixedUpdateMicroCoroutine(ImmediateAction(state, action, cancel)); } } class EndOfFrameMainThreadScheduler : IScheduler, ISchedulerPeriodic, ISchedulerQueueing { public EndOfFrameMainThreadScheduler() { MainThreadDispatcher.Initialize(); } IEnumerator ImmediateAction(T state, Action action, ICancelable cancellation) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action, state); } IEnumerator DelayAction(TimeSpan dueTime, Action action, ICancelable cancellation) { if (dueTime == TimeSpan.Zero) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } else { var elapsed = 0f; var dt = (float)dueTime.TotalSeconds; while (true) { yield return null; if (cancellation.IsDisposed) break; elapsed += Time.deltaTime; if (elapsed >= dt) { MainThreadDispatcher.UnsafeSend(action); break; } } } } IEnumerator PeriodicAction(TimeSpan period, Action action, ICancelable cancellation) { // zero == every frame if (period == TimeSpan.Zero) { while (true) { yield return null; if (cancellation.IsDisposed) yield break; MainThreadDispatcher.UnsafeSend(action); } } else { var elapsed = 0f; var dt = (float)period.TotalSeconds; while (true) { yield return null; if (cancellation.IsDisposed) break; elapsed += Time.deltaTime; if (elapsed >= dt) { MainThreadDispatcher.UnsafeSend(action); elapsed = 0; } } } } public DateTimeOffset Now { get { return Scheduler.Now; } } public IDisposable Schedule(Action action) { return Schedule(TimeSpan.Zero, action); } public IDisposable Schedule(DateTimeOffset dueTime, Action action) { return Schedule(dueTime - Now, action); } public IDisposable Schedule(TimeSpan dueTime, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(dueTime); MainThreadDispatcher.StartEndOfFrameMicroCoroutine(DelayAction(time, action, d)); return d; } public IDisposable SchedulePeriodic(TimeSpan period, Action action) { var d = new BooleanDisposable(); var time = Scheduler.Normalize(period); MainThreadDispatcher.StartEndOfFrameMicroCoroutine(PeriodicAction(time, action, d)); return d; } public void ScheduleQueueing(ICancelable cancel, T state, Action action) { MainThreadDispatcher.StartEndOfFrameMicroCoroutine(ImmediateAction(state, action, cancel)); } } } }