using System; namespace UniRx.Operators { internal class CreateObservable : OperatorObservableBase { readonly Func, IDisposable> subscribe; public CreateObservable(Func, IDisposable> subscribe) : base(true) // fail safe { this.subscribe = subscribe; } public CreateObservable(Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) : base(isRequiredSubscribeOnCurrentThread) { this.subscribe = subscribe; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { observer = new Create(observer, cancel); return subscribe(observer) ?? Disposable.Empty; } class Create : OperatorObserverBase { public Create(IObserver observer, IDisposable cancel) : base(observer, cancel) { } public override void OnNext(T value) { base.observer.OnNext(value); } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } } } internal class CreateObservable : OperatorObservableBase { readonly TState state; readonly Func, IDisposable> subscribe; public CreateObservable(TState state, Func, IDisposable> subscribe) : base(true) // fail safe { this.state = state; this.subscribe = subscribe; } public CreateObservable(TState state, Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) : base(isRequiredSubscribeOnCurrentThread) { this.state = state; this.subscribe = subscribe; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { observer = new Create(observer, cancel); return subscribe(state, observer) ?? Disposable.Empty; } class Create : OperatorObserverBase { public Create(IObserver observer, IDisposable cancel) : base(observer, cancel) { } public override void OnNext(T value) { base.observer.OnNext(value); } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } } } internal class CreateSafeObservable : OperatorObservableBase { readonly Func, IDisposable> subscribe; public CreateSafeObservable(Func, IDisposable> subscribe) : base(true) // fail safe { this.subscribe = subscribe; } public CreateSafeObservable(Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) : base(isRequiredSubscribeOnCurrentThread) { this.subscribe = subscribe; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { observer = new CreateSafe(observer, cancel); return subscribe(observer) ?? Disposable.Empty; } class CreateSafe : OperatorObserverBase { public CreateSafe(IObserver observer, IDisposable cancel) : base(observer, cancel) { } public override void OnNext(T value) { try { base.observer.OnNext(value); } catch { Dispose(); // safe throw; } } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } } } }