123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- using System;
- using System.Collections.Generic;
- namespace UniRx.Operators
- {
- internal class ObserveOnObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly IScheduler scheduler;
- public ObserveOnObservable(IObservable<T> source, IScheduler scheduler)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.scheduler = scheduler;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- var queueing = scheduler as ISchedulerQueueing;
- if (queueing == null)
- {
- return new ObserveOn(this, observer, cancel).Run();
- }
- else
- {
- return new ObserveOn_(this, queueing, observer, cancel).Run();
- }
- }
- class ObserveOn : OperatorObserverBase<T, T>
- {
- class SchedulableAction : IDisposable
- {
- public Notification<T> data;
- public LinkedListNode<SchedulableAction> node;
- public IDisposable schedule;
- public void Dispose()
- {
- if (schedule != null)
- schedule.Dispose();
- schedule = null;
- if (node.List != null)
- {
- node.List.Remove(node);
- }
- }
- public bool IsScheduled { get { return schedule != null; } }
- }
- readonly ObserveOnObservable<T> parent;
- readonly LinkedList<SchedulableAction> actions = new LinkedList<SchedulableAction>();
- bool isDisposed;
- public ObserveOn(ObserveOnObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- isDisposed = false;
- var sourceDisposable = parent.source.Subscribe(this);
- return StableCompositeDisposable.Create(sourceDisposable, Disposable.Create(() =>
- {
- lock (actions)
- {
- isDisposed = true;
- while (actions.Count > 0)
- {
- // Dispose will both cancel the action (if not already running)
- // and remove it from 'actions'
- actions.First.Value.Dispose();
- }
- }
- }));
- }
- public override void OnNext(T value)
- {
- QueueAction(new Notification<T>.OnNextNotification(value));
- }
- public override void OnError(Exception error)
- {
- QueueAction(new Notification<T>.OnErrorNotification(error));
- }
- public override void OnCompleted()
- {
- QueueAction(new Notification<T>.OnCompletedNotification());
- }
- private void QueueAction(Notification<T> data)
- {
- var action = new SchedulableAction { data = data };
- lock (actions)
- {
- if (isDisposed) return;
- action.node = actions.AddLast(action);
- ProcessNext();
- }
- }
- private void ProcessNext()
- {
- lock (actions)
- {
- if (actions.Count == 0 || isDisposed)
- return;
- var action = actions.First.Value;
- if (action.IsScheduled)
- return;
- action.schedule = parent.scheduler.Schedule(() =>
- {
- try
- {
- switch (action.data.Kind)
- {
- case NotificationKind.OnNext:
- observer.OnNext(action.data.Value);
- break;
- case NotificationKind.OnError:
- observer.OnError(action.data.Exception);
- break;
- case NotificationKind.OnCompleted:
- observer.OnCompleted();
- break;
- }
- }
- finally
- {
- lock (actions)
- {
- action.Dispose();
- }
- if (action.data.Kind == NotificationKind.OnNext)
- ProcessNext();
- else
- Dispose();
- }
- });
- }
- }
- }
- class ObserveOn_ : OperatorObserverBase<T, T>
- {
- readonly ObserveOnObservable<T> parent;
- readonly ISchedulerQueueing scheduler;
- readonly BooleanDisposable isDisposed;
- readonly Action<T> onNext;
- public ObserveOn_(ObserveOnObservable<T> parent, ISchedulerQueueing scheduler, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- this.scheduler = scheduler;
- this.isDisposed = new BooleanDisposable();
- this.onNext = new Action<T>(OnNext_); // cache delegate
- }
- public IDisposable Run()
- {
- var sourceDisposable = parent.source.Subscribe(this);
- return StableCompositeDisposable.Create(sourceDisposable, isDisposed);
- }
- void OnNext_(T value)
- {
- base.observer.OnNext(value);
- }
- void OnError_(Exception error)
- {
- try { observer.OnError(error); } finally { Dispose(); };
- }
- void OnCompleted_(Unit _)
- {
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- public override void OnNext(T value)
- {
- scheduler.ScheduleQueueing(isDisposed, value, onNext);
- }
- public override void OnError(Exception error)
- {
- scheduler.ScheduleQueueing(isDisposed, error, OnError_);
- }
- public override void OnCompleted()
- {
- scheduler.ScheduleQueueing(isDisposed, Unit.Default, OnCompleted_);
- }
- }
- }
- }
|