using System; using System.Collections.Generic; namespace UniRx.Operators { internal class DelaySubscriptionObservable : OperatorObservableBase { readonly IObservable source; readonly IScheduler scheduler; readonly TimeSpan? dueTimeT; readonly DateTimeOffset? dueTimeD; public DelaySubscriptionObservable(IObservable source,TimeSpan dueTime, IScheduler scheduler) : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.scheduler = scheduler; this.dueTimeT = dueTime; } public DelaySubscriptionObservable(IObservable source, DateTimeOffset dueTime, IScheduler scheduler) : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.scheduler = scheduler; this.dueTimeD = dueTime; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { if (dueTimeT != null) { var d = new MultipleAssignmentDisposable(); var dt = Scheduler.Normalize(dueTimeT.Value); d.Disposable = scheduler.Schedule(dt, () => { d.Disposable = source.Subscribe(observer); }); return d; } else { var d = new MultipleAssignmentDisposable(); d.Disposable = scheduler.Schedule(dueTimeD.Value, () => { d.Disposable = source.Subscribe(observer); }); return d; } } } }