using System;
using System.Collections.Generic;

namespace UniRx.Operators
{
    /// <summary>
    /// Observable that subscribes to <c>source</c> and hands each notification it receives
    /// (OnNext / OnError / OnCompleted) to <c>scheduler</c>, which then invokes the observer.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    // NOTE(review): the generic type arguments in this file were missing (T was used but never
    // declared). They are restored here from usage; confirm the arity of OperatorObservableBase,
    // OperatorObserverBase and the nested Notification types against their declarations.
    internal class ObserveOnObservable<T> : OperatorObservableBase<T>
    {
        readonly IObservable<T> source;
        readonly IScheduler scheduler;

        /// <summary>Creates the operator.</summary>
        /// <param name="source">Sequence whose notifications are re-dispatched.</param>
        /// <param name="scheduler">Scheduler used to deliver the notifications.</param>
        public ObserveOnObservable(IObservable<T> source, IScheduler scheduler)
            : base(source.IsRequiredSubscribeOnCurrentThread()) // forward the source's flag to the base class
        {
            this.source = source;
            this.scheduler = scheduler;
        }

        /// <summary>
        /// Picks the observer implementation: <c>ObserveOn_</c> when the scheduler implements
        /// <c>ISchedulerQueueing</c>, otherwise the linked-list based <c>ObserveOn</c>.
        /// </summary>
        /// <param name="observer">Downstream observer.</param>
        /// <param name="cancel">Disposable passed through to the observer base class.</param>
        /// <returns>The disposable produced by the chosen observer's <c>Run</c>.</returns>
        protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
        {
            var queueing = scheduler as ISchedulerQueueing;
            if (queueing == null)
            {
                return new ObserveOn(this, observer, cancel).Run();
            }
            else
            {
                return new ObserveOn_(this, queueing, observer, cancel).Run();
            }
        }

        /// <summary>
        /// Observer used for schedulers without queueing support. Incoming notifications are
        /// appended to <c>actions</c>, and only the first entry of the list is scheduled at a time.
        /// </summary>
        class ObserveOn : OperatorObserverBase<T, T>
        {
            /// <summary>
            /// One pending notification together with its list node and the disposable returned
            /// by the scheduler once it has been scheduled.
            /// </summary>
            class SchedulableAction : IDisposable
            {
                public Notification<T> data;
                public LinkedListNode<SchedulableAction> node;
                public IDisposable schedule;

                /// <summary>
                /// Disposes the scheduler handle (if any), clears it, and removes this entry's
                /// node from the list it currently belongs to.
                /// </summary>
                public void Dispose()
                {
                    if (schedule != null)
                    {
                        schedule.Dispose();
                    }
                    schedule = null;

                    // node.List is null once the node has been removed, so a second Dispose is a no-op here.
                    if (node.List != null)
                    {
                        node.List.Remove(node);
                    }
                }

                /// <summary>True while a scheduler handle is stored in <c>schedule</c>.</summary>
                public bool IsScheduled
                {
                    get { return schedule != null; }
                }
            }

            readonly ObserveOnObservable<T> parent;

            // Pending notifications in arrival order; also used as the lock object for this observer.
            readonly LinkedList<SchedulableAction> actions = new LinkedList<SchedulableAction>();
            bool isDisposed;

