123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- namespace UniRx.Operators
- {
-
- internal class DoObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly Action<T> onNext;
- readonly Action<Exception> onError;
- readonly Action onCompleted;
- public DoObservable(IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new Do(this, observer, cancel).Run();
- }
- class Do : OperatorObserverBase<T, T>
- {
- readonly DoObservable<T> parent;
- public Do(DoObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- return parent.source.Subscribe(this);
- }
- public override void OnNext(T value)
- {
- try
- {
- parent.onNext(value);
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); } finally { Dispose(); };
- return;
- }
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- try
- {
- parent.onError(error);
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); } finally { Dispose(); };
- return;
- }
- try { observer.OnError(error); } finally { Dispose(); };
- }
- public override void OnCompleted()
- {
- try
- {
- parent.onCompleted();
- }
- catch (Exception ex)
- {
- base.observer.OnError(ex);
- Dispose();
- return;
- }
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- }
- internal class DoObserverObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly IObserver<T> observer;
- public DoObserverObservable(IObservable<T> source, IObserver<T> observer)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.observer = observer;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new Do(this, observer, cancel).Run();
- }
- class Do : OperatorObserverBase<T, T>
- {
- readonly DoObserverObservable<T> parent;
- public Do(DoObserverObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- return parent.source.Subscribe(this);
- }
- public override void OnNext(T value)
- {
- try
- {
- parent.observer.OnNext(value);
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); }
- finally { Dispose(); }
- return;
- }
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- try
- {
- parent.observer.OnError(error);
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); }
- finally { Dispose(); }
- return;
- }
- try { observer.OnError(error); }
- finally { Dispose(); }
- }
- public override void OnCompleted()
- {
- try
- {
- parent.observer.OnCompleted();
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); }
- finally { Dispose(); }
- return;
- }
- try { observer.OnCompleted(); }
- finally { Dispose(); }
- }
- }
- }
- internal class DoOnErrorObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly Action<Exception> onError;
- public DoOnErrorObservable(IObservable<T> source, Action<Exception> onError)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.onError = onError;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new DoOnError(this, observer, cancel).Run();
- }
- class DoOnError : OperatorObserverBase<T, T>
- {
- readonly DoOnErrorObservable<T> parent;
- public DoOnError(DoOnErrorObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- return parent.source.Subscribe(this);
- }
- public override void OnNext(T value)
- {
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- try
- {
- parent.onError(error);
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); }
- finally { Dispose(); }
- return;
- }
- try { observer.OnError(error); }
- finally { Dispose(); }
- }
- public override void OnCompleted()
- {
- try { observer.OnCompleted(); }
- finally { Dispose(); }
- }
- }
- }
- internal class DoOnCompletedObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly Action onCompleted;
- public DoOnCompletedObservable(IObservable<T> source, Action onCompleted)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.onCompleted = onCompleted;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new DoOnCompleted(this, observer, cancel).Run();
- }
- class DoOnCompleted : OperatorObserverBase<T, T>
- {
- readonly DoOnCompletedObservable<T> parent;
- public DoOnCompleted(DoOnCompletedObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- return parent.source.Subscribe(this);
- }
- public override void OnNext(T value)
- {
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- try { observer.OnError(error); }
- finally { Dispose(); }
- }
- public override void OnCompleted()
- {
- try
- {
- parent.onCompleted();
- }
- catch (Exception ex)
- {
- base.observer.OnError(ex);
- Dispose();
- return;
- }
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- }
- internal class DoOnTerminateObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly Action onTerminate;
- public DoOnTerminateObservable(IObservable<T> source, Action onTerminate)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.onTerminate = onTerminate;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new DoOnTerminate(this, observer, cancel).Run();
- }
- class DoOnTerminate : OperatorObserverBase<T, T>
- {
- readonly DoOnTerminateObservable<T> parent;
- public DoOnTerminate(DoOnTerminateObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- return parent.source.Subscribe(this);
- }
- public override void OnNext(T value)
- {
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- try
- {
- parent.onTerminate();
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); }
- finally { Dispose(); }
- return;
- }
- try { observer.OnError(error); } finally { Dispose(); };
- }
- public override void OnCompleted()
- {
- try
- {
- parent.onTerminate();
- }
- catch (Exception ex)
- {
- base.observer.OnError(ex);
- Dispose();
- return;
- }
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- }
- internal class DoOnSubscribeObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly Action onSubscribe;
- public DoOnSubscribeObservable(IObservable<T> source, Action onSubscribe)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.onSubscribe = onSubscribe;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new DoOnSubscribe(this, observer, cancel).Run();
- }
- class DoOnSubscribe : OperatorObserverBase<T, T>
- {
- readonly DoOnSubscribeObservable<T> parent;
- public DoOnSubscribe(DoOnSubscribeObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- try
- {
- parent.onSubscribe();
- }
- catch (Exception ex)
- {
- try { observer.OnError(ex); }
- finally { Dispose(); }
- return Disposable.Empty;
- }
- return parent.source.Subscribe(this);
- }
- public override void OnNext(T value)
- {
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- try { observer.OnError(error); } finally { Dispose(); }
- }
- public override void OnCompleted()
- {
- try { observer.OnCompleted(); } finally { Dispose(); }
- }
- }
- }
- internal class DoOnCancelObservable<T> : OperatorObservableBase<T>
- {
- readonly IObservable<T> source;
- readonly Action onCancel;
- public DoOnCancelObservable(IObservable<T> source, Action onCancel)
- : base(source.IsRequiredSubscribeOnCurrentThread())
- {
- this.source = source;
- this.onCancel = onCancel;
- }
- protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
- {
- return new DoOnCancel(this, observer, cancel).Run();
- }
- class DoOnCancel : OperatorObserverBase<T, T>
- {
- readonly DoOnCancelObservable<T> parent;
- bool isCompletedCall = false;
- public DoOnCancel(DoOnCancelObservable<T> parent, IObserver<T> observer, IDisposable cancel)
- : base(observer, cancel)
- {
- this.parent = parent;
- }
- public IDisposable Run()
- {
- return StableCompositeDisposable.Create(parent.source.Subscribe(this), Disposable.Create(() =>
- {
- if (!isCompletedCall)
- {
- parent.onCancel();
- }
- }));
- }
- public override void OnNext(T value)
- {
- base.observer.OnNext(value);
- }
- public override void OnError(Exception error)
- {
- isCompletedCall = true;
- try { observer.OnError(error); } finally { Dispose(); };
- }
- public override void OnCompleted()
- {
- isCompletedCall = true;
- try { observer.OnCompleted(); } finally { Dispose(); };
- }
- }
- }
- }
|