using System;
using System.Collections.Generic;
using UniRx.Operators;

namespace UniRx
{
    // Factory ("creation") operators of the Observable static class.
    public static partial class Observable
    {
        /// <summary>
        /// Creates an anonymous observable from a subscribe delegate. The observer has exception durability.
        /// Recommended for building operators and event-like generators.
        /// </summary>
        /// <param name="subscribe">Delegate invoked with the observer; returns the subscription's disposable.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe)
        {
            if (subscribe == null) throw new ArgumentNullException("subscribe");

            return new CreateObservable<T>(subscribe);
        }

        /// <summary>
        /// Creates an anonymous observable from a subscribe delegate. The observer has exception durability.
        /// Recommended for building operators and event-like generators (hot observables).
        /// </summary>
        /// <param name="subscribe">Delegate invoked with the observer; returns the subscription's disposable.</param>
        /// <param name="isRequiredSubscribeOnCurrentThread">Forwarded unchanged to <c>CreateObservable</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
        {
            if (subscribe == null) throw new ArgumentNullException("subscribe");

            return new CreateObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread);
        }

        /// <summary>
        /// Creates an anonymous observable whose subscribe delegate receives an explicit state value.
        /// The observer has exception durability. Recommended for building operators and event-like generators.
        /// </summary>
        /// <param name="state">Value handed to <paramref name="subscribe"/> together with the observer.</param>
        /// <param name="subscribe">Delegate invoked with the state and the observer; returns the subscription's disposable.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public static IObservable<T> CreateWithState<T, TState>(TState state, Func<TState, IObserver<T>, IDisposable> subscribe)
        {
            if (subscribe == null) throw new ArgumentNullException("subscribe");

            return new CreateObservable<T, TState>(state, subscribe);
        }

        /// <summary>
        /// Creates an anonymous observable whose subscribe delegate receives an explicit state value.
        /// The observer has exception durability. Recommended for building operators and event-like generators (hot observables).
        /// </summary>
        /// <param name="state">Value handed to <paramref name="subscribe"/> together with the observer.</param>
        /// <param name="subscribe">Delegate invoked with the state and the observer; returns the subscription's disposable.</param>
        /// <param name="isRequiredSubscribeOnCurrentThread">Forwarded unchanged to <c>CreateObservable</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public static IObservable<T> CreateWithState<T, TState>(TState state, Func<TState, IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
        {
            if (subscribe == null) throw new ArgumentNullException("subscribe");

            return new CreateObservable<T, TState>(state, subscribe, isRequiredSubscribeOnCurrentThread);
        }

        /// <summary>
        /// Creates an anonymous observable. "Safe" means the observer is automatically detached when an error
        /// is raised in the OnNext pipeline. Recommended for building generators (cold observables).
        /// </summary>
        /// <param name="subscribe">Delegate invoked with the observer; returns the subscription's disposable.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public static IObservable<T> CreateSafe<T>(Func<IObserver<T>, IDisposable> subscribe)
        {
            if (subscribe == null) throw new ArgumentNullException("subscribe");

            return new CreateSafeObservable<T>(subscribe);
        }

