// SubscribeOn.cs
using System;
using System.Collections.Generic;

namespace UniRx.Operators
{
    /// <summary>
    /// Observable that subscribes to its source from inside an action passed to
    /// <c>scheduler.Schedule</c>, instead of subscribing directly in <c>SubscribeCore</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the observable sequence.</typeparam>
    internal class SubscribeOnObservable<T> : OperatorObservableBase<T>
    {
        readonly IObservable<T> source;
        readonly IScheduler scheduler;

        /// <summary>
        /// Stores the source and scheduler. The base constructor receives <c>true</c> when
        /// <paramref name="scheduler"/> is <c>Scheduler.CurrentThread</c> or when
        /// <paramref name="source"/> reports <c>IsRequiredSubscribeOnCurrentThread()</c>.
        /// </summary>
        /// <param name="source">Sequence to subscribe to from the scheduled action.</param>
        /// <param name="scheduler">Scheduler on which the subscription action is scheduled.</param>
        public SubscribeOnObservable(IObservable<T> source, IScheduler scheduler)
            : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
        {
            this.scheduler = scheduler;
            this.source = source;
        }

        /// <summary>
        /// Schedules the subscription to the source and returns a disposable that first holds
        /// the handle of the scheduled action and later the wrapped source subscription.
        /// </summary>
        /// <param name="observer">Observer handed to <c>source.Subscribe</c>.</param>
        /// <param name="cancel">Not used by this implementation.</param>
        /// <returns>The <c>SerialDisposable</c> tracking the current stage of the subscription.</returns>
        protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
        {
            var scheduleHandle = new SingleAssignmentDisposable();

            // The serial slot must already hold the schedule handle before Schedule is called:
            // the scheduled action overwrites the slot, and it may run before Schedule returns.
            var subscription = new SerialDisposable { Disposable = scheduleHandle };

            scheduleHandle.Disposable = scheduler.Schedule(() =>
            {
                var inner = source.Subscribe(observer);

                // NOTE(review): ScheduledDisposable presumably routes disposal of the inner
                // subscription through the same scheduler - confirm against its definition.
                subscription.Disposable = new ScheduledDisposable(scheduler, inner);
            });

            return subscription;
        }
    }
}