OperatorObservableBase.cs 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. using System;
  namespace UniRx.Operators
  {
      /// <summary>
      /// Base class for operator observables. It implements <see cref="Subscribe"/> once and
      /// delegates the actual subscription work to <see cref="SubscribeCore"/>.
      /// </summary>
      /// <remarks>
      /// Implementation note: all fields of this class and of derived classes must be readonly.
      /// </remarks>
      /// <typeparam name="T">Type of the elements produced by the observable.</typeparam>
      public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
      {
          readonly bool isRequiredSubscribeOnCurrentThread;

          /// <summary>
          /// Initializes the base class.
          /// </summary>
          /// <param name="isRequiredSubscribeOnCurrentThread">
          /// Value later reported by <see cref="IsRequiredSubscribeOnCurrentThread"/>; when true,
          /// <see cref="Subscribe"/> may route the subscription through
          /// <c>Scheduler.CurrentThread</c>.
          /// </param>
          // Protected rather than public: an abstract type can only be constructed by a derived class.
          protected OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
          {
              this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
          }

          /// <summary>
          /// Returns the flag that was passed to the constructor.
          /// </summary>
          /// <returns>The value of <c>isRequiredSubscribeOnCurrentThread</c> given at construction.</returns>
          public bool IsRequiredSubscribeOnCurrentThread()
          {
              return isRequiredSubscribeOnCurrentThread;
          }

          /// <summary>
          /// Subscribes <paramref name="observer"/> by calling <see cref="SubscribeCore"/>, either
          /// directly or through <c>Scheduler.CurrentThread</c>.
          /// </summary>
          /// <param name="observer">Observer that receives the notifications.</param>
          /// <returns>
          /// A disposable whose inner disposable is set to the result of <see cref="SubscribeCore"/>.
          /// </returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
          public IDisposable Subscribe(IObserver<T> observer)
          {
              // Fail fast on the caller's thread instead of surfacing a NullReferenceException
              // later, possibly from inside a scheduled action.
              if (observer == null)
              {
                  throw new ArgumentNullException("observer");
              }

              var subscription = new SingleAssignmentDisposable();

              // Note: the observer is deliberately NOT wrapped in a "safe" auto-detach observer
              // (Observer.CreateAutoDetachObserver<T>(observer, subscription)), because doing so
              // breaks exception durability.

              if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
              {
                  // The subscription is handed to the current-thread scheduler; the returned
                  // disposable receives its inner disposable when the scheduled action runs.
                  Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
              }
              else
              {
                  subscription.Disposable = SubscribeCore(observer, subscription);
              }

              return subscription;
          }

          /// <summary>
          /// Performs the operator-specific subscription.
          /// </summary>
          /// <param name="observer">Observer passed to <see cref="Subscribe"/>.</param>
          /// <param name="cancel">
          /// The same disposable instance that <see cref="Subscribe"/> returns to its caller.
          /// </param>
          /// <returns>
          /// The disposable that <see cref="Subscribe"/> stores as the inner disposable of its result.
          /// </returns>
          protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel);
      }
  }