123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- using System;
- using System.Threading;
- using UniRx.InternalUtil;
- namespace UniRx
- {
- public static class Observer
- {
- internal static IObserver<T> CreateSubscribeObserver<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- // need compare for avoid iOS AOT
- if (onNext == Stubs<T>.Ignore)
- {
- return new Subscribe_<T>(onError, onCompleted);
- }
- else
- {
- return new Subscribe<T>(onNext, onError, onCompleted);
- }
- }
- internal static IObserver<T> CreateSubscribeWithStateObserver<T, TState>(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
- {
- return new Subscribe<T, TState>(state, onNext, onError, onCompleted);
- }
- internal static IObserver<T> CreateSubscribeWithState2Observer<T, TState1, TState2>(TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
- {
- return new Subscribe<T, TState1, TState2>(state1, state2, onNext, onError, onCompleted);
- }
- internal static IObserver<T> CreateSubscribeWithState3Observer<T, TState1, TState2, TState3>(TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
- {
- return new Subscribe<T, TState1, TState2, TState3>(state1, state2, state3, onNext, onError, onCompleted);
- }
- public static IObserver<T> Create<T>(Action<T> onNext)
- {
- return Create<T>(onNext, UniRx.Stubs.Throw, UniRx.Stubs.Nop);
- }
- public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
- {
- return Create<T>(onNext, onError, UniRx.Stubs.Nop);
- }
- public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
- {
- return Create<T>(onNext, UniRx.Stubs.Throw, onCompleted);
- }
- public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- // need compare for avoid iOS AOT
- if (onNext == Stubs<T>.Ignore)
- {
- return new EmptyOnNextAnonymousObserver<T>(onError, onCompleted);
- }
- else
- {
- return new AnonymousObserver<T>(onNext, onError, onCompleted);
- }
- }
- public static IObserver<T> CreateAutoDetachObserver<T>(IObserver<T> observer, IDisposable disposable)
- {
- return new AutoDetachObserver<T>(observer, disposable);
- }
- class AnonymousObserver<T> : IObserver<T>
- {
- readonly Action<T> onNext;
- readonly Action<Exception> onError;
- readonly Action onCompleted;
- int isStopped = 0;
- public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- {
- onNext(value);
- }
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted();
- }
- }
- }
- class EmptyOnNextAnonymousObserver<T> : IObserver<T>
- {
- readonly Action<Exception> onError;
- readonly Action onCompleted;
- int isStopped = 0;
- public EmptyOnNextAnonymousObserver(Action<Exception> onError, Action onCompleted)
- {
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted();
- }
- }
- }
- // same as AnonymousObserver...
- class Subscribe<T> : IObserver<T>
- {
- readonly Action<T> onNext;
- readonly Action<Exception> onError;
- readonly Action onCompleted;
- int isStopped = 0;
- public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- {
- onNext(value);
- }
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted();
- }
- }
- }
- // same as EmptyOnNextAnonymousObserver...
- class Subscribe_<T> : IObserver<T>
- {
- readonly Action<Exception> onError;
- readonly Action onCompleted;
- int isStopped = 0;
- public Subscribe_(Action<Exception> onError, Action onCompleted)
- {
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted();
- }
- }
- }
- // with state
- class Subscribe<T, TState> : IObserver<T>
- {
- readonly TState state;
- readonly Action<T, TState> onNext;
- readonly Action<Exception, TState> onError;
- readonly Action<TState> onCompleted;
- int isStopped = 0;
- public Subscribe(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
- {
- this.state = state;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- {
- onNext(value, state);
- }
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error, state);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted(state);
- }
- }
- }
- class Subscribe<T, TState1, TState2> : IObserver<T>
- {
- readonly TState1 state1;
- readonly TState2 state2;
- readonly Action<T, TState1, TState2> onNext;
- readonly Action<Exception, TState1, TState2> onError;
- readonly Action<TState1, TState2> onCompleted;
- int isStopped = 0;
- public Subscribe(TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
- {
- this.state1 = state1;
- this.state2 = state2;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- {
- onNext(value, state1, state2);
- }
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error, state1, state2);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted(state1, state2);
- }
- }
- }
- class Subscribe<T, TState1, TState2, TState3> : IObserver<T>
- {
- readonly TState1 state1;
- readonly TState2 state2;
- readonly TState3 state3;
- readonly Action<T, TState1, TState2, TState3> onNext;
- readonly Action<Exception, TState1, TState2, TState3> onError;
- readonly Action<TState1, TState2, TState3> onCompleted;
- int isStopped = 0;
- public Subscribe(TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
- {
- this.state1 = state1;
- this.state2 = state2;
- this.state3 = state3;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- {
- onNext(value, state1, state2, state3);
- }
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onError(error, state1, state2, state3);
- }
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- {
- onCompleted(state1, state2, state3);
- }
- }
- }
- class AutoDetachObserver<T> : UniRx.Operators.OperatorObserverBase<T, T>
- {
- public AutoDetachObserver(IObserver<T> observer, IDisposable cancel)
- : base(observer, cancel)
- {
- }
- public override void OnNext(T value)
- {
- try
- {
- base.observer.OnNext(value);
- }
- catch
- {
- Dispose();
- throw;
- }
- }
- public override void OnError(Exception error)
- {
- try { observer.OnError(error); }
- finally { Dispose(); }
- }
- public override void OnCompleted()
- {
- try { observer.OnCompleted(); }
- finally { Dispose(); }
- }
- }
- }
- public static partial class ObserverExtensions
- {
- public static IObserver<T> Synchronize<T>(this IObserver<T> observer)
- {
- return new UniRx.Operators.SynchronizedObserver<T>(observer, new object());
- }
- public static IObserver<T> Synchronize<T>(this IObserver<T> observer, object gate)
- {
- return new UniRx.Operators.SynchronizedObserver<T>(observer, gate);
- }
- }
- public static partial class ObservableExtensions
- {
- public static IDisposable Subscribe<T>(this IObservable<T> source)
- {
- return source.Subscribe(UniRx.InternalUtil.ThrowObserver<T>.Instance);
- }
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
- {
- return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
- }
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
- {
- return source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, Stubs.Nop));
- }
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted));
- }
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, onCompleted));
- }
- public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext)
- {
- return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, Stubs<TState>.Ignore));
- }
- public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError)
- {
- return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, Stubs<TState>.Ignore));
- }
- public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<TState> onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, onCompleted));
- }
- public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, onCompleted));
- }
- public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, Stubs<TState1, TState2>.Ignore));
- }
- public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, Stubs<TState1, TState2>.Ignore));
- }
- public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<TState1, TState2> onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, onCompleted));
- }
- public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, onCompleted));
- }
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, Stubs<TState1, TState2, TState3>.Ignore));
- }
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, Stubs<TState1, TState2, TState3>.Ignore));
- }
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<TState1, TState2, TState3> onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, onCompleted));
- }
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
- {
- return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, onCompleted));
- }
- }
- internal static class Stubs
- {
- public static readonly Action Nop = () => { };
- public static readonly Action<Exception> Throw = ex => { ex.Throw(); };
- // marker for CatchIgnore and Catch avoid iOS AOT problem.
- public static IObservable<TSource> CatchIgnore<TSource>(Exception ex)
- {
- return Observable.Empty<TSource>();
- }
- }
- internal static class Stubs<T>
- {
- public static readonly Action<T> Ignore = (T t) => { };
- public static readonly Func<T, T> Identity = (T t) => t;
- public static readonly Action<Exception, T> Throw = (ex, _) => { ex.Throw(); };
- }
- internal static class Stubs<T1, T2>
- {
- public static readonly Action<T1, T2> Ignore = (x, y) => { };
- public static readonly Action<Exception, T1, T2> Throw = (ex, _, __) => { ex.Throw(); };
- }
- internal static class Stubs<T1, T2, T3>
- {
- public static readonly Action<T1, T2, T3> Ignore = (x, y, z) => { };
- public static readonly Action<Exception, T1, T2, T3> Throw = (ex, _, __, ___) => { ex.Throw(); };
- }
- }
|