#if CSHARP_7_OR_LATER || (UNITY_2018_3_OR_NEWER && (NET_STANDARD_2_0 || NET_4_6))
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
#endif

using System;
using System.Collections.Generic;
using System.Threading;
using UniRx.InternalUtil;
#if !UniRxLibrary
using UnityEngine;
#endif
#if CSHARP_7_OR_LATER || (UNITY_2018_3_OR_NEWER && (NET_STANDARD_2_0 || NET_4_6))
using System.Threading.Tasks;
#endif

namespace UniRx
{
    /// <summary>
    /// Observable sequence that also exposes its most recent value.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    public interface IReadOnlyReactiveProperty<T> : IObservable<T>
    {
        /// <summary>Most recent value.</summary>
        T Value { get; }

        /// <summary>Whether a value is available to be published on subscribe.</summary>
        bool HasValue { get; }
    }

    /// <summary>
    /// Read/write counterpart of <see cref="IReadOnlyReactiveProperty{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    public interface IReactiveProperty<T> : IReadOnlyReactiveProperty<T>
    {
        /// <summary>Gets or sets the current value.</summary>
        new T Value { get; set; }
    }

    /// <summary>
    /// Owner of a linked list of <see cref="ObserverNode{T}"/>; lets a node detach itself.
    /// </summary>
    internal interface IObserverLinkedList<T>
    {
        /// <summary>Unlinks <paramref name="node"/> from the owner's list.</summary>
        void UnsubscribeNode(ObserverNode<T> node);
    }

    /// <summary>
    /// Doubly linked list node that forwards notifications to the wrapped observer.
    /// The node itself is handed out as the subscription: disposing it unlinks it.
    /// </summary>
    internal sealed class ObserverNode<T> : IObserver<T>, IDisposable
    {
        readonly IObserver<T> observer;
        IObserverLinkedList<T> list;

        public ObserverNode<T> Previous { get; internal set; }
        public ObserverNode<T> Next { get; internal set; }

        public ObserverNode(IObserverLinkedList<T> list, IObserver<T> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void OnNext(T value)
        {
            observer.OnNext(value);
        }

        public void OnError(Exception error)
        {
            observer.OnError(error);
        }

        public void OnCompleted()
        {
            observer.OnCompleted();
        }

        /// <summary>
        /// Detaches this node from its owning list. Only the first call has any effect.
        /// </summary>
        public void Dispose()
        {
            // Atomically take ownership of the list reference so that
            // UnsubscribeNode runs at most once for this node.
            var sourceList = Interlocked.Exchange(ref list, null);
            if (sourceList != null)
            {
                sourceList.UnsubscribeNode(this);
            }
        }
    }

    /// <summary>
    /// Lightweight property broker.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    [Serializable]
    public class ReactiveProperty<T> : IReactiveProperty<T>, IDisposable, IOptimizedObservable<T>, IObserverLinkedList<T>
    {
#if !UniRxLibrary
        static readonly IEqualityComparer<T> defaultEqualityComparer = UnityEqualityComparer.GetDefault<T>();
#else
        static readonly IEqualityComparer<T> defaultEqualityComparer = EqualityComparer<T>.Default;
#endif

#if !UniRxLibrary
        [SerializeField]
#endif
        T value = default(T);

        // Head and tail of the subscriber list.
        [NonSerialized]
        ObserverNode<T> root;

        [NonSerialized]
        ObserverNode<T> last;

        [NonSerialized]
        bool isDisposed = false;

        /// <summary>
        /// Comparer used by the <see cref="Value"/> setter to decide whether a new value
        /// differs from the current one.
        /// </summary>
        protected virtual IEqualityComparer<T> EqualityComparer
        {
            get
            {
                return defaultEqualityComparer;
            }
        }

        /// <summary>
        /// Gets the current value, or sets it and notifies subscribers when the new value
        /// is not equal to the current one according to <see cref="EqualityComparer"/>.
        /// </summary>
        public T Value
        {
            get
            {
                return value;
            }
            set
            {
                if (!EqualityComparer.Equals(this.value, value))
                {
                    SetValue(value);
                    if (isDisposed) return;

                    RaiseOnNext(ref value);
                }
            }
        }

        // always true, allows empty constructor 'can' publish value on subscribe.
        // because sometimes value is deserialized from UnityEngine.
        public bool HasValue
        {
            get
            {
                return true;
            }
        }

        public ReactiveProperty()
            : this(default(T))
        {
        }

        public ReactiveProperty(T initialValue)
        {
            SetValue(initialValue);
        }

        // Pushes the value to every node, walking the list from root to last.
        void RaiseOnNext(ref T value)
        {
            var node = root;
            while (node != null)
            {
                node.OnNext(value);
                node = node.Next;
            }
        }

