using System;
using System.Threading;
using UniRx.InternalUtil;

namespace UniRx
{
    /// <summary>
    /// Factory methods that build <see cref="IObserver{T}"/> instances from delegates.
    /// </summary>
    public static class Observer
    {
        /// <summary>
        /// Builds the observer used by the <c>Subscribe</c> extension overloads.
        /// </summary>
        /// <param name="onNext">Invoked for every value until the observer is stopped.</param>
        /// <param name="onError">Invoked at most once, for the first terminal notification if it is an error.</param>
        /// <param name="onCompleted">Invoked at most once, for the first terminal notification if it is a completion.</param>
        internal static IObserver<T> CreateSubscribeObserver<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // Delegate comparison against the shared no-op: when the caller did not
            // supply a real onNext, use the observer that has an empty OnNext.
            // The original note says this special case is needed to avoid an iOS AOT problem.
            if (onNext == Stubs<T>.Ignore)
            {
                return new Subscribe_<T>(onError, onCompleted);
            }
            else
            {
                return new Subscribe<T>(onNext, onError, onCompleted);
            }
        }

        /// <summary>
        /// Builds an observer whose callbacks additionally receive one caller-supplied state value.
        /// </summary>
        internal static IObserver<T> CreateSubscribeWithStateObserver<T, TState>(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
        {
            return new Subscribe<T, TState>(state, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Builds an observer whose callbacks additionally receive two caller-supplied state values.
        /// </summary>
        internal static IObserver<T> CreateSubscribeWithState2Observer<T, TState1, TState2>(TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
        {
            return new Subscribe<T, TState1, TState2>(state1, state2, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Builds an observer whose callbacks additionally receive three caller-supplied state values.
        /// </summary>
        internal static IObserver<T> CreateSubscribeWithState3Observer<T, TState1, TState2, TState3>(TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
        {
            return new Subscribe<T, TState1, TState2, TState3>(state1, state2, state3, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Creates an observer from <paramref name="onNext"/>; errors go to <c>Stubs.Throw</c>
        /// and completion to <c>Stubs.Nop</c>.
        /// </summary>
        public static IObserver<T> Create<T>(Action<T> onNext)
        {
            return Create<T>(onNext, UniRx.Stubs.Throw, UniRx.Stubs.Nop);
        }

        /// <summary>
        /// Creates an observer from <paramref name="onNext"/> and <paramref name="onError"/>;
        /// completion goes to <c>Stubs.Nop</c>.
        /// </summary>
        public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
        {
            return Create<T>(onNext, onError, UniRx.Stubs.Nop);
        }

        /// <summary>
        /// Creates an observer from <paramref name="onNext"/> and <paramref name="onCompleted"/>;
        /// errors go to <c>Stubs.Throw</c>.
        /// </summary>
        public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
        {
            return Create<T>(onNext, UniRx.Stubs.Throw, onCompleted);
        }

        /// <summary>
        /// Creates an observer from the three supplied callbacks.
        /// </summary>
        /// <param name="onNext">Invoked for every value until the observer is stopped.</param>
        /// <param name="onError">Invoked at most once, only if the first terminal notification is an error.</param>
        /// <param name="onCompleted">Invoked at most once, only if the first terminal notification is a completion.</param>
        public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // Same special case as CreateSubscribeObserver: the original note says the
            // comparison with the shared no-op delegate is needed to avoid an iOS AOT problem.
            if (onNext == Stubs<T>.Ignore)
            {
                return new EmptyOnNextAnonymousObserver<T>(onError, onCompleted);
            }
            else
            {
                return new AnonymousObserver<T>(onNext, onError, onCompleted);
            }
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in an observer that also holds
        /// <paramref name="disposable"/> and calls <c>Dispose()</c> after a terminal
        /// notification or when the wrapped <c>OnNext</c> throws.
        /// </summary>
        public static IObserver<T> CreateAutoDetachObserver<T>(IObserver<T> observer, IDisposable disposable)
        {
            return new AutoDetachObserver<T>(observer, disposable);
        }

        /// <summary>
        /// Delegate-backed observer. Only the first terminal call (OnError or OnCompleted)
        /// is forwarded; OnNext is forwarded only while no terminal call has been made.
        /// </summary>
        class AnonymousObserver<T> : IObserver<T>
        {
            readonly Action<T> onNext;
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
            {
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped == 0)
                {
                    onNext(value);
                }
            }

            public void OnError(Exception error)
            {
                // Only the call that moves the counter from 0 to 1 is forwarded.
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error);
                }
            }

            public void OnCompleted()
            {
                // Only the call that moves the counter from 0 to 1 is forwarded.
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted();
                }
            }
        }

        /// <summary>
        /// Variant of <c>AnonymousObserver</c> whose OnNext does nothing.
        /// </summary>
        class EmptyOnNextAnonymousObserver<T> : IObserver<T>
        {
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public EmptyOnNextAnonymousObserver(Action<Exception> onError, Action onCompleted)
            {
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error);
                }
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted();
                }
            }
        }

        // Same implementation as AnonymousObserver, under a separate type name
        // used by the Subscribe extension methods.
        class Subscribe<T> : IObserver<T>
        {
            readonly Action<T> onNext;
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
            {
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped == 0)
                {
                    onNext(value);
                }
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error);
                }
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted();
                }
            }
        }

        // Same implementation as EmptyOnNextAnonymousObserver, under a separate type name
        // used by the Subscribe extension methods.
        class Subscribe_<T> : IObserver<T>
        {
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public Subscribe_(Action<Exception> onError, Action onCompleted)
            {
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error);
                }
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted();
                }
            }
        }

        // With state: every callback receives the stored state value as an extra argument.
        class Subscribe<T, TState> : IObserver<T>
        {
            readonly TState state;
            readonly Action<T, TState> onNext;
            readonly Action<Exception, TState> onError;
            readonly Action<TState> onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public Subscribe(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
            {
                this.state = state;
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped == 0)
                {
                    onNext(value, state);
                }
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error, state);
                }
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted(state);
                }
            }
        }

        // With two state values passed to every callback.
        class Subscribe<T, TState1, TState2> : IObserver<T>
        {
            readonly TState1 state1;
            readonly TState2 state2;
            readonly Action<T, TState1, TState2> onNext;
            readonly Action<Exception, TState1, TState2> onError;
            readonly Action<TState1, TState2> onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public Subscribe(TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
            {
                this.state1 = state1;
                this.state2 = state2;
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped == 0)
                {
                    onNext(value, state1, state2);
                }
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error, state1, state2);
                }
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted(state1, state2);
                }
            }
        }

        // With three state values passed to every callback.
        class Subscribe<T, TState1, TState2, TState3> : IObserver<T>
        {
            readonly TState1 state1;
            readonly TState2 state2;
            readonly TState3 state3;
            readonly Action<T, TState1, TState2, TState3> onNext;
            readonly Action<Exception, TState1, TState2, TState3> onError;
            readonly Action<TState1, TState2, TState3> onCompleted;

            // 0 while active; incremented by every terminal call.
            int isStopped = 0;

            public Subscribe(TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
            {
                this.state1 = state1;
                this.state2 = state2;
                this.state3 = state3;
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped == 0)
                {
                    onNext(value, state1, state2, state3);
                }
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onError(error, state1, state2, state3);
                }
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) == 1)
                {
                    onCompleted(state1, state2, state3);
                }
            }
        }

        /// <summary>
        /// Forwards notifications to the wrapped observer and calls <c>Dispose()</c>
        /// once the sequence terminates or the wrapped OnNext throws.
        /// </summary>
        // NOTE(review): the base class is declared elsewhere; its two type arguments
        // (source type, result type) and its `observer` field / `Dispose()` member are
        // assumed from how they are used here - confirm against OperatorObserverBase.
        class AutoDetachObserver<T> : UniRx.Operators.OperatorObserverBase<T, T>
        {
            public AutoDetachObserver(IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public override void OnNext(T value)
            {
                try
                {
                    base.observer.OnNext(value);
                }
                catch
                {
                    // Dispose on failure, then let the exception propagate unchanged.
                    Dispose();
                    throw;
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); }
                finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try { observer.OnCompleted(); }
                finally { Dispose(); }
            }
        }
    }

    /// <summary>
    /// Extension methods on <see cref="IObserver{T}"/>.
    /// </summary>
    public static partial class ObserverExtensions
    {
        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>SynchronizedObserver</c> using a newly created gate object.
        /// </summary>
        public static IObserver<T> Synchronize<T>(this IObserver<T> observer)
        {
            return new UniRx.Operators.SynchronizedObserver<T>(observer, new object());
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>SynchronizedObserver</c> using the supplied <paramref name="gate"/>.
        /// </summary>
        public static IObserver<T> Synchronize<T>(this IObserver<T> observer, object gate)
        {
            return new UniRx.Operators.SynchronizedObserver<T>(observer, gate);
        }
    }

    /// <summary>
    /// Delegate-based <c>Subscribe</c> overloads for <see cref="IObservable{T}"/>.
    /// Callbacks that are not supplied default to the shared delegates in <c>Stubs</c>.
    /// </summary>
    public static partial class ObservableExtensions
    {
        /// <summary>Subscribes with the shared <c>ThrowObserver</c> instance.</summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source)
        {
            return source.Subscribe(UniRx.InternalUtil.ThrowObserver<T>.Instance);
        }

        /// <summary>Subscribes with an onNext callback.</summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
        {
            return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
        }

        /// <summary>Subscribes with onNext and onError callbacks.</summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
        {
            return source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, Stubs.Nop));
        }

        /// <summary>Subscribes with onNext and onCompleted callbacks.</summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted));
        }

        /// <summary>Subscribes with onNext, onError and onCompleted callbacks.</summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, onCompleted));
        }

        /// <summary>Subscribes with an onNext callback that also receives <paramref name="state"/>.</summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext)
        {
            return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, Stubs<TState>.Ignore));
        }

        /// <summary>Subscribes with onNext and onError callbacks that also receive <paramref name="state"/>.</summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError)
        {
            return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, Stubs<TState>.Ignore));
        }

        /// <summary>Subscribes with onNext and onCompleted callbacks that also receive <paramref name="state"/>.</summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<TState> onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, onCompleted));
        }

        /// <summary>Subscribes with onNext, onError and onCompleted callbacks that also receive <paramref name="state"/>.</summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, onCompleted));
        }

        /// <summary>Subscribes with an onNext callback that also receives two state values.</summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, Stubs<TState1, TState2>.Ignore));
        }

        /// <summary>Subscribes with onNext and onError callbacks that also receive two state values.</summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, Stubs<TState1, TState2>.Ignore));
        }

        /// <summary>Subscribes with onNext and onCompleted callbacks that also receive two state values.</summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<TState1, TState2> onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, onCompleted));
        }

        /// <summary>Subscribes with onNext, onError and onCompleted callbacks that also receive two state values.</summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, onCompleted));
        }

        /// <summary>Subscribes with an onNext callback that also receives three state values.</summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, Stubs<TState1, TState2, TState3>.Ignore));
        }

        /// <summary>Subscribes with onNext and onError callbacks that also receive three state values.</summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, Stubs<TState1, TState2, TState3>.Ignore));
        }

        /// <summary>Subscribes with onNext and onCompleted callbacks that also receive three state values.</summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<TState1, TState2, TState3> onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, onCompleted));
        }

        /// <summary>Subscribes with onNext, onError and onCompleted callbacks that also receive three state values.</summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
        {
            return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, onCompleted));
        }
    }

    /// <summary>
    /// Shared, allocation-free default delegates (no type parameter).
    /// </summary>
    internal static class Stubs
    {
        /// <summary>Does nothing.</summary>
        public static readonly Action Nop = () => { };

        /// <summary>Calls <c>ex.Throw()</c> (extension defined elsewhere; presumably rethrows the exception).</summary>
        public static readonly Action<Exception> Throw = ex => { ex.Throw(); };

        // Marker method for CatchIgnore and Catch; the original note says it exists
        // to avoid an iOS AOT problem.
        public static IObservable<TSource> CatchIgnore<TSource>(Exception ex)
        {
            return Observable.Empty<TSource>();
        }
    }

    /// <summary>
    /// Shared default delegates for one type parameter. <c>Ignore</c> is the instance the
    /// observer factories compare against to detect "no onNext supplied".
    /// </summary>
    internal static class Stubs<T>
    {
        public static readonly Action<T> Ignore = (T t) => { };
        public static readonly Func<T, T> Identity = (T t) => t;
        public static readonly Action<Exception, T> Throw = (ex, _) => { ex.Throw(); };
    }

    /// <summary>Shared default delegates for two type parameters.</summary>
    internal static class Stubs<T1, T2>
    {
        public static readonly Action<T1, T2> Ignore = (x, y) => { };
        public static readonly Action<Exception, T1, T2> Throw = (ex, _, __) => { ex.Throw(); };
    }

    /// <summary>Shared default delegates for three type parameters.</summary>
    internal static class Stubs<T1, T2, T3>
    {
        public static readonly Action<T1, T2, T3> Ignore = (x, y, z) => { };
        public static readonly Action<Exception, T1, T2, T3> Throw = (ex, _, __, ___) => { ex.Throw(); };
    }
}