using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace UniRx.Operators
{
    // Do, DoOnError, DoOnCompleted, DoOnTerminate, DoOnSubscribe, DoOnCancel
    //
    // Every operator in this file wraps a source sequence, runs a user-supplied
    // side effect at a particular point, and otherwise forwards notifications
    // unchanged to the downstream observer.
    //
    // NOTE(review): the code below references `T` and invokes `Action` fields with
    // arguments, yet no generic type parameters are declared in this view.
    // Presumably the type-argument lists were lost somewhere upstream - confirm
    // against the canonical file before relying on these declarations.

    /// <summary>
    /// Runs <c>onNext</c>, <c>onError</c> or <c>onCompleted</c> before the matching
    /// notification is forwarded downstream. If a callback throws, the thrown
    /// exception is sent downstream through OnError and Dispose() is called.
    /// </summary>
    internal class DoObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly Action onNext;
        readonly Action onError;
        readonly Action onCompleted;

        public DoObservable(IObservable source, Action onNext, Action onError, Action onCompleted)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.onNext = onNext;
            this.onError = onError;
            this.onCompleted = onCompleted;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new Do(this, observer, cancel).Run();
        }

        class Do : OperatorObserverBase
        {
            readonly DoObservable parent;

            public Do(DoObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                try
                {
                    parent.onNext(value);
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                try
                {
                    parent.onError(error);
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try
                {
                    parent.onCompleted();
                }
                catch (Exception ex)
                {
                    // Dispose() sits in a finally so it still runs when the downstream
                    // OnError throws, the same way every other error path here works.
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Mirrors every notification to a side-channel observer before forwarding it
    /// downstream. If the side-channel observer throws, the thrown exception is
    /// sent downstream through OnError and Dispose() is called.
    /// </summary>
    internal class DoObserverObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly IObserver observer;

        public DoObserverObservable(IObservable source, IObserver observer)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.observer = observer;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new Do(this, observer, cancel).Run();
        }

        class Do : OperatorObserverBase
        {
            readonly DoObserverObservable parent;

            public Do(DoObserverObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                try
                {
                    // parent.observer is the side-channel observer, not the downstream one.
                    parent.observer.OnNext(value);
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                try
                {
                    parent.observer.OnError(error);
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try
                {
                    parent.observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Runs <c>onError</c> before an error notification is forwarded downstream.
    /// OnNext and OnCompleted are passed through untouched. If <c>onError</c>
    /// throws, the thrown exception is forwarded in place of the source error.
    /// </summary>
    internal class DoOnErrorObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly Action onError;

        public DoOnErrorObservable(IObservable source, Action onError)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.onError = onError;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new DoOnError(this, observer, cancel).Run();
        }

        class DoOnError : OperatorObserverBase
        {
            readonly DoOnErrorObservable parent;

            public DoOnError(DoOnErrorObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                try
                {
                    parent.onError(error);
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Runs <c>onCompleted</c> before the completion notification is forwarded
    /// downstream. OnNext and OnError are passed through untouched. If
    /// <c>onCompleted</c> throws, the thrown exception is sent downstream through
    /// OnError instead of completing.
    /// </summary>
    internal class DoOnCompletedObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly Action onCompleted;

        public DoOnCompletedObservable(IObservable source, Action onCompleted)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.onCompleted = onCompleted;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new DoOnCompleted(this, observer, cancel).Run();
        }

        class DoOnCompleted : OperatorObserverBase
        {
            readonly DoOnCompletedObservable parent;

            public DoOnCompleted(DoOnCompletedObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try
                {
                    parent.onCompleted();
                }
                catch (Exception ex)
                {
                    // Dispose() sits in a finally so it still runs when the downstream
                    // OnError throws, the same way every other error path here works.
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Runs <c>onTerminate</c> before either terminal notification (error or
    /// completion) is forwarded downstream. OnNext is passed through untouched.
    /// If <c>onTerminate</c> throws, the thrown exception is sent downstream
    /// through OnError.
    /// </summary>
    internal class DoOnTerminateObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly Action onTerminate;

        public DoOnTerminateObservable(IObservable source, Action onTerminate)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.onTerminate = onTerminate;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new DoOnTerminate(this, observer, cancel).Run();
        }

        class DoOnTerminate : OperatorObserverBase
        {
            readonly DoOnTerminateObservable parent;

            public DoOnTerminate(DoOnTerminateObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                try
                {
                    parent.onTerminate();
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try
                {
                    parent.onTerminate();
                }
                catch (Exception ex)
                {
                    // Dispose() sits in a finally so it still runs when the downstream
                    // OnError throws, the same way every other error path here works.
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Runs <c>onSubscribe</c> immediately before subscribing to the source. If
    /// <c>onSubscribe</c> throws, the exception is sent downstream through OnError,
    /// the source is never subscribed, and <c>Disposable.Empty</c> is returned.
    /// </summary>
    internal class DoOnSubscribeObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly Action onSubscribe;

        public DoOnSubscribeObservable(IObservable source, Action onSubscribe)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.onSubscribe = onSubscribe;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new DoOnSubscribe(this, observer, cancel).Run();
        }

        class DoOnSubscribe : OperatorObserverBase
        {
            readonly DoOnSubscribeObservable parent;

            public DoOnSubscribe(DoOnSubscribeObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                try
                {
                    parent.onSubscribe();
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }

                    // Nothing was subscribed, so there is nothing for the caller to cancel.
                    return Disposable.Empty;
                }

                return parent.source.Subscribe(this);
            }

            public override void OnNext(T value)
            {
                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Runs <c>onCancel</c> when the returned subscription is disposed before the
    /// source has delivered OnError or OnCompleted. Notifications themselves are
    /// forwarded downstream untouched.
    /// </summary>
    internal class DoOnCancelObservable : OperatorObservableBase
    {
        readonly IObservable source;
        readonly Action onCancel;

        public DoOnCancelObservable(IObservable source, Action onCancel)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.onCancel = onCancel;
        }

        protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel)
        {
            return new DoOnCancel(this, observer, cancel).Run();
        }

        class DoOnCancel : OperatorObserverBase
        {
            readonly DoOnCancelObservable parent;

            // Set once a terminal notification has been received; the disposal
            // callback created in Run() reads it to decide whether to call onCancel.
            bool isCompletedCall = false;

            public DoOnCancel(DoOnCancelObservable parent, IObserver observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public IDisposable Run()
            {
                // The source subscription is created first (argument evaluation order),
                // then paired with a disposable that fires onCancel only when no
                // terminal notification has been seen.
                return StableCompositeDisposable.Create(parent.source.Subscribe(this), Disposable.Create(() =>
                {
                    if (!isCompletedCall)
                    {
                        parent.onCancel();
                    }
                }));
            }

            public override void OnNext(T value)
            {
                base.observer.OnNext(value);
            }

            public override void OnError(Exception error)
            {
                // Flag is raised before forwarding so the disposal callback sees it.
                isCompletedCall = true;
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                // Flag is raised before forwarding so the disposal callback sees it.
                isCompletedCall = true;
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }
}