// DelaySubscription.cs (1.8 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  /// <summary>
  /// Observable that defers subscribing to <c>source</c> until a due time, given either as a
  /// relative <see cref="TimeSpan"/> or an absolute <see cref="DateTimeOffset"/>, has been
  /// reached on <c>scheduler</c>.
  /// </summary>
  /// <typeparam name="T">Element type of the wrapped sequence.</typeparam>
  internal class DelaySubscriptionObservable<T> : OperatorObservableBase<T>
  {
      readonly IObservable<T> source;
      readonly IScheduler scheduler;

      // Exactly one of the two due times is set, depending on which constructor was used.
      readonly TimeSpan? dueTimeT;
      readonly DateTimeOffset? dueTimeD;

      /// <summary>Creates an instance that subscribes to <paramref name="source"/> after a relative delay.</summary>
      /// <param name="source">Sequence whose subscription is postponed.</param>
      /// <param name="dueTime">Relative delay before subscribing.</param>
      /// <param name="scheduler">Scheduler used to wait for the due time.</param>
      public DelaySubscriptionObservable(IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
          // NOTE(review): the flag handed to the base class is presumably its
          // "subscribe on current thread" hint; confirm against OperatorObservableBase.
          : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
          this.scheduler = scheduler;
          this.dueTimeT = dueTime;
      }

      /// <summary>Creates an instance that subscribes to <paramref name="source"/> at an absolute time.</summary>
      /// <param name="source">Sequence whose subscription is postponed.</param>
      /// <param name="dueTime">Absolute time at which to subscribe.</param>
      /// <param name="scheduler">Scheduler used to wait for the due time.</param>
      public DelaySubscriptionObservable(IObservable<T> source, DateTimeOffset dueTime, IScheduler scheduler)
          // NOTE(review): same base-class flag as the TimeSpan overload; confirm its meaning.
          : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
          this.scheduler = scheduler;
          this.dueTimeD = dueTime;
      }

      /// <summary>
      /// Schedules the subscription to <c>source</c> for the configured due time.
      /// </summary>
      /// <param name="observer">Observer that is subscribed to <c>source</c> once the due time is reached.</param>
      /// <param name="cancel">Not used by this operator.</param>
      /// <returns>
      /// A disposable that holds the pending scheduled work before the due time and the
      /// subscription to <c>source</c> afterwards.
      /// </returns>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          if (dueTimeT != null)
          {
              var dt = Scheduler.Normalize(dueTimeT.Value);
              return SubscribeWhenDue(observer, work => scheduler.Schedule(dt, work));
          }

          return SubscribeWhenDue(observer, work => scheduler.Schedule(dueTimeD.Value, work));
      }

      /// <summary>
      /// Shared implementation for both due-time kinds: hands the "subscribe to source" action
      /// to <paramref name="schedule"/> and tracks whichever handle is currently relevant.
      /// </summary>
      /// <param name="observer">Observer to subscribe to <c>source</c>.</param>
      /// <param name="schedule">Callback that schedules the given action and returns its cancellation handle.</param>
      IDisposable SubscribeWhenDue(IObserver<T> observer, Func<Action, IDisposable> schedule)
      {
          var d = new MultipleAssignmentDisposable();
          var gate = new object();
          var subscribed = false;

          var pending = schedule(() =>
          {
              // Subscribe outside the lock so observer/source code never runs while holding it.
              var subscription = source.Subscribe(observer);
              lock (gate)
              {
                  subscribed = true;
                  d.Disposable = subscription;
              }
          });

          // The scheduler may run the action before Schedule returns (for example synchronously
          // on the calling thread). Storing the scheduler handle unconditionally would then
          // replace the source subscription and, assuming MultipleAssignmentDisposable follows
          // the usual Rx contract of not disposing a replaced value, leak it. Only keep the
          // scheduler handle while the action has not run yet.
          lock (gate)
          {
              if (!subscribed)
              {
                  d.Disposable = pending;
              }
          }

          return d;
      }
  }
  48. }