using System;

namespace UniRx.Operators
{
    /// <summary>
    /// Base class for operator observables. It implements <c>Subscribe</c> once and
    /// delegates the actual subscription work to <see cref="SubscribeCore"/>.
    /// </summary>
    /// <remarks>
    /// Implementation note: all fields must be readonly.
    /// </remarks>
    public abstract class OperatorObservableBase : IObservable, IOptimizedObservable
    {
        readonly bool isRequiredSubscribeOnCurrentThread;

        /// <summary>
        /// Stores the flag that <see cref="Subscribe"/> consults when deciding whether
        /// to route the subscription through <c>Scheduler.CurrentThread</c>.
        /// </summary>
        /// <param name="isRequiredSubscribeOnCurrentThread">
        /// Value later reported by <see cref="IsRequiredSubscribeOnCurrentThread"/>.
        /// </param>
        public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
        {
            this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
        }

        /// <summary>
        /// Reports the flag that was supplied to the constructor.
        /// </summary>
        /// <returns>The constructor's <c>isRequiredSubscribeOnCurrentThread</c> argument.</returns>
        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return isRequiredSubscribeOnCurrentThread;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> by calling <see cref="SubscribeCore"/>,
        /// either immediately or through <c>Scheduler.CurrentThread</c>.
        /// </summary>
        /// <param name="observer">Observer handed through, unwrapped, to <see cref="SubscribeCore"/>.</param>
        /// <returns>
        /// A <c>SingleAssignmentDisposable</c> whose <c>Disposable</c> is set to the value
        /// returned by <see cref="SubscribeCore"/>. The same instance is also passed to
        /// <see cref="SubscribeCore"/> as its <c>cancel</c> argument.
        /// </returns>
        public IDisposable Subscribe(IObserver observer)
        {
            var handle = new SingleAssignmentDisposable();

            // Note:
            // The observer is deliberately NOT wrapped in a safe (auto-detach) observer,
            // because doing so breaks exception durability. The rejected form was:
            // Observer.CreateAutoDetachObserver(observer, <the disposable created above>);

            // The scheduler is queried only when this operator asked for
            // current-thread subscription (short-circuit evaluation).
            bool deferToCurrentThread =
                isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired;

            if (!deferToCurrentThread)
            {
                // Direct path: subscribe right here on the calling stack.
                handle.Disposable = SubscribeCore(observer, handle);
                return handle;
            }

            // Deferred path: hand the subscription to the current-thread scheduler.
            // The handle is returned regardless of when the scheduled action runs.
            Scheduler.CurrentThread.Schedule(() => handle.Disposable = SubscribeCore(observer, handle));
            return handle;
        }

        /// <summary>
        /// Performs the operator-specific subscription.
        /// </summary>
        /// <param name="observer">The observer passed to <see cref="Subscribe"/>.</param>
        /// <param name="cancel">The disposable that <see cref="Subscribe"/> returns to its caller.</param>
        /// <returns>A disposable that <see cref="Subscribe"/> assigns to the returned handle.</returns>
        protected abstract IDisposable SubscribeCore(IObserver observer, IDisposable cancel);
    }
}