123456789101112131415161718192021222324252627282930313233343536373839404142 |
- using System;
- namespace UniRx.Operators
- {
- // implements note : all field must be readonly.
- public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
- {
- readonly bool isRequiredSubscribeOnCurrentThread;
- public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
- {
- this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
- }
- public bool IsRequiredSubscribeOnCurrentThread()
- {
- return isRequiredSubscribeOnCurrentThread;
- }
- public IDisposable Subscribe(IObserver<T> observer)
- {
- var subscription = new SingleAssignmentDisposable();
- // note:
- // does not make the safe observer, it breaks exception durability.
- // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription);
- if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
- {
- Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
- }
- else
- {
- subscription.Disposable = SubscribeCore(observer, subscription);
- }
- return subscription;
- }
- protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel);
- }
- }
|