            public ObserveOn(ObserveOnObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            /// <summary>
            /// Subscribes to the source and returns a disposable that disposes that subscription
            /// and a second disposable which marks this observer as disposed and disposes every
            /// pending action.
            /// </summary>
            public IDisposable Run()
            {
                isDisposed = false;

                var sourceDisposable = parent.source.Subscribe(this);

                return StableCompositeDisposable.Create(sourceDisposable, Disposable.Create(() =>
                {
                    lock (actions)
                    {
                        isDisposed = true;

                        while (actions.Count > 0)
                        {
                            // Dispose will both cancel the action (if not already running)
                            // and remove it from 'actions'
                            actions.First.Value.Dispose();
                        }
                    }
                }));
            }

            public override void OnNext(T value)
            {
                QueueAction(new Notification<T>.OnNextNotification(value));
            }

            public override void OnError(Exception error)
            {
                QueueAction(new Notification<T>.OnErrorNotification(error));
            }

            public override void OnCompleted()
            {
                QueueAction(new Notification<T>.OnCompletedNotification());
            }

            /// <summary>
            /// Appends <paramref name="data"/> to the pending list (unless already disposed) and
            /// tries to schedule the head of the list.
            /// </summary>
            private void QueueAction(Notification<T> data)
            {
                var action = new SchedulableAction { data = data };
                lock (actions)
                {
                    if (isDisposed)
                    {
                        return;
                    }

                    action.node = actions.AddLast(action);
                    ProcessNext();
                }
            }

            /// <summary>
            /// Schedules the first pending action if there is one and it has not been scheduled yet.
            /// The scheduled delegate delivers the notification, disposes the action, and then either
            /// continues with the next entry (after OnNext) or calls <c>Dispose()</c> (after
            /// OnError / OnCompleted).
            /// </summary>
            private void ProcessNext()
            {
                lock (actions)
                {
                    if (actions.Count == 0 || isDisposed)
                    {
                        return;
                    }

                    var action = actions.First.Value;

                    // The head is already handed to the scheduler; it will call ProcessNext when it finishes.
                    if (action.IsScheduled)
                    {
                        return;
                    }

                    action.schedule = parent.scheduler.Schedule(() =>
                    {
                        try
                        {
                            switch (action.data.Kind)
                            {
                                case NotificationKind.OnNext:
                                    observer.OnNext(action.data.Value);
                                    break;
                                case NotificationKind.OnError:
                                    observer.OnError(action.data.Exception);
                                    break;
                                case NotificationKind.OnCompleted:
                                    observer.OnCompleted();
                                    break;
                            }
                        }
                        finally
                        {
                            // Runs even if the observer throws, so the entry is always removed from the list.
                            lock (actions)
                            {
                                action.Dispose();
                            }

                            if (action.data.Kind == NotificationKind.OnNext)
                            {
                                ProcessNext();
                            }
                            else
                            {
                                Dispose();
                            }
                        }
                    });
                }
            }
        }

        /// <summary>
        /// Observer used when the scheduler implements <c>ISchedulerQueueing</c>: every notification
        /// is passed straight to <c>ScheduleQueueing</c> together with the <c>isDisposed</c> flag.
        /// </summary>
        class ObserveOn_ : OperatorObserverBase<T, T>
        {
            readonly ObserveOnObservable<T> parent;
            readonly ISchedulerQueueing scheduler;
            readonly BooleanDisposable isDisposed;
            readonly Action<T> onNext;

            public ObserveOn_(ObserveOnObservable<T> parent, ISchedulerQueueing scheduler, IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
                this.scheduler = scheduler;
                this.isDisposed = new BooleanDisposable();
                this.onNext = new Action<T>(OnNext_); // cache delegate
            }

            /// <summary>
            /// Subscribes to the source and returns a disposable combining that subscription with
            /// the <c>isDisposed</c> flag handed to the scheduler.
            /// </summary>
            public IDisposable Run()
            {
                var sourceDisposable = parent.source.Subscribe(this);
                return StableCompositeDisposable.Create(sourceDisposable, isDisposed);
            }

            void OnNext_(T value)
            {
                base.observer.OnNext(value);
            }

            void OnError_(Exception error)
            {
                try
                {
                    observer.OnError(error);
                }
                finally
                {
                    Dispose();
                }
            }

            void OnCompleted_(Unit _)
            {
                try
                {
                    observer.OnCompleted();
                }
                finally
                {
                    Dispose();
                }
            }

            public override void OnNext(T value)
            {
                scheduler.ScheduleQueueing(isDisposed, value, onNext);
            }

            public override void OnError(Exception error)
            {
                scheduler.ScheduleQueueing(isDisposed, error, OnError_);
            }

            public override void OnCompleted()
            {
                scheduler.ScheduleQueueing(isDisposed, Unit.Default, OnCompleted_);
            }
        }
    }
}