using System;
using System.Collections.Generic;

namespace UniRx.Operators
{
    /// <summary>
    /// Operator whose subscription to <c>source</c> is performed from inside an
    /// action handed to the supplied <c>scheduler</c>.
    /// </summary>
    internal class SubscribeOnObservable : OperatorObservableBase
    {
        readonly IScheduler scheduler;
        readonly IObservable source;

        /// <summary>
        /// Captures the source and the scheduler. The flag forwarded to the base
        /// constructor is true when <paramref name="scheduler"/> equals
        /// <c>Scheduler.CurrentThread</c>, or otherwise when
        /// <c>source.IsRequiredSubscribeOnCurrentThread()</c> returns true.
        /// </summary>
        /// <param name="source">Sequence that gets subscribed to from the scheduled action.</param>
        /// <param name="scheduler">Scheduler that receives the subscribe action.</param>
        public SubscribeOnObservable(IObservable source, IScheduler scheduler)
            : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
        {
            this.scheduler = scheduler;
            this.source = source;
        }

        /// <summary>
        /// Schedules an action that subscribes <paramref name="observer"/> to the source
        /// and returns a disposable tracking that work.
        /// </summary>
        /// <param name="observer">Observer passed to <c>source.Subscribe</c>.</param>
        /// <param name="cancel">Not referenced by this implementation.</param>
        /// <returns>
        /// A serial disposable that first holds the handle for the scheduled action and,
        /// once that action has run, a <c>ScheduledDisposable</c> built from the scheduler
        /// and the source subscription.
        /// </returns>
        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            // The serial disposable must already hold the pending handle before the work
            // is scheduled, because the scheduled action replaces its content.
            var pendingWork = new SingleAssignmentDisposable();
            var subscription = new SerialDisposable { Disposable = pendingWork };

            pendingWork.Disposable = scheduler.Schedule(() =>
            {
                var inner = source.Subscribe(observer);
                subscription.Disposable = new ScheduledDisposable(scheduler, inner);
            });

            return subscription;
        }
    }
}