using System;
using System.Collections.Generic;
using UniRx.Operators;
namespace UniRx
{
public static partial class Observable
{
    /// <summary>
    /// Create anonymous observable. Observer has exception durability. This is recommended for making operators and event-like generators.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="subscribe">Invoked with the observer; returns the subscription's disposable.</param>
    /// <returns>A <c>CreateObservable&lt;T&gt;</c> wrapping <paramref name="subscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        return new CreateObservable<T>(subscribe);
    }

    /// <summary>
    /// Create anonymous observable. Observer has exception durability. This is recommended for making operators and event-like generators (HotObservable).
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="subscribe">Invoked with the observer; returns the subscription's disposable.</param>
    /// <param name="isRequiredSubscribeOnCurrentThread">Forwarded unchanged to <c>CreateObservable&lt;T&gt;</c>.</param>
    /// <returns>A <c>CreateObservable&lt;T&gt;</c> wrapping <paramref name="subscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        return new CreateObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread);
    }

    /// <summary>
    /// Create anonymous observable. Observer has exception durability. This is recommended for making operators and event-like generators.
    /// The <paramref name="state"/> value is handed to <paramref name="subscribe"/> instead of being captured in a closure.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <typeparam name="TState">Type of the state value passed to the subscribe function.</typeparam>
    /// <param name="state">State forwarded to <c>CreateObservable&lt;T, TState&gt;</c>.</param>
    /// <param name="subscribe">Invoked with the state and the observer; returns the subscription's disposable.</param>
    /// <returns>A <c>CreateObservable&lt;T, TState&gt;</c> wrapping the arguments.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> CreateWithState<T, TState>(TState state, Func<TState, IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        return new CreateObservable<T, TState>(state, subscribe);
    }

    /// <summary>
    /// Create anonymous observable. Observer has exception durability. This is recommended for making operators and event-like generators (HotObservable).
    /// The <paramref name="state"/> value is handed to <paramref name="subscribe"/> instead of being captured in a closure.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <typeparam name="TState">Type of the state value passed to the subscribe function.</typeparam>
    /// <param name="state">State forwarded to <c>CreateObservable&lt;T, TState&gt;</c>.</param>
    /// <param name="subscribe">Invoked with the state and the observer; returns the subscription's disposable.</param>
    /// <param name="isRequiredSubscribeOnCurrentThread">Forwarded unchanged to <c>CreateObservable&lt;T, TState&gt;</c>.</param>
    /// <returns>A <c>CreateObservable&lt;T, TState&gt;</c> wrapping the arguments.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> CreateWithState<T, TState>(TState state, Func<TState, IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        return new CreateObservable<T, TState>(state, subscribe, isRequiredSubscribeOnCurrentThread);
    }

    /// <summary>
    /// Create anonymous observable. Safe means auto detach when an error is raised in the onNext pipeline. This is recommended for making generators (ColdObservable).
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="subscribe">Invoked with the observer; returns the subscription's disposable.</param>
    /// <returns>A <c>CreateSafeObservable&lt;T&gt;</c> wrapping <paramref name="subscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> CreateSafe<T>(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        return new CreateSafeObservable<T>(subscribe);
    }

    /// <summary>
    /// Create anonymous observable. Safe means auto detach when an error is raised in the onNext pipeline. This is recommended for making generators (ColdObservable).
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="subscribe">Invoked with the observer; returns the subscription's disposable.</param>
    /// <param name="isRequiredSubscribeOnCurrentThread">Forwarded unchanged to <c>CreateSafeObservable&lt;T&gt;</c>.</param>
    /// <returns>A <c>CreateSafeObservable&lt;T&gt;</c> wrapping <paramref name="subscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> CreateSafe<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");

        return new CreateSafeObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread);
    }

    /// <summary>
    /// Empty Observable. Returns only OnCompleted. Uses <c>Scheduler.DefaultSchedulers.ConstantTimeOperations</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the (empty) sequence.</typeparam>
    public static IObservable<T> Empty<T>()
    {
        return Empty<T>(Scheduler.DefaultSchedulers.ConstantTimeOperations);
    }

    /// <summary>
    /// Empty Observable. Returns only OnCompleted on specified scheduler.
    /// </summary>
    /// <typeparam name="T">Element type of the (empty) sequence.</typeparam>
    /// <param name="scheduler">Scheduler passed to <c>EmptyObservable&lt;T&gt;</c>.</param>
    /// <returns>
    /// The shared <c>ImmutableEmptyObservable&lt;T&gt;.Instance</c> when <paramref name="scheduler"/> is
    /// <c>Scheduler.Immediate</c>; otherwise a new <c>EmptyObservable&lt;T&gt;</c>.
    /// </returns>
    public static IObservable<T> Empty<T>(IScheduler scheduler)
    {
        // The immediate scheduler needs no per-call state, so a cached singleton is reused.
        if (scheduler == Scheduler.Immediate)
        {
            return ImmutableEmptyObservable<T>.Instance;
        }
        else
        {
            return new EmptyObservable<T>(scheduler);
        }
    }

    /// <summary>
    /// Empty Observable. Returns only OnCompleted. witness is for type inference.
    /// </summary>
    /// <typeparam name="T">Element type of the (empty) sequence.</typeparam>
    /// <param name="witness">Unused value; only lets the compiler infer <typeparamref name="T"/>.</param>
    public static IObservable<T> Empty<T>(T witness)
    {
        return Empty<T>(Scheduler.DefaultSchedulers.ConstantTimeOperations);
    }

    /// <summary>
    /// Empty Observable. Returns only OnCompleted on specified scheduler. witness is for type inference.
    /// </summary>
    /// <typeparam name="T">Element type of the (empty) sequence.</typeparam>
    /// <param name="scheduler">Scheduler forwarded to <c>Empty&lt;T&gt;(IScheduler)</c>.</param>
    /// <param name="witness">Unused value; only lets the compiler infer <typeparamref name="T"/>.</param>
    public static IObservable<T> Empty<T>(IScheduler scheduler, T witness)
    {
        return Empty<T>(scheduler);
    }

    /// <summary>
    /// Non-Terminating Observable. It returns nothing and never finishes.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <returns>The shared <c>ImmutableNeverObservable&lt;T&gt;.Instance</c>.</returns>
    public static IObservable<T> Never<T>()
    {
        return ImmutableNeverObservable<T>.Instance;
    }

    /// <summary>
    /// Non-Terminating Observable. It returns nothing and never finishes. witness is for type inference.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="witness">Unused value; only lets the compiler infer <typeparamref name="T"/>.</param>
    /// <returns>The shared <c>ImmutableNeverObservable&lt;T&gt;.Instance</c>.</returns>
    public static IObservable<T> Never<T>(T witness)
    {
        return ImmutableNeverObservable<T>.Instance;
    }

    /// <summary>
    /// Return single sequence. Uses <c>Scheduler.DefaultSchedulers.ConstantTimeOperations</c>.
    /// </summary>
    /// <typeparam name="T">Type of the value.</typeparam>
    /// <param name="value">The value handed to the underlying return observable.</param>
    public static IObservable<T> Return<T>(T value)
    {
        return Return<T>(value, Scheduler.DefaultSchedulers.ConstantTimeOperations);
    }

    /// <summary>
    /// Return single sequence on specified scheduler.
    /// </summary>
    /// <typeparam name="T">Type of the value.</typeparam>
    /// <param name="value">The value handed to the underlying return observable.</param>
    /// <param name="scheduler">Scheduler passed to <c>ReturnObservable&lt;T&gt;</c>.</param>
    /// <returns>
    /// An <c>ImmediateReturnObservable&lt;T&gt;</c> when <paramref name="scheduler"/> is
    /// <c>Scheduler.Immediate</c>; otherwise a <c>ReturnObservable&lt;T&gt;</c> bound to the scheduler.
    /// </returns>
    public static IObservable<T> Return<T>(T value, IScheduler scheduler)
    {
        if (scheduler == Scheduler.Immediate)
        {
            return new ImmediateReturnObservable<T>(value);
        }
        else
        {
            return new ReturnObservable<T>(value, scheduler);
        }
    }

    /// <summary>
    /// Return single sequence Immediately, optimized for Unit (no memory allocation).
    /// </summary>
    /// <param name="value">Ignored; the shared instance is always returned.</param>
    /// <returns>The shared <c>ImmutableReturnUnitObservable.Instance</c>.</returns>
    public static IObservable<Unit> Return(Unit value)
    {
        return ImmutableReturnUnitObservable.Instance;
    }

    /// <summary>
    /// Return single sequence Immediately, optimized for Boolean (no memory allocation).
    /// </summary>
    /// <param name="value">Selects which of the two cached instances is returned.</param>
    /// <returns>
    /// <c>ImmutableReturnTrueObservable.Instance</c> for <c>true</c>,
    /// <c>ImmutableReturnFalseObservable.Instance</c> for <c>false</c>.
    /// </returns>
    public static IObservable<bool> Return(bool value)
    {
        // Casts give both branches of the conditional a common type.
        return value
            ? (IObservable<bool>)ImmutableReturnTrueObservable.Instance
            : (IObservable<bool>)ImmutableReturnFalseObservable.Instance;
    }

    /// <summary>
    /// Return single sequence Immediately, optimized for Int32.
    /// </summary>
    /// <param name="value">Value passed to <c>ImmutableReturnInt32Observable.GetInt32Observable</c>.</param>
    public static IObservable<int> Return(int value)
    {
        return ImmutableReturnInt32Observable.GetInt32Observable(value);
    }

    /// <summary>
    /// Same as Observable.Return(Unit.Default); but allocates no memory.
    /// </summary>
    /// <returns>The shared <c>ImmutableReturnUnitObservable.Instance</c>.</returns>
    public static IObservable<Unit> ReturnUnit()
    {
        return ImmutableReturnUnitObservable.Instance;
    }

    /// <summary>
    /// Empty Observable. Returns only onError. Uses <c>Scheduler.DefaultSchedulers.ConstantTimeOperations</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="error">Exception handed to <c>ThrowObservable&lt;T&gt;</c>.</param>
    public static IObservable<T> Throw<T>(Exception error)
    {
        return Throw<T>(error, Scheduler.DefaultSchedulers.ConstantTimeOperations);
    }

    /// <summary>
    /// Empty Observable. Returns only onError. witness is for type inference.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="error">Exception handed to <c>ThrowObservable&lt;T&gt;</c>.</param>
    /// <param name="witness">Unused value; only lets the compiler infer <typeparamref name="T"/>.</param>
    public static IObservable<T> Throw<T>(Exception error, T witness)
    {
        return Throw<T>(error, Scheduler.DefaultSchedulers.ConstantTimeOperations);
    }

    /// <summary>
    /// Empty Observable. Returns only onError on specified scheduler.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="error">Exception handed to <c>ThrowObservable&lt;T&gt;</c>.</param>
    /// <param name="scheduler">Scheduler handed to <c>ThrowObservable&lt;T&gt;</c>.</param>
    /// <returns>A new <c>ThrowObservable&lt;T&gt;</c>.</returns>
    public static IObservable<T> Throw<T>(Exception error, IScheduler scheduler)
    {
        return new ThrowObservable<T>(error, scheduler);
    }

    /// <summary>
    /// Empty Observable. Returns only onError on specified scheduler. witness is for type inference.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="error">Exception handed to <c>ThrowObservable&lt;T&gt;</c>.</param>
    /// <param name="scheduler">Scheduler handed to <c>ThrowObservable&lt;T&gt;</c>.</param>
    /// <param name="witness">Unused value; only lets the compiler infer <typeparamref name="T"/>.</param>
    public static IObservable<T> Throw<T>(Exception error, IScheduler scheduler, T witness)
    {
        return Throw<T>(error, scheduler);
    }

    /// <summary>
    /// Creates a range observable from <paramref name="start"/> and <paramref name="count"/>
    /// using <c>Scheduler.DefaultSchedulers.Iteration</c>.
    /// </summary>
    /// <param name="start">Forwarded to <c>RangeObservable</c>.</param>
    /// <param name="count">Forwarded to <c>RangeObservable</c>.</param>
    public static IObservable<int> Range(int start, int count)
    {
        return Range(start, count, Scheduler.DefaultSchedulers.Iteration);
    }

    /// <summary>
    /// Creates a range observable from <paramref name="start"/> and <paramref name="count"/> on the specified scheduler.
    /// </summary>
    /// <param name="start">Forwarded to <c>RangeObservable</c>.</param>
    /// <param name="count">Forwarded to <c>RangeObservable</c>; not validated here.</param>
    /// <param name="scheduler">Forwarded to <c>RangeObservable</c>; not validated here.</param>
    /// <returns>A new <c>RangeObservable</c>.</returns>
    public static IObservable<int> Range(int start, int count, IScheduler scheduler)
    {
        return new RangeObservable(start, count, scheduler);
    }

    /// <summary>
    /// Repeats <paramref name="value"/> without a repeat count, using <c>Scheduler.DefaultSchedulers.Iteration</c>.
    /// </summary>
    /// <typeparam name="T">Type of the repeated value.</typeparam>
    /// <param name="value">Value handed to <c>RepeatObservable&lt;T&gt;</c>.</param>
    public static IObservable<T> Repeat<T>(T value)
    {
        return Repeat(value, Scheduler.DefaultSchedulers.Iteration);
    }

    /// <summary>
    /// Repeats <paramref name="value"/> without a repeat count on the specified scheduler.
    /// </summary>
    /// <typeparam name="T">Type of the repeated value.</typeparam>
    /// <param name="value">Value handed to <c>RepeatObservable&lt;T&gt;</c>.</param>
    /// <param name="scheduler">Scheduler handed to <c>RepeatObservable&lt;T&gt;</c>.</param>
    /// <returns>A new <c>RepeatObservable&lt;T&gt;</c> constructed with a null repeat count.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public static IObservable<T> Repeat<T>(T value, IScheduler scheduler)
    {
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        // A null repeat count distinguishes this overload from the counted one below.
        return new RepeatObservable<T>(value, null, scheduler);
    }

    /// <summary>
    /// Repeats <paramref name="value"/> <paramref name="repeatCount"/> times, using <c>Scheduler.DefaultSchedulers.Iteration</c>.
    /// </summary>
    /// <typeparam name="T">Type of the repeated value.</typeparam>
    /// <param name="value">Value handed to <c>RepeatObservable&lt;T&gt;</c>.</param>
    /// <param name="repeatCount">Repeat count; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is negative.</exception>
    public static IObservable<T> Repeat<T>(T value, int repeatCount)
    {
        return Repeat(value, repeatCount, Scheduler.DefaultSchedulers.Iteration);
    }

    /// <summary>
    /// Repeats <paramref name="value"/> <paramref name="repeatCount"/> times on the specified scheduler.
    /// </summary>
    /// <typeparam name="T">Type of the repeated value.</typeparam>
    /// <param name="value">Value handed to <c>RepeatObservable&lt;T&gt;</c>.</param>
    /// <param name="repeatCount">Repeat count; must not be negative.</param>
    /// <param name="scheduler">Scheduler handed to <c>RepeatObservable&lt;T&gt;</c>.</param>
    /// <returns>A new <c>RepeatObservable&lt;T&gt;</c>.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is negative.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public static IObservable<T> Repeat<T>(T value, int repeatCount, IScheduler scheduler)
    {
        if (repeatCount < 0) throw new ArgumentOutOfRangeException("repeatCount");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return new RepeatObservable<T>(value, repeatCount, scheduler);
    }

    /// <summary>
    /// Repeats the source sequence by concatenating an endless enumeration of <paramref name="source"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to repeat.</param>
    public static IObservable<T> Repeat<T>(this IObservable<T> source)
    {
        return RepeatInfinite(source).Concat();
    }

    /// <summary>
    /// Lazily yields <paramref name="source"/> forever; the enumeration never ends on its own.
    /// </summary>
    static IEnumerable<IObservable<T>> RepeatInfinite<T>(IObservable<T> source)
    {
        while (true)
        {
            yield return source;
        }
    }

    /// <summary>
    /// Same as Repeat() but if contiguous "OnCompleted" arrives, Repeat stops.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to repeat.</param>
    /// <returns>
    /// A new <c>RepeatSafeObservable&lt;T&gt;</c> built from an endless enumeration of <paramref name="source"/>
    /// and the source's <c>IsRequiredSubscribeOnCurrentThread()</c> value.
    /// </returns>
    public static IObservable<T> RepeatSafe<T>(this IObservable<T> source)
    {
        return new RepeatSafeObservable<T>(RepeatInfinite(source), source.IsRequiredSubscribeOnCurrentThread());
    }

    /// <summary>
    /// Wraps <paramref name="observableFactory"/> in a <c>DeferObservable&lt;T&gt;</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="observableFactory">Factory producing the actual observable; not validated here.</param>
    public static IObservable<T> Defer<T>(Func<IObservable<T>> observableFactory)
    {
        return new DeferObservable<T>(observableFactory);
    }

    /// <summary>
    /// Wraps <paramref name="function"/> in a <c>StartObservable&lt;T&gt;</c> with no time span,
    /// using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
    /// </summary>
    /// <typeparam name="T">Result type of the function.</typeparam>
    /// <param name="function">Function handed to <c>StartObservable&lt;T&gt;</c>.</param>
    public static IObservable<T> Start<T>(Func<T> function)
    {
        return new StartObservable<T>(function, null, Scheduler.DefaultSchedulers.AsyncConversions);
    }

    /// <summary>
    /// Wraps <paramref name="function"/> in a <c>StartObservable&lt;T&gt;</c> with the given time span,
    /// using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
    /// </summary>
    /// <typeparam name="T">Result type of the function.</typeparam>
    /// <param name="function">Function handed to <c>StartObservable&lt;T&gt;</c>.</param>
    /// <param name="timeSpan">Forwarded to <c>StartObservable&lt;T&gt;</c>; presumably a start delay (confirm against StartObservable).</param>
    public static IObservable<T> Start<T>(Func<T> function, TimeSpan timeSpan)
    {
        return new StartObservable<T>(function, timeSpan, Scheduler.DefaultSchedulers.AsyncConversions);
    }

    /// <summary>
    /// Wraps <paramref name="function"/> in a <c>StartObservable&lt;T&gt;</c> with no time span on the specified scheduler.
    /// </summary>
    /// <typeparam name="T">Result type of the function.</typeparam>
    /// <param name="function">Function handed to <c>StartObservable&lt;T&gt;</c>.</param>
    /// <param name="scheduler">Scheduler handed to <c>StartObservable&lt;T&gt;</c>.</param>
    public static IObservable<T> Start<T>(Func<T> function, IScheduler scheduler)
    {
        return new StartObservable<T>(function, null, scheduler);
    }

    /// <summary>
    /// Wraps <paramref name="function"/> in a <c>StartObservable&lt;T&gt;</c> with the given time span on the specified scheduler.
    /// </summary>
    /// <typeparam name="T">Result type of the function.</typeparam>
    /// <param name="function">Function handed to <c>StartObservable&lt;T&gt;</c>.</param>
    /// <param name="timeSpan">Forwarded to <c>StartObservable&lt;T&gt;</c>; presumably a start delay (confirm against StartObservable).</param>
    /// <param name="scheduler">Scheduler handed to <c>StartObservable&lt;T&gt;</c>.</param>
    public static IObservable<T> Start<T>(Func<T> function, TimeSpan timeSpan, IScheduler scheduler)
    {
        return new StartObservable<T>(function, timeSpan, scheduler);
    }

    /// <summary>
    /// Wraps <paramref name="action"/> in a <c>StartObservable&lt;Unit&gt;</c> with no time span,
    /// using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
    /// </summary>
    /// <param name="action">Action handed to <c>StartObservable&lt;Unit&gt;</c>.</param>
    public static IObservable<Unit> Start(Action action)
    {
        return new StartObservable<Unit>(action, null, Scheduler.DefaultSchedulers.AsyncConversions);
    }

    /// <summary>
    /// Wraps <paramref name="action"/> in a <c>StartObservable&lt;Unit&gt;</c> with the given time span,
    /// using <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
    /// </summary>
    /// <param name="action">Action handed to <c>StartObservable&lt;Unit&gt;</c>.</param>
    /// <param name="timeSpan">Forwarded to <c>StartObservable&lt;Unit&gt;</c>; presumably a start delay (confirm against StartObservable).</param>
    public static IObservable<Unit> Start(Action action, TimeSpan timeSpan)
    {
        return new StartObservable<Unit>(action, timeSpan, Scheduler.DefaultSchedulers.AsyncConversions);
    }

    /// <summary>
    /// Wraps <paramref name="action"/> in a <c>StartObservable&lt;Unit&gt;</c> with no time span on the specified scheduler.
    /// </summary>
    /// <param name="action">Action handed to <c>StartObservable&lt;Unit&gt;</c>.</param>
    /// <param name="scheduler">Scheduler handed to <c>StartObservable&lt;Unit&gt;</c>.</param>
    public static IObservable<Unit> Start(Action action, IScheduler scheduler)
    {
        return new StartObservable<Unit>(action, null, scheduler);
    }

    /// <summary>
    /// Wraps <paramref name="action"/> in a <c>StartObservable&lt;Unit&gt;</c> with the given time span on the specified scheduler.
    /// </summary>
    /// <param name="action">Action handed to <c>StartObservable&lt;Unit&gt;</c>.</param>
    /// <param name="timeSpan">Forwarded to <c>StartObservable&lt;Unit&gt;</c>; presumably a start delay (confirm against StartObservable).</param>
    /// <param name="scheduler">Scheduler handed to <c>StartObservable&lt;Unit&gt;</c>.</param>
    public static IObservable<Unit> Start(Action action, TimeSpan timeSpan, IScheduler scheduler)
    {
        return new StartObservable<Unit>(action, timeSpan, scheduler);
    }

    /// <summary>
    /// Converts <paramref name="function"/> into a factory of observables, using
    /// <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
    /// </summary>
    /// <typeparam name="T">Result type of the function.</typeparam>
    /// <param name="function">Function to run each time the returned delegate is invoked.</param>
    public static Func<IObservable<T>> ToAsync<T>(Func<T> function)
    {
        return ToAsync(function, Scheduler.DefaultSchedulers.AsyncConversions);
    }

    /// <summary>
    /// Converts <paramref name="function"/> into a factory of observables. Each invocation of the returned
    /// delegate creates a fresh <c>AsyncSubject&lt;T&gt;</c>, schedules <paramref name="function"/> on
    /// <paramref name="scheduler"/>, and returns the subject as an observable.
    /// </summary>
    /// <typeparam name="T">Result type of the function.</typeparam>
    /// <param name="function">Function to run; not validated until the scheduled work executes.</param>
    /// <param name="scheduler">Scheduler on which the function runs; not validated until the delegate is invoked.</param>
    /// <returns>
    /// A delegate whose observable receives the function's result followed by OnCompleted,
    /// or OnError carrying the exception the function threw.
    /// </returns>
    public static Func<IObservable<T>> ToAsync<T>(Func<T> function, IScheduler scheduler)
    {
        return () =>
        {
            var subject = new AsyncSubject<T>();

            scheduler.Schedule(() =>
            {
                var result = default(T);
                try
                {
                    result = function();
                }
                catch (Exception exception)
                {
                    // Failure path: report the error and skip OnNext/OnCompleted.
                    subject.OnError(exception);
                    return;
                }
                // Publish outside the try block so observer exceptions are not routed to OnError.
                subject.OnNext(result);
                subject.OnCompleted();
            });

            return subject.AsObservable();
        };
    }

    /// <summary>
    /// Converts <paramref name="action"/> into a factory of observables, using
    /// <c>Scheduler.DefaultSchedulers.AsyncConversions</c>.
    /// </summary>
    /// <param name="action">Action to run each time the returned delegate is invoked.</param>
    public static Func<IObservable<Unit>> ToAsync(Action action)
    {
        return ToAsync(action, Scheduler.DefaultSchedulers.AsyncConversions);
    }

    /// <summary>
    /// Converts <paramref name="action"/> into a factory of observables. Each invocation of the returned
    /// delegate creates a fresh <c>AsyncSubject&lt;Unit&gt;</c>, schedules <paramref name="action"/> on
    /// <paramref name="scheduler"/>, and returns the subject as an observable.
    /// </summary>
    /// <param name="action">Action to run; not validated until the scheduled work executes.</param>
    /// <param name="scheduler">Scheduler on which the action runs; not validated until the delegate is invoked.</param>
    /// <returns>
    /// A delegate whose observable receives <c>Unit.Default</c> followed by OnCompleted,
    /// or OnError carrying the exception the action threw.
    /// </returns>
    public static Func<IObservable<Unit>> ToAsync(Action action, IScheduler scheduler)
    {
        return () =>
        {
            var subject = new AsyncSubject<Unit>();

            scheduler.Schedule(() =>
            {
                try
                {
                    action();
                }
                catch (Exception exception)
                {
                    // Failure path: report the error and skip OnNext/OnCompleted.
                    subject.OnError(exception);
                    return;
                }
                // Publish outside the try block so observer exceptions are not routed to OnError.
                subject.OnNext(Unit.Default);
                subject.OnCompleted();
            });

            return subject.AsObservable();
        };
    }
}
}