        /// <summary>
        /// Creates an anonymous observable. "Safe" means the observer is automatically detached when an error
        /// is raised in the OnNext pipeline. Recommended for building generators (cold observables).
        /// </summary>
        /// <param name="subscribe">Delegate invoked with the observer; returns the subscription's disposable.</param>
        /// <param name="isRequiredSubscribeOnCurrentThread">Forwarded unchanged to <c>CreateSafeObservable</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public static IObservable<T> CreateSafe<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
        {
            if (subscribe == null) throw new ArgumentNullException("subscribe");

            return new CreateSafeObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread);
        }

        /// <summary>
        /// Empty observable: signals only OnCompleted, using <c>Scheduler.DefaultSchedulers.ConstantTimeOperations</c>.
        /// </summary>
        public static IObservable<T> Empty<T>()
        {
            return Empty<T>(Scheduler.DefaultSchedulers.ConstantTimeOperations);
        }

        /// <summary>
        /// Empty observable: signals only OnCompleted on the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler to complete on.</param>
        public static IObservable<T> Empty<T>(IScheduler scheduler)
        {
            // The immediate scheduler needs no per-call state, so a shared instance is returned.
            if (scheduler == Scheduler.Immediate)
            {
                return ImmutableEmptyObservable<T>.Instance;
            }

            return new EmptyObservable<T>(scheduler);
        }

        /// <summary>
        /// Empty observable: signals only OnCompleted. <paramref name="witness"/> is used for type inference only.
        /// </summary>
        /// <param name="witness">Unused value whose static type fixes <typeparamref name="T"/>.</param>
        public static IObservable<T> Empty<T>(T witness)
        {
            return Empty<T>(Scheduler.DefaultSchedulers.ConstantTimeOperations);
        }

        /// <summary>
        /// Empty observable: signals only OnCompleted on the specified scheduler.
        /// <paramref name="witness"/> is used for type inference only.
        /// </summary>
        /// <param name="scheduler">Scheduler to complete on.</param>
        /// <param name="witness">Unused value whose static type fixes <typeparamref name="T"/>.</param>
        public static IObservable<T> Empty<T>(IScheduler scheduler, T witness)
        {
            return Empty<T>(scheduler);
        }

        /// <summary>
        /// Non-terminating observable: it emits nothing and never finishes.
        /// </summary>
        public static IObservable<T> Never<T>()
        {
            return ImmutableNeverObservable<T>.Instance;
        }

        /// <summary>
        /// Non-terminating observable: it emits nothing and never finishes.
        /// <paramref name="witness"/> is used for type inference only.
        /// </summary>
        /// <param name="witness">Unused value whose static type fixes <typeparamref name="T"/>.</param>
        public static IObservable<T> Never<T>(T witness)
        {
            return ImmutableNeverObservable<T>.Instance;
        }

        /// <summary>
        /// Returns a single-value sequence using <c>Scheduler.DefaultSchedulers.ConstantTimeOperations</c>.
        /// </summary>
        /// <param name="value">The value to emit.</param>
        public static IObservable<T> Return<T>(T value)
        {
            return Return<T>(value, Scheduler.DefaultSchedulers.ConstantTimeOperations);
        }

        /// <summary>
        /// Returns a single-value sequence on the specified scheduler.
        /// </summary>
        /// <param name="value">The value to emit.</param>
        /// <param name="scheduler">Scheduler to emit on.</param>
        public static IObservable<T> Return<T>(T value, IScheduler scheduler)
        {
            // A dedicated implementation is used for the immediate scheduler.
            if (scheduler == Scheduler.Immediate)
            {
                return new ImmediateReturnObservable<T>(value);
            }

            return new ReturnObservable<T>(value, scheduler);
        }

        /// <summary>
        /// Returns a single-value sequence immediately; optimized for Unit (no memory allocation).
        /// </summary>
        /// <param name="value">Ignored beyond overload selection; a shared instance is returned.</param>
        public static IObservable<Unit> Return(Unit value)
        {
            return ImmutableReturnUnitObservable.Instance;
        }

        /// <summary>
        /// Returns a single-value sequence immediately; optimized for Boolean (no memory allocation).
        /// </summary>
        /// <param name="value">Selects the shared "true" or "false" instance.</param>
        public static IObservable<bool> Return(bool value)
        {
            return value
                ? (IObservable<bool>)ImmutableReturnTrueObservable.Instance
                : (IObservable<bool>)ImmutableReturnFalseObservable.Instance;
        }

        /// <summary>
        /// Returns a single-value sequence immediately; optimized for Int32.
        /// </summary>
        /// <param name="value">The value to emit.</param>
        public static IObservable<int> Return(int value)
        {
            return ImmutableReturnInt32Observable.GetInt32Observable(value);
        }

        /// <summary>
        /// Same as Observable.Return(Unit.Default), but without memory allocation.
        /// </summary>
        public static IObservable<Unit> ReturnUnit()
        {
            return ImmutableReturnUnitObservable.Instance;
        }

        /// <summary>
        /// Error-only observable: signals only OnError, using <c>Scheduler.DefaultSchedulers.ConstantTimeOperations</c>.
        /// </summary>
        /// <param name="error">The exception to signal.</param>
        public static IObservable<T> Throw<T>(Exception error)
        {
            return Throw<T>(error, Scheduler.DefaultSchedulers.ConstantTimeOperations);
        }

        /// <summary>
        /// Error-only observable: signals only OnError. <paramref name="witness"/> is used for type inference only.
        /// </summary>
        /// <param name="error">The exception to signal.</param>
        /// <param name="witness">Unused value whose static type fixes <typeparamref name="T"/>.</param>
        public static IObservable<T> Throw<T>(Exception error, T witness)
        {
            return Throw<T>(error, Scheduler.DefaultSchedulers.ConstantTimeOperations);
        }

        /// <summary>
        /// Error-only observable: signals only OnError on the specified scheduler.
        /// </summary>
        /// <param name="error">The exception to signal.</param>
        /// <param name="scheduler">Scheduler to signal on.</param>
        public static IObservable<T> Throw<T>(Exception error, IScheduler scheduler)
        {
            return new ThrowObservable<T>(error, scheduler);
        }

        /// <summary>
        /// Error-only observable: signals only OnError on the specified scheduler.
        /// <paramref name="witness"/> is used for type inference only.
        /// </summary>
        /// <param name="error">The exception to signal.</param>
        /// <param name="scheduler">Scheduler to signal on.</param>
        /// <param name="witness">Unused value whose static type fixes <typeparamref name="T"/>.</param>
        public static IObservable<T> Throw<T>(Exception error, IScheduler scheduler, T witness)
        {
            return Throw<T>(error, scheduler);
        }

        /// <summary>
        /// Integer range sequence built from <paramref name="start"/> and <paramref name="count"/>,
        /// using <c>Scheduler.DefaultSchedulers.Iteration</c>.
        /// </summary>
        public static IObservable<int> Range(int start, int count)
        {
            return Range(start, count, Scheduler.DefaultSchedulers.Iteration);
        }

        /// <summary>
        /// Integer range sequence built from <paramref name="start"/> and <paramref name="count"/> on the specified scheduler.
        /// Argument validation, if any, is left to <c>RangeObservable</c>.
        /// </summary>
        public static IObservable<int> Range(int start, int count, IScheduler scheduler)
        {
            return new RangeObservable(start, count, scheduler);
        }

        /// <summary>
        /// Repeats <paramref name="value"/> with no repeat count, using <c>Scheduler.DefaultSchedulers.Iteration</c>.
        /// </summary>
        public static IObservable<T> Repeat<T>(T value)
        {
            return Repeat(value, Scheduler.DefaultSchedulers.Iteration);
        }

        /// <summary>
        /// Repeats <paramref name="value"/> with no repeat count on the specified scheduler.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static IObservable<T> Repeat<T>(T value, IScheduler scheduler)
        {
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            // A null repeat count is passed to RepeatObservable for the unbounded overloads.
            return new RepeatObservable<T>(value, null, scheduler);
        }

        /// <summary>
        /// Repeats <paramref name="value"/> <paramref name="repeatCount"/> times, using <c>Scheduler.DefaultSchedulers.Iteration</c>.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is negative.</exception>
        public static IObservable<T> Repeat<T>(T value, int repeatCount)
        {
            return Repeat(value, repeatCount, Scheduler.DefaultSchedulers.Iteration);
        }

        /// <summary>
        /// Repeats <paramref name="value"/> <paramref name="repeatCount"/> times on the specified scheduler.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is negative.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public static IObservable<T> Repeat<T>(T value, int repeatCount, IScheduler scheduler)
        {
            if (repeatCount < 0) throw new ArgumentOutOfRangeException("repeatCount");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return new RepeatObservable<T>(value, repeatCount, scheduler);
        }

        /// <summary>
        /// Repeats the source sequence without end by concatenating an unbounded enumeration of <paramref name="source"/>.
        /// </summary>
        public static IObservable<T> Repeat<T>(this IObservable<T> source)
        {
            return RepeatInfinite(source).Concat();
        }

        // Lazily yields the same source forever; consumers pull one element at a time.
        static IEnumerable<IObservable<T>> RepeatInfinite<T>(IObservable<T> source)
        {
            while (true)
            {
                yield return source;
            }
        }

        /// <summary>
        /// Same as Repeat(), but repeating stops when contiguous OnCompleted notifications arrive.
        /// </summary>
        public static IObservable<T> RepeatSafe<T>(this IObservable<T> source)
        {
            return new RepeatSafeObservable<T>(RepeatInfinite(source), source.IsRequiredSubscribeOnCurrentThread());
        }

        /// <summary>
        /// Wraps <paramref name="observableFactory"/> in a <c>DeferObservable</c>.
        /// NOTE(review): presumably the factory is invoked per subscription - confirm against DeferObservable.
        /// </summary>
        public static IObservable<T> Defer<T>(Func<IObservable<T>> observableFactory)
        {
            return new DeferObservable<T>(observableFactory);
        }

        /// <summary>
        /// Wraps <paramref name="function"/> in a <c>StartObservable</c> with no time span,
        /// using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
        /// </summary>
        public static IObservable<T> Start<T>(Func<T> function)
        {
            return new StartObservable<T>(function, null, Scheduler.DefaultSchedulers.AsyncConversions);
        }

        /// <summary>
        /// Wraps <paramref name="function"/> in a <c>StartObservable</c>, using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
        /// NOTE(review): <paramref name="timeSpan"/> is presumably a start delay - confirm against StartObservable.
        /// </summary>
        public static IObservable<T> Start<T>(Func<T> function, TimeSpan timeSpan)
        {
            return new StartObservable<T>(function, timeSpan, Scheduler.DefaultSchedulers.AsyncConversions);
        }

        /// <summary>
        /// Wraps <paramref name="function"/> in a <c>StartObservable</c> with no time span, on the specified scheduler.
        /// </summary>
        public static IObservable<T> Start<T>(Func<T> function, IScheduler scheduler)
        {
            return new StartObservable<T>(function, null, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="function"/> in a <c>StartObservable</c> on the specified scheduler.
        /// NOTE(review): <paramref name="timeSpan"/> is presumably a start delay - confirm against StartObservable.
        /// </summary>
        public static IObservable<T> Start<T>(Func<T> function, TimeSpan timeSpan, IScheduler scheduler)
        {
            return new StartObservable<T>(function, timeSpan, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="action"/> in a <c>StartObservable</c> with no time span,
        /// using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
        /// </summary>
        public static IObservable<Unit> Start(Action action)
        {
            return new StartObservable<Unit>(action, null, Scheduler.DefaultSchedulers.AsyncConversions);
        }

        /// <summary>
        /// Wraps <paramref name="action"/> in a <c>StartObservable</c>, using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
        /// NOTE(review): <paramref name="timeSpan"/> is presumably a start delay - confirm against StartObservable.
        /// </summary>
        public static IObservable<Unit> Start(Action action, TimeSpan timeSpan)
        {
            return new StartObservable<Unit>(action, timeSpan, Scheduler.DefaultSchedulers.AsyncConversions);
        }

        /// <summary>
        /// Wraps <paramref name="action"/> in a <c>StartObservable</c> with no time span, on the specified scheduler.
        /// </summary>
        public static IObservable<Unit> Start(Action action, IScheduler scheduler)
        {
            return new StartObservable<Unit>(action, null, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="action"/> in a <c>StartObservable</c> on the specified scheduler.
        /// NOTE(review): <paramref name="timeSpan"/> is presumably a start delay - confirm against StartObservable.
        /// </summary>
        public static IObservable<Unit> Start(Action action, TimeSpan timeSpan, IScheduler scheduler)
        {
            return new StartObservable<Unit>(action, timeSpan, scheduler);
        }

        /// <summary>
        /// Converts <paramref name="function"/> into a delegate that, on each call, schedules the function on
        /// <c>Scheduler.DefaultSchedulers.AsyncConversions</c> and returns an observable of its result.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
        public static Func<IObservable<T>> ToAsync<T>(Func<T> function)
        {
            return ToAsync(function, Scheduler.DefaultSchedulers.AsyncConversions);
        }

        /// <summary>
        /// Converts <paramref name="function"/> into a delegate that, on each call, schedules the function on
        /// <paramref name="scheduler"/> and returns an observable backed by a new AsyncSubject.
        /// The subject receives the result followed by OnCompleted, or OnError if the function throws.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="function"/> or <paramref name="scheduler"/> is null.</exception>
        public static Func<IObservable<T>> ToAsync<T>(Func<T> function, IScheduler scheduler)
        {
            // Validate eagerly: otherwise a null scheduler only fails when the returned delegate runs,
            // and a null function only surfaces as an OnError(NullReferenceException).
            if (function == null) throw new ArgumentNullException("function");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return () =>
            {
                var subject = new AsyncSubject<T>();

                scheduler.Schedule(() =>
                {
                    var result = default(T);
                    try
                    {
                        result = function();
                    }
                    catch (Exception exception)
                    {
                        subject.OnError(exception);
                        return;
                    }

                    // Publish outside the try so observer exceptions are not reported as function failures.
                    subject.OnNext(result);
                    subject.OnCompleted();
                });

                return subject.AsObservable();
            };
        }

        /// <summary>
        /// Converts <paramref name="action"/> into a delegate that, on each call, schedules the action on
        /// <c>Scheduler.DefaultSchedulers.AsyncConversions</c> and returns an observable signalling its completion.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public static Func<IObservable<Unit>> ToAsync(Action action)
        {
            return ToAsync(action, Scheduler.DefaultSchedulers.AsyncConversions);
        }

        /// <summary>
        /// Converts <paramref name="action"/> into a delegate that, on each call, schedules the action on
        /// <paramref name="scheduler"/> and returns an observable backed by a new AsyncSubject.
        /// The subject receives Unit.Default followed by OnCompleted, or OnError if the action throws.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> or <paramref name="scheduler"/> is null.</exception>
        public static Func<IObservable<Unit>> ToAsync(Action action, IScheduler scheduler)
        {
            // Validate eagerly: otherwise a null scheduler only fails when the returned delegate runs,
            // and a null action only surfaces as an OnError(NullReferenceException).
            if (action == null) throw new ArgumentNullException("action");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            return () =>
            {
                var subject = new AsyncSubject<Unit>();

                scheduler.Schedule(() =>
                {
                    try
                    {
                        action();
                    }
                    catch (Exception exception)
                    {
                        subject.OnError(exception);
                        return;
                    }

                    // Publish outside the try so observer exceptions are not reported as action failures.
                    subject.OnNext(Unit.Default);
                    subject.OnCompleted();
                });

                return subject.AsObservable();
            };
        }
    }
}