        /// <summary>Stores <paramref name="value"/> without notifying subscribers.</summary>
        protected virtual void SetValue(T value)
        {
            this.value = value;
        }

        /// <summary>
        /// Stores <paramref name="value"/> and notifies subscribers without performing the
        /// equality check that the <see cref="Value"/> setter does.
        /// </summary>
        public void SetValueAndForceNotify(T value)
        {
            SetValue(value);
            if (isDisposed) return;

            RaiseOnNext(ref value);
        }

        /// <summary>
        /// Publishes the current value to <paramref name="observer"/>, then registers it for
        /// later changes. If the property is already disposed the observer is completed
        /// immediately and no value is published.
        /// </summary>
        /// <returns>A handle that removes the observer when disposed.</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (isDisposed)
            {
                observer.OnCompleted();
                return Disposable.Empty;
            }

            // raise latest value on subscribe
            observer.OnNext(value);

            // subscribe node, node as subscription.
            var next = new ObserverNode<T>(this, observer);
            if (root == null)
            {
                root = last = next;
            }
            else
            {
                last.Next = next;
                next.Previous = last;
                last = next;
            }
            return next;
        }

        void IObserverLinkedList<T>.UnsubscribeNode(ObserverNode<T> node)
        {
            if (node == root)
            {
                root = node.Next;
            }
            if (node == last)
            {
                last = node.Previous;
            }

            // The removed node keeps its own Previous/Next links, so a traversal that is
            // currently positioned on it can still advance.
            if (node.Previous != null)
            {
                node.Previous.Next = node.Next;
            }
            if (node.Next != null)
            {
                node.Next.Previous = node.Previous;
            }
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Marks the property as disposed and sends OnCompleted to every current subscriber.
        /// Subsequent calls do nothing.
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (isDisposed) return;

            // Detach the list before notifying; the local 'node' keeps the chain reachable.
            var node = root;
            root = last = null;
            isDisposed = true;

            while (node != null)
            {
                node.OnCompleted();
                node = node.Next;
            }
        }

