// original code from rx.codeplex.com
// some modified.
/* ------------------ */
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Diagnostics;
using System.Globalization;
using System.Collections.Generic;
using System;
using UniRx.InternalUtil;
#pragma warning disable 0659
#pragma warning disable 0661
namespace UniRx
{
///
/// Provides a mechanism for receiving push-based notifications and returning a response.
///
///
/// The type of the elements received by the observer.
/// This type parameter is contravariant. That is, you can use either the type you specified or any type that is less derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
///
///
/// The type of the result returned from the observer's notification handlers.
/// This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
///
public interface IObserver
{
///
/// Notifies the observer of a new element in the sequence.
///
/// The new element in the sequence.
/// Result returned upon observation of a new element.
TResult OnNext(TValue value);
///
/// Notifies the observer that an exception has occurred.
///
/// The exception that occurred.
/// Result returned upon observation of an error.
TResult OnError(Exception exception);
///
/// Notifies the observer of the end of the sequence.
///
/// Result returned upon observation of the sequence completion.
TResult OnCompleted();
}
///
/// Indicates the type of a notification.
///
public enum NotificationKind
{
///
/// Represents an OnNext notification.
///
OnNext,
///
/// Represents an OnError notification.
///
OnError,
///
/// Represents an OnCompleted notification.
///
OnCompleted
}
///
/// Represents a notification to an observer.
///
/// The type of the elements received by the observer.
[Serializable]
public abstract class Notification : IEquatable>
{
///
/// Default constructor used by derived types.
///
protected internal Notification()
{
}
///
/// Returns the value of an OnNext notification or throws an exception.
///
public abstract T Value
{
get;
}
///
/// Returns a value that indicates whether the notification has a value.
///
public abstract bool HasValue
{
get;
}
///
/// Returns the exception of an OnError notification or returns null.
///
public abstract Exception Exception
{
get;
}
///
/// Gets the kind of notification that is represented.
///
public abstract NotificationKind Kind
{
get;
}
///
/// Represents an OnNext notification to an observer.
///
[DebuggerDisplay("OnNext({Value})")]
[Serializable]
internal sealed class OnNextNotification : Notification
{
T value;
///
/// Constructs a notification of a new value.
///
public OnNextNotification(T value)
{
this.value = value;
}
///
/// Returns the value of an OnNext notification.
///
public override T Value { get { return value; } }
///
/// Returns null.
///
public override Exception Exception { get { return null; } }
///
/// Returns true.
///
public override bool HasValue { get { return true; } }
///
/// Returns NotificationKind.OnNext.
///
public override NotificationKind Kind { get { return NotificationKind.OnNext; } }
///
/// Returns the hash code for this instance.
///
public override int GetHashCode()
{
return EqualityComparer.Default.GetHashCode(Value);
}
///
/// Indicates whether this instance and a specified object are equal.
///
public override bool Equals(Notification other)
{
if (Object.ReferenceEquals(this, other))
return true;
if (Object.ReferenceEquals(other, null))
return false;
if (other.Kind != NotificationKind.OnNext)
return false;
return EqualityComparer.Default.Equals(Value, other.Value);
}
///
/// Returns a string representation of this instance.
///
public override string ToString()
{
return String.Format(CultureInfo.CurrentCulture, "OnNext({0})", Value);
}
///
/// Invokes the observer's method corresponding to the notification.
///
/// Observer to invoke the notification on.
public override void Accept(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
observer.OnNext(Value);
}
///
/// Invokes the observer's method corresponding to the notification and returns the produced result.
///
/// Observer to invoke the notification on.
/// Result produced by the observation.
public override TResult Accept(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
return observer.OnNext(Value);
}
///
/// Invokes the delegate corresponding to the notification.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
public override void Accept(Action onNext, Action onError, Action onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
onNext(Value);
}
///
/// Invokes the delegate corresponding to the notification and returns the produced result.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
/// Result produced by the observation.
public override TResult Accept(Func onNext, Func onError, Func onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
return onNext(Value);
}
}
///
/// Represents an OnError notification to an observer.
///
#if !NO_DEBUGGER_ATTRIBUTES
[DebuggerDisplay("OnError({Exception})")]
#endif
#if !NO_SERIALIZABLE
[Serializable]
#endif
internal sealed class OnErrorNotification : Notification
{
Exception exception;
///
/// Constructs a notification of an exception.
///
public OnErrorNotification(Exception exception)
{
this.exception = exception;
}
///
/// Throws the exception.
///
public override T Value { get { exception.Throw(); throw exception; } }
///
/// Returns the exception.
///
public override Exception Exception { get { return exception; } }
///
/// Returns false.
///
public override bool HasValue { get { return false; } }
///
/// Returns NotificationKind.OnError.
///
public override NotificationKind Kind { get { return NotificationKind.OnError; } }
///
/// Returns the hash code for this instance.
///
public override int GetHashCode()
{
return Exception.GetHashCode();
}
///
/// Indicates whether this instance and other are equal.
///
public override bool Equals(Notification other)
{
if (Object.ReferenceEquals(this, other))
return true;
if (Object.ReferenceEquals(other, null))
return false;
if (other.Kind != NotificationKind.OnError)
return false;
return Object.Equals(Exception, other.Exception);
}
///
/// Returns a string representation of this instance.
///
public override string ToString()
{
return String.Format(CultureInfo.CurrentCulture, "OnError({0})", Exception.GetType().FullName);
}
///
/// Invokes the observer's method corresponding to the notification.
///
/// Observer to invoke the notification on.
public override void Accept(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
observer.OnError(Exception);
}
///
/// Invokes the observer's method corresponding to the notification and returns the produced result.
///
/// Observer to invoke the notification on.
/// Result produced by the observation.
public override TResult Accept(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
return observer.OnError(Exception);
}
///
/// Invokes the delegate corresponding to the notification.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
public override void Accept(Action onNext, Action onError, Action onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
onError(Exception);
}
///
/// Invokes the delegate corresponding to the notification and returns the produced result.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
/// Result produced by the observation.
public override TResult Accept(Func onNext, Func onError, Func onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
return onError(Exception);
}
}
///
/// Represents an OnCompleted notification to an observer.
///
[DebuggerDisplay("OnCompleted()")]
[Serializable]
internal sealed class OnCompletedNotification : Notification
{
///
/// Constructs a notification of the end of a sequence.
///
public OnCompletedNotification()
{
}
///
/// Throws an InvalidOperationException.
///
public override T Value { get { throw new InvalidOperationException("No Value"); } }
///
/// Returns null.
///
public override Exception Exception { get { return null; } }
///
/// Returns false.
///
public override bool HasValue { get { return false; } }
///
/// Returns NotificationKind.OnCompleted.
///
public override NotificationKind Kind { get { return NotificationKind.OnCompleted; } }
///
/// Returns the hash code for this instance.
///
public override int GetHashCode()
{
return typeof(T).GetHashCode() ^ 8510;
}
///
/// Indicates whether this instance and other are equal.
///
public override bool Equals(Notification other)
{
if (Object.ReferenceEquals(this, other))
return true;
if (Object.ReferenceEquals(other, null))
return false;
return other.Kind == NotificationKind.OnCompleted;
}
///
/// Returns a string representation of this instance.
///
public override string ToString()
{
return "OnCompleted()";
}
///
/// Invokes the observer's method corresponding to the notification.
///
/// Observer to invoke the notification on.
public override void Accept(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
observer.OnCompleted();
}
///
/// Invokes the observer's method corresponding to the notification and returns the produced result.
///
/// Observer to invoke the notification on.
/// Result produced by the observation.
public override TResult Accept(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
return observer.OnCompleted();
}
///
/// Invokes the delegate corresponding to the notification.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
public override void Accept(Action onNext, Action onError, Action onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
onCompleted();
}
///
/// Invokes the delegate corresponding to the notification and returns the produced result.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
/// Result produced by the observation.
public override TResult Accept(Func onNext, Func onError, Func onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
return onCompleted();
}
}
///
/// Determines whether the current Notification<T> object has the same observer message payload as a specified Notification<T> value.
///
/// An object to compare to the current Notification<T> object.
/// true if both Notification<T> objects have the same observer message payload; otherwise, false.
///
/// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
/// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
/// In case one wants to determine whether two Notification<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
///
public abstract bool Equals(Notification other);
///
/// Determines whether the two specified Notification<T> objects have the same observer message payload.
///
/// The first Notification<T> to compare, or null.
/// The second Notification<T> to compare, or null.
/// true if the first Notification<T> value has the same observer message payload as the second Notification<T> value; otherwise, false.
///
/// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
/// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
/// In case one wants to determine whether two Notification<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
///
public static bool operator ==(Notification left, Notification right)
{
if (object.ReferenceEquals(left, right))
return true;
if ((object)left == null || (object)right == null)
return false;
return left.Equals(right);
}
///
/// Determines whether the two specified Notification<T> objects have a different observer message payload.
///
/// The first Notification<T> to compare, or null.
/// The second Notification<T> to compare, or null.
/// true if the first Notification<T> value has a different observer message payload as the second Notification<T> value; otherwise, false.
///
/// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
/// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
/// In case one wants to determine whether two Notification<T> objects represent a different observer method call, use Object.ReferenceEquals identity equality instead.
///
public static bool operator !=(Notification left, Notification right)
{
return !(left == right);
}
///
/// Determines whether the specified System.Object is equal to the current Notification<T>.
///
/// The System.Object to compare with the current Notification<T>.
/// true if the specified System.Object is equal to the current Notification<T>; otherwise, false.
///
/// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
/// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
/// In case one wants to determine whether two Notification<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
///
public override bool Equals(object obj)
{
return Equals(obj as Notification);
}
///
/// Invokes the observer's method corresponding to the notification.
///
/// Observer to invoke the notification on.
public abstract void Accept(IObserver observer);
///
/// Invokes the observer's method corresponding to the notification and returns the produced result.
///
/// The type of the result returned from the observer's notification handlers.
/// Observer to invoke the notification on.
/// Result produced by the observation.
public abstract TResult Accept(IObserver observer);
///
/// Invokes the delegate corresponding to the notification.
///
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
public abstract void Accept(Action onNext, Action onError, Action onCompleted);
///
/// Invokes the delegate corresponding to the notification and returns the produced result.
///
/// The type of the result returned from the notification handler delegates.
/// Delegate to invoke for an OnNext notification.
/// Delegate to invoke for an OnError notification.
/// Delegate to invoke for an OnCompleted notification.
/// Result produced by the observation.
public abstract TResult Accept(Func onNext, Func onError, Func onCompleted);
///
/// Returns an observable sequence with a single notification, using the immediate scheduler.
///
/// The observable sequence that surfaces the behavior of the notification upon subscription.
public IObservable ToObservable()
{
return this.ToObservable(Scheduler.Immediate);
}
///
/// Returns an observable sequence with a single notification.
///
/// Scheduler to send out the notification calls on.
/// The observable sequence that surfaces the behavior of the notification upon subscription.
public IObservable ToObservable(IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return Observable.Create(observer => scheduler.Schedule(() =>
{
this.Accept(observer);
if (this.Kind == NotificationKind.OnNext)
observer.OnCompleted();
}));
}
}
///
/// Provides a set of static methods for constructing notifications.
///
public static class Notification
{
///
/// Creates an object that represents an OnNext notification to an observer.
///
/// The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.
/// The value contained in the notification.
/// The OnNext notification containing the value.
public static Notification CreateOnNext(T value)
{
return new Notification.OnNextNotification(value);
}
///
/// Creates an object that represents an OnError notification to an observer.
///
/// The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.
/// The exception contained in the notification.
/// The OnError notification containing the exception.
/// is null.
public static Notification CreateOnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
return new Notification.OnErrorNotification(error);
}
///
/// Creates an object that represents an OnCompleted notification to an observer.
///
/// The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.
/// The OnCompleted notification.
public static Notification CreateOnCompleted()
{
return new Notification.OnCompletedNotification();
}
}
}
#pragma warning restore 0659
#pragma warning restore 0661