        public override string ToString()
        {
            return (value == null) ? "(null)" : value.ToString();
        }

        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }
    }

    /// <summary>
    /// Lightweight property broker.
    /// Read-only variant: subscribes to a source sequence and exposes its latest value.
    /// </summary>
    /// <typeparam name="T">Type of the held value.</typeparam>
    public class ReadOnlyReactiveProperty<T> : IReadOnlyReactiveProperty<T>, IDisposable, IOptimizedObservable<T>, IObserverLinkedList<T>, IObserver<T>
    {
#if !UniRxLibrary
        static readonly IEqualityComparer<T> defaultEqualityComparer = UnityEqualityComparer.GetDefault<T>();
#else
        static readonly IEqualityComparer<T> defaultEqualityComparer = EqualityComparer<T>.Default;
#endif

        readonly bool distinctUntilChanged = true;
        bool canPublishValueOnSubscribe = false;
        bool isDisposed = false;
        bool isSourceCompleted = false;

        T latestValue = default(T);
        Exception lastException = null;
        IDisposable sourceConnection = null;

        // Head and tail of the subscriber list.
        ObserverNode<T> root;
        ObserverNode<T> last;

        /// <summary>Latest value received from the source, or the initial value.</summary>
        public T Value
        {
            get
            {
                return latestValue;
            }
        }

        /// <summary>
        /// True once an initial value was supplied or the source has produced a value.
        /// </summary>
        public bool HasValue
        {
            get
            {
                return canPublishValueOnSubscribe;
            }
        }

        /// <summary>
        /// Comparer used to suppress repeated values when distinctUntilChanged is enabled.
        /// </summary>
        protected virtual IEqualityComparer<T> EqualityComparer
        {
            get
            {
                return defaultEqualityComparer;
            }
        }

        public ReadOnlyReactiveProperty(IObservable<T> source)
        {
            this.sourceConnection = source.Subscribe(this);
        }

        public ReadOnlyReactiveProperty(IObservable<T> source, bool distinctUntilChanged)
        {
            this.distinctUntilChanged = distinctUntilChanged;
            this.sourceConnection = source.Subscribe(this);
        }

        public ReadOnlyReactiveProperty(IObservable<T> source, T initialValue)
        {
            this.latestValue = initialValue;
            this.canPublishValueOnSubscribe = true;
            this.sourceConnection = source.Subscribe(this);
        }

        public ReadOnlyReactiveProperty(IObservable<T> source, T initialValue, bool distinctUntilChanged)
        {
            this.distinctUntilChanged = distinctUntilChanged;
            this.latestValue = initialValue;
            this.canPublishValueOnSubscribe = true;
            this.sourceConnection = source.Subscribe(this);
        }

        /// <summary>
        /// Registers <paramref name="observer"/>. Terminal states are replayed immediately:
        /// a stored source error is delivered as OnError; after source completion the latest
        /// value (if any) is delivered followed by OnCompleted; after disposal only
        /// OnCompleted is delivered. Otherwise the latest value (if any) is published and
        /// the observer is added to the subscriber list.
        /// </summary>
        /// <returns>A handle that removes the observer when disposed.</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (lastException != null)
            {
                observer.OnError(lastException);
                return Disposable.Empty;
            }

            if (isSourceCompleted)
            {
                if (canPublishValueOnSubscribe)
                {
                    observer.OnNext(latestValue);
                }
                observer.OnCompleted();
                return Disposable.Empty;
            }

            if (isDisposed)
            {
                observer.OnCompleted();
                return Disposable.Empty;
            }

            if (canPublishValueOnSubscribe)
            {
                observer.OnNext(latestValue);
            }

            // subscribe node, node as subscription.
            var next = new ObserverNode<T>(this, observer);
            if (root == null)
            {
                root = last = next;
            }
            else
            {
                last.Next = next;
                next.Previous = last;
                last = next;
            }

            return next;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disconnects from the source, marks the property as disposed and sends
        /// OnCompleted to every current subscriber. Subsequent calls do nothing.
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (isDisposed) return;
            sourceConnection.Dispose();

            // Detach the list before notifying; the local 'node' keeps the chain reachable.
            var node = root;
            root = last = null;
            isDisposed = true;

            while (node != null)
            {
                node.OnCompleted();
                node = node.Next;
            }
        }

        void IObserverLinkedList<T>.UnsubscribeNode(ObserverNode<T> node)
        {
            if (node == root)
            {
                root = node.Next;
            }
            if (node == last)
            {
                last = node.Previous;
            }

            // The removed node keeps its own Previous/Next links, so a traversal that is
            // currently positioned on it can still advance.
            if (node.Previous != null)
            {
                node.Previous.Next = node.Next;
            }
            if (node.Next != null)
            {
                node.Next.Previous = node.Previous;
            }
        }

        void IObserver<T>.OnNext(T value)
        {
            if (isDisposed) return;

            // The very first value is always accepted; the distinct check only applies
            // once there is a previous value to compare against.
            if (canPublishValueOnSubscribe)
            {
                if (distinctUntilChanged && EqualityComparer.Equals(this.latestValue, value))
                {
                    return;
                }
            }

            canPublishValueOnSubscribe = true;

            // SetValue
            this.latestValue = value;

            // call source.OnNext
            var node = root;
            while (node != null)
            {
                node.OnNext(value);
                node = node.Next;
            }
        }

        void IObserver<T>.OnError(Exception error)
        {
            // Remembered so that later subscribers receive the same error.
            lastException = error;

            // call source.OnError
            var node = root;
            while (node != null)
            {
                node.OnError(error);
                node = node.Next;
            }

            root = last = null;
        }

        void IObserver<T>.OnCompleted()
        {
            // NOTE(review): current subscribers are dropped here without receiving
            // OnCompleted, whereas subscribers that arrive afterwards do receive it
            // (see Subscribe). Left unchanged because callers may rely on it; confirm
            // whether this asymmetry is intended.
            isSourceCompleted = true;
            root = last = null;
        }

        public override string ToString()
        {
            return (latestValue == null) ? "(null)" : latestValue.ToString();
        }

        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }
    }

    /// <summary>
    /// Extension methods of ReactiveProperty&lt;T&gt;
    /// </summary>
    public static class ReactivePropertyExtensions
    {
        /// <summary>Wraps <paramref name="source"/> in a <see cref="ReadOnlyReactiveProperty{T}"/>.</summary>
        public static IReadOnlyReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
        {
            return new ReadOnlyReactiveProperty<T>(source);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <see cref="ReadOnlyReactiveProperty{T}"/>
        /// that starts with <paramref name="initialValue"/>.
        /// </summary>
        public static IReadOnlyReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T initialValue)
        {
            return new ReadOnlyReactiveProperty<T>(source, initialValue);
        }

        /// <summary>Wraps <paramref name="source"/> in a <see cref="ReadOnlyReactiveProperty{T}"/>.</summary>
        public static ReadOnlyReactiveProperty<T> ToReadOnlyReactiveProperty<T>(this IObservable<T> source)
        {
            return new ReadOnlyReactiveProperty<T>(source);
        }

#if CSHARP_7_OR_LATER || (UNITY_2018_3_OR_NEWER && (NET_STANDARD_2_0 || NET_4_6))

        // Cancellation handler. 'state' is the tuple built in WaitUntilValueChangedAsync.
        // The method is generic so the cast uses exactly the tuple type that was created
        // there; Tuple<,> is a class and therefore not variant in its type arguments.
        static void CancelCallback<T>(object state)
        {
            var tuple = (Tuple<CancellableTaskCompletionSource<T>, IDisposable>)state;
            tuple.Item2.Dispose();
            tuple.Item1.TrySetCanceled();
        }

        /// <summary>
        /// Returns a task that completes with the next value published by
        /// <paramref name="source"/>. When the source already has a value, the value
        /// replayed on subscribe is skipped. A source error faults the task; source
        /// completion or cancellation of <paramref name="cancellationToken"/> cancels it.
        /// </summary>
        public static Task<T> WaitUntilValueChangedAsync<T>(this IReadOnlyReactiveProperty<T> source, CancellationToken cancellationToken = default(CancellationToken))
        {
            var tcs = new CancellableTaskCompletionSource<T>();

            var disposable = new SingleAssignmentDisposable();
            if (source.HasValue)
            {
                // Skip first value
                var isFirstValue = true;
                disposable.Disposable = source.Subscribe(x =>
                {
                    if (isFirstValue)
                    {
                        isFirstValue = false;
                        return;
                    }
                    else
                    {
                        disposable.Dispose(); // finish subscription.
                        tcs.TrySetResult(x);
                    }
                }, ex => tcs.TrySetException(ex), () => tcs.TrySetCanceled());
            }
            else
            {
                disposable.Disposable = source.Subscribe(x =>
                {
                    disposable.Dispose(); // finish subscription.
                    tcs.TrySetResult(x);
                }, ex => tcs.TrySetException(ex), () => tcs.TrySetCanceled());
            }

            // A token that can never be cancelled needs no registration (and no state tuple).
            if (cancellationToken.CanBeCanceled)
            {
                var state = Tuple.Create<CancellableTaskCompletionSource<T>, IDisposable>(tcs, disposable.Disposable);
                cancellationToken.Register(CancelCallback<T>, state, false);
            }

            return tcs.Task;
        }

        /// <summary>
        /// Enables <c>await</c> on a reactive property; equivalent to awaiting
        /// <see cref="WaitUntilValueChangedAsync{T}"/> with <see cref="CancellationToken.None"/>.
        /// </summary>
        public static System.Runtime.CompilerServices.TaskAwaiter<T> GetAwaiter<T>(this IReadOnlyReactiveProperty<T> source)
        {
            return source.WaitUntilValueChangedAsync(CancellationToken.None).GetAwaiter();
        }

#endif

        /// <summary>
        /// Create ReadOnlyReactiveProperty with distinctUntilChanged: false.
        /// </summary>
        public static ReadOnlyReactiveProperty<T> ToSequentialReadOnlyReactiveProperty<T>(this IObservable<T> source)
        {
            return new ReadOnlyReactiveProperty<T>(source, distinctUntilChanged: false);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <see cref="ReadOnlyReactiveProperty{T}"/>
        /// that starts with <paramref name="initialValue"/>.
        /// </summary>
        public static ReadOnlyReactiveProperty<T> ToReadOnlyReactiveProperty<T>(this IObservable<T> source, T initialValue)
        {
            return new ReadOnlyReactiveProperty<T>(source, initialValue);
        }

        /// <summary>
        /// Create ReadOnlyReactiveProperty with distinctUntilChanged: false.
        /// </summary>
        public static ReadOnlyReactiveProperty<T> ToSequentialReadOnlyReactiveProperty<T>(this IObservable<T> source, T initialValue)
        {
            return new ReadOnlyReactiveProperty<T>(source, initialValue, distinctUntilChanged: false);
        }

        /// <summary>
        /// Returns a sequence that omits the value replayed on subscribe: one element is
        /// skipped when <paramref name="source"/> has a value at call time, otherwise the
        /// source is returned as is.
        /// </summary>
        public static IObservable<T> SkipLatestValueOnSubscribe<T>(this IReadOnlyReactiveProperty<T> source)
        {
            return source.HasValue ? source.Skip(1) : source;
        }

        // for multiple toggle or etc..

        /// <summary>
        /// Latest values of each sequence are all true.
        /// </summary>
        public static IObservable<bool> CombineLatestValuesAreAllTrue(this IEnumerable<IObservable<bool>> sources)
        {
            return sources.CombineLatest().Select(xs =>
            {
                foreach (var item in xs)
                {
                    if (item == false) return false;
                }
                return true;
            });
        }

        /// <summary>
        /// Latest values of each sequence are all false.
        /// </summary>
        public static IObservable<bool> CombineLatestValuesAreAllFalse(this IEnumerable<IObservable<bool>> sources)
        {
            return sources.CombineLatest().Select(xs =>
            {
                foreach (var item in xs)
                {
                    if (item == true) return false;
                }
                return true;
            });
        }
    }
}