1
0

ReactiveProperty.cs 17 KB


  1. #if CSHARP_7_OR_LATER || (UNITY_2018_3_OR_NEWER && (NET_STANDARD_2_0 || NET_4_6))
  2. #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
  3. #endif
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Threading;
  7. using UniRx.InternalUtil;
  8. #if !UniRxLibrary
  9. using UnityEngine;
  10. #endif
  11. #if CSHARP_7_OR_LATER || (UNITY_2018_3_OR_NEWER && (NET_STANDARD_2_0 || NET_4_6))
  12. using System.Threading.Tasks;
  13. #endif
  14. namespace UniRx
  15. {
  /// <summary>
  /// Observable that also exposes its current value for reading.
  /// </summary>
  /// <typeparam name="T">Type of the exposed value.</typeparam>
  public interface IReadOnlyReactiveProperty<T> : IObservable<T>
  {
      /// <summary>
      /// Gets the current value.
      /// </summary>
      T Value { get; }

      /// <summary>
      /// Gets whether the property currently holds a value.
      /// </summary>
      bool HasValue { get; }
  }
  /// <summary>
  /// Reactive property whose current value can be both read and assigned.
  /// </summary>
  /// <typeparam name="T">Type of the exposed value.</typeparam>
  public interface IReactiveProperty<T> : IReadOnlyReactiveProperty<T>
  {
      /// <summary>
      /// Gets or sets the current value. Hides the get-only member of the base interface.
      /// </summary>
      new T Value { get; set; }
  }
  /// <summary>
  /// Owner of a linked list of <see cref="ObserverNode{T}"/> instances.
  /// A node calls back into its owner through this interface when it is disposed.
  /// </summary>
  /// <typeparam name="T">Element type of the observed sequence.</typeparam>
  internal interface IObserverLinkedList<T>
  {
      /// <summary>
      /// Removes <paramref name="node"/> from the owner's list.
      /// </summary>
      /// <param name="node">Node to unlink.</param>
      void UnsubscribeNode(ObserverNode<T> node);
  }
  /// <summary>
  /// Doubly linked list node wrapping one subscribed observer.
  /// The node is also the subscription handle: disposing it asks the owning list to unlink it.
  /// </summary>
  /// <typeparam name="T">Element type of the observed sequence.</typeparam>
  internal sealed class ObserverNode<T> : IObserver<T>, IDisposable
  {
      readonly IObserver<T> observer;

      // Owning list; swapped to null by the first Dispose call.
      IObserverLinkedList<T> list;

      /// <summary>Node that precedes this one in the owner's list, or null.</summary>
      public ObserverNode<T> Previous { get; internal set; }

      /// <summary>Node that follows this one in the owner's list, or null.</summary>
      public ObserverNode<T> Next { get; internal set; }

      /// <summary>
      /// Creates a node for <paramref name="observer"/> that belongs to <paramref name="list"/>.
      /// </summary>
      /// <param name="list">Owner that is notified when this node is disposed.</param>
      /// <param name="observer">Observer that receives the forwarded notifications.</param>
      public ObserverNode(IObserverLinkedList<T> list, IObserver<T> observer)
      {
          this.observer = observer;
          this.list = list;
      }

      /// <summary>Forwards <paramref name="value"/> to the wrapped observer.</summary>
      public void OnNext(T value)
      {
          observer.OnNext(value);
      }

      /// <summary>Forwards <paramref name="error"/> to the wrapped observer.</summary>
      public void OnError(Exception error)
      {
          observer.OnError(error);
      }

      /// <summary>Forwards the completion signal to the wrapped observer.</summary>
      public void OnCompleted()
      {
          observer.OnCompleted();
      }

      /// <summary>
      /// Unlinks this node from its owner. Only the first call has an effect.
      /// </summary>
      public void Dispose()
      {
          // Atomically take the owner reference so a second Dispose finds null and does nothing.
          var owner = Interlocked.Exchange(ref list, null);
          if (owner == null)
          {
              return;
          }

          owner.UnsubscribeNode(this);
      }
  }
  /// <summary>
  /// Lightweight property broker.
  /// </summary>
  /// <remarks>
  /// Holds one value, pushes it to each observer at subscription time, and pushes again
  /// whenever a value that differs according to <see cref="EqualityComparer"/> is assigned.
  /// Observers are stored as a doubly linked list of <see cref="ObserverNode{T}"/>.
  /// </remarks>
  /// <typeparam name="T">Type of the stored value.</typeparam>
  [Serializable]
  public class ReactiveProperty<T> : IReactiveProperty<T>, IDisposable, IOptimizedObservable<T>, IObserverLinkedList<T>
  {
#if !UniRxLibrary
      static readonly IEqualityComparer<T> defaultEqualityComparer = UnityEqualityComparer.GetDefault<T>();
#else
      static readonly IEqualityComparer<T> defaultEqualityComparer = EqualityComparer<T>.Default;
#endif

#if !UniRxLibrary
      [SerializeField]
#endif
      T value = default(T);

      // First node of the observer list (null when there are no observers).
      [NonSerialized]
      ObserverNode<T> root;

      // Last node of the observer list; new subscriptions are appended after it.
      [NonSerialized]
      ObserverNode<T> last;

      [NonSerialized]
      bool isDisposed = false;

      /// <summary>
      /// Comparer used by the <see cref="Value"/> setter to decide whether an assignment is a change.
      /// </summary>
      protected virtual IEqualityComparer<T> EqualityComparer
      {
          get { return defaultEqualityComparer; }
      }

      /// <summary>
      /// Gets or sets the current value. Assigning a value equal to the current one is ignored;
      /// otherwise the value is stored and, unless this instance is disposed, pushed to all observers.
      /// </summary>
      public T Value
      {
          get { return value; }
          set
          {
              if (EqualityComparer.Equals(this.value, value))
              {
                  return;
              }

              SetValue(value);
              if (!isDisposed)
              {
                  RaiseOnNext(ref value);
              }
          }
      }

      /// <summary>
      /// Always true, so that an instance created by the empty constructor can still publish
      /// its value on subscribe (the value is sometimes deserialized by UnityEngine).
      /// </summary>
      public bool HasValue
      {
          get { return true; }
      }

      /// <summary>Creates a property holding <c>default(T)</c>.</summary>
      public ReactiveProperty()
          : this(default(T))
      {
      }

      /// <summary>Creates a property holding <paramref name="initialValue"/>.</summary>
      /// <param name="initialValue">Value stored through <see cref="SetValue"/>.</param>
      public ReactiveProperty(T initialValue)
      {
          SetValue(initialValue);
      }

      // Pushes the value to every node, walking the list from root via Next.
      void RaiseOnNext(ref T value)
      {
          for (var node = root; node != null; node = node.Next)
          {
              node.OnNext(value);
          }
      }

      /// <summary>Stores <paramref name="value"/> without notifying observers.</summary>
      protected virtual void SetValue(T value)
      {
          this.value = value;
      }

      /// <summary>
      /// Stores <paramref name="value"/> and notifies observers without the equality check.
      /// Nothing is raised once this instance is disposed.
      /// </summary>
      public void SetValueAndForceNotify(T value)
      {
          SetValue(value);
          if (!isDisposed)
          {
              RaiseOnNext(ref value);
          }
      }

      /// <summary>
      /// Subscribes <paramref name="observer"/>. The current value is pushed immediately and the
      /// observer is then appended to the list. On a disposed instance the observer only
      /// receives OnCompleted.
      /// </summary>
      /// <returns>The list node, which acts as the subscription handle.</returns>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (isDisposed)
          {
              observer.OnCompleted();
              return Disposable.Empty;
          }

          // Raise the latest value before the observer is linked into the list.
          observer.OnNext(value);

          var subscription = new ObserverNode<T>(this, observer);
          if (root != null)
          {
              last.Next = subscription;
              subscription.Previous = last;
              last = subscription;
          }
          else
          {
              root = last = subscription;
          }

          return subscription;
      }

      // Unlinks the node from the list. The node's own Previous/Next are left untouched.
      void IObserverLinkedList<T>.UnsubscribeNode(ObserverNode<T> node)
      {
          var before = node.Previous;
          var after = node.Next;

          if (node == root)
          {
              root = after;
          }

          if (node == last)
          {
              last = before;
          }

          if (before != null)
          {
              before.Next = after;
          }

          if (after != null)
          {
              after.Previous = before;
          }
      }

      /// <summary>Completes all observers and marks this instance as disposed.</summary>
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Clears the observer list, marks the instance as disposed, then sends OnCompleted to every
      /// node that was in the list. Subsequent calls do nothing.
      /// </summary>
      /// <param name="disposing">Not inspected by this implementation.</param>
      protected virtual void Dispose(bool disposing)
      {
          if (isDisposed)
          {
              return;
          }

          // Detach the list first, then notify the detached nodes.
          var node = root;
          root = last = null;
          isDisposed = true;

          for (; node != null; node = node.Next)
          {
              node.OnCompleted();
          }
      }

      /// <summary>Returns the value's string form, or "(null)" when the value is null.</summary>
      public override string ToString()
      {
          if (value == null)
          {
              return "(null)";
          }

          return value.ToString();
      }

      /// <summary>Always returns false.</summary>
      public bool IsRequiredSubscribeOnCurrentThread()
      {
          return false;
      }
  }
  /// <summary>
  /// Lightweight property broker.
  /// </summary>
  /// <remarks>
  /// Subscribes to a source sequence in its constructor, remembers the latest value and
  /// republishes it to its own observers. A source error or completion terminates the
  /// current observers; later subscribers are terminated the same way at subscription time.
  /// </remarks>
  /// <typeparam name="T">Type of the exposed value.</typeparam>
  public class ReadOnlyReactiveProperty<T> : IReadOnlyReactiveProperty<T>, IDisposable, IOptimizedObservable<T>, IObserverLinkedList<T>, IObserver<T>
  {
#if !UniRxLibrary
      static readonly IEqualityComparer<T> defaultEqualityComparer = UnityEqualityComparer.GetDefault<T>();
#else
      static readonly IEqualityComparer<T> defaultEqualityComparer = EqualityComparer<T>.Default;
#endif

      // When true, a source value equal to the latest value is not republished.
      readonly bool distinctUntilChanged = true;

      // True once a value exists (initial value supplied or first source value received).
      bool canPublishValueOnSubscribe = false;
      bool isDisposed = false;
      bool isSourceCompleted = false;
      T latestValue = default(T);
      Exception lastException = null;
      IDisposable sourceConnection = null;

      // Head and tail of the observer list.
      ObserverNode<T> root;
      ObserverNode<T> last;

      /// <summary>Gets the latest value (default(T) until one is available).</summary>
      public T Value
      {
          get
          {
              return latestValue;
          }
      }

      /// <summary>Gets whether a value is available to publish on subscribe.</summary>
      public bool HasValue
      {
          get
          {
              return canPublishValueOnSubscribe;
          }
      }

      /// <summary>Comparer used for the distinct-until-changed check.</summary>
      protected virtual IEqualityComparer<T> EqualityComparer
      {
          get
          {
              return defaultEqualityComparer;
          }
      }

      /// <summary>Creates a property without an initial value that suppresses repeated equal values.</summary>
      public ReadOnlyReactiveProperty(IObservable<T> source)
      {
          this.sourceConnection = source.Subscribe(this);
      }

      /// <summary>Creates a property without an initial value.</summary>
      /// <param name="source">Sequence to mirror.</param>
      /// <param name="distinctUntilChanged">Whether repeated equal values are suppressed.</param>
      public ReadOnlyReactiveProperty(IObservable<T> source, bool distinctUntilChanged)
      {
          this.distinctUntilChanged = distinctUntilChanged;
          this.sourceConnection = source.Subscribe(this);
      }

      /// <summary>Creates a property with an initial value that suppresses repeated equal values.</summary>
      public ReadOnlyReactiveProperty(IObservable<T> source, T initialValue)
      {
          // The value must be in place before subscribing: the source may push synchronously.
          this.latestValue = initialValue;
          this.canPublishValueOnSubscribe = true;
          this.sourceConnection = source.Subscribe(this);
      }

      /// <summary>Creates a property with an initial value.</summary>
      /// <param name="source">Sequence to mirror.</param>
      /// <param name="initialValue">Value exposed until the source produces one.</param>
      /// <param name="distinctUntilChanged">Whether repeated equal values are suppressed.</param>
      public ReadOnlyReactiveProperty(IObservable<T> source, T initialValue, bool distinctUntilChanged)
      {
          this.distinctUntilChanged = distinctUntilChanged;
          this.latestValue = initialValue;
          this.canPublishValueOnSubscribe = true;
          this.sourceConnection = source.Subscribe(this);
      }

      /// <summary>
      /// Subscribes <paramref name="observer"/>. If the source has failed, completed, or this
      /// instance is disposed, the observer is terminated immediately and an empty disposable
      /// is returned. Otherwise the latest value (if any) is pushed and the observer is appended
      /// to the list.
      /// </summary>
      /// <returns>The list node acting as subscription handle, or an empty disposable.</returns>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (lastException != null)
          {
              observer.OnError(lastException);
              return Disposable.Empty;
          }

          if (isSourceCompleted)
          {
              // A completed source still replays its final value before completing the observer.
              if (canPublishValueOnSubscribe)
              {
                  observer.OnNext(latestValue);
              }

              observer.OnCompleted();
              return Disposable.Empty;
          }

          if (isDisposed)
          {
              observer.OnCompleted();
              return Disposable.Empty;
          }

          if (canPublishValueOnSubscribe)
          {
              observer.OnNext(latestValue);
          }

          // Subscribe node; the node itself is the subscription.
          var next = new ObserverNode<T>(this, observer);
          if (root == null)
          {
              root = last = next;
          }
          else
          {
              last.Next = next;
              next.Previous = last;
              last = next;
          }

          return next;
      }

      /// <summary>Disconnects from the source and completes all observers.</summary>
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Disposes the source subscription, clears the observer list, marks the instance as
      /// disposed, then sends OnCompleted to every node that was in the list.
      /// Subsequent calls do nothing.
      /// </summary>
      /// <param name="disposing">Not inspected by this implementation.</param>
      protected virtual void Dispose(bool disposing)
      {
          if (isDisposed) return;

          sourceConnection.Dispose();

          var node = root;
          root = last = null;
          isDisposed = true;

          while (node != null)
          {
              node.OnCompleted();
              node = node.Next;
          }
      }

      // Unlinks the node from the list. The node's own Previous/Next are left untouched.
      void IObserverLinkedList<T>.UnsubscribeNode(ObserverNode<T> node)
      {
          if (node == root)
          {
              root = node.Next;
          }

          if (node == last)
          {
              last = node.Previous;
          }

          if (node.Previous != null)
          {
              node.Previous.Next = node.Next;
          }

          if (node.Next != null)
          {
              node.Next.Previous = node.Previous;
          }
      }

      // Source produced a value: store it and republish it to every observer.
      void IObserver<T>.OnNext(T value)
      {
          if (isDisposed) return;

          // The distinct check only applies once a previous value exists.
          if (canPublishValueOnSubscribe)
          {
              if (distinctUntilChanged && EqualityComparer.Equals(this.latestValue, value))
              {
                  return;
              }
          }

          canPublishValueOnSubscribe = true;
          this.latestValue = value;

          var node = root;
          while (node != null)
          {
              node.OnNext(value);
              node = node.Next;
          }
      }

      // Source failed: remember the error for late subscribers and fail current observers.
      void IObserver<T>.OnError(Exception error)
      {
          lastException = error;

          var node = root;
          while (node != null)
          {
              node.OnError(error);
              node = node.Next;
          }

          root = last = null;
      }

      // Source completed: complete current observers as well. Previously the list was
      // cleared without notifying them, so they were dropped silently while late
      // subscribers did receive OnCompleted from Subscribe.
      void IObserver<T>.OnCompleted()
      {
          // Detach the list before notifying so a re-entrant Subscribe/Dispose sees the final state.
          var node = root;
          root = last = null;
          isSourceCompleted = true;

          while (node != null)
          {
              node.OnCompleted();
              node = node.Next;
          }
      }

      /// <summary>Returns the latest value's string form, or "(null)" when it is null.</summary>
      public override string ToString()
      {
          return (latestValue == null) ? "(null)" : latestValue.ToString();
      }

      /// <summary>Always returns false.</summary>
      public bool IsRequiredSubscribeOnCurrentThread()
      {
          return false;
      }
  }
  /// <summary>
  /// Extension methods of ReactiveProperty&lt;T&gt;
  /// </summary>
  public static class ReactivePropertyExtensions
  {
      /// <summary>
      /// Wraps <paramref name="source"/> in a <see cref="ReadOnlyReactiveProperty{T}"/> without an initial value.
      /// </summary>
      public static IReadOnlyReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
      {
          return new ReadOnlyReactiveProperty<T>(source);
      }

      /// <summary>
      /// Wraps <paramref name="source"/> in a <see cref="ReadOnlyReactiveProperty{T}"/> that starts with <paramref name="initialValue"/>.
      /// </summary>
      public static IReadOnlyReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T initialValue)
      {
          return new ReadOnlyReactiveProperty<T>(source, initialValue);
      }

      /// <summary>
      /// Create ReadOnlyReactiveProperty without an initial value.
      /// </summary>
      public static ReadOnlyReactiveProperty<T> ToReadOnlyReactiveProperty<T>(this IObservable<T> source)
      {
          return new ReadOnlyReactiveProperty<T>(source);
      }

#if CSHARP_7_OR_LATER || (UNITY_2018_3_OR_NEWER && (NET_STANDARD_2_0 || NET_4_6))
      // Cached delegate so that registering a cancellation callback does not allocate a new one per call.
      static readonly Action<object> Callback = CancelCallback;

      // Cancellation callback: ends the subscription, then cancels the pending task.
      // The state must be exactly Tuple<ICancellableTaskCompletionSource, IDisposable>.
      static void CancelCallback(object state)
      {
          var tuple = (Tuple<ICancellableTaskCompletionSource, IDisposable>)state;
          tuple.Item2.Dispose();
          tuple.Item1.TrySetCanceled();
      }

      /// <summary>
      /// Returns a task that completes with the next value pushed by <paramref name="source"/>.
      /// When the property already has a value, that replayed value is skipped.
      /// The task faults if the source errors and is canceled if the source completes or
      /// <paramref name="cancellationToken"/> is canceled.
      /// </summary>
      public static Task<T> WaitUntilValueChangedAsync<T>(this IReadOnlyReactiveProperty<T> source, CancellationToken cancellationToken = default(CancellationToken))
      {
          var tcs = new CancellableTaskCompletionSource<T>();
          var disposable = new SingleAssignmentDisposable();

          if (source.HasValue)
          {
              // Skip first value
              var isFirstValue = true;
              disposable.Disposable = source.Subscribe(x =>
              {
                  if (isFirstValue)
                  {
                      isFirstValue = false;
                      return;
                  }

                  disposable.Dispose(); // finish subscription.
                  tcs.TrySetResult(x);
              }, ex => tcs.TrySetException(ex), () => tcs.TrySetCanceled());
          }
          else
          {
              disposable.Disposable = source.Subscribe(x =>
              {
                  disposable.Dispose(); // finish subscription.
                  tcs.TrySetResult(x);
              }, ex => tcs.TrySetException(ex), () => tcs.TrySetCanceled());
          }

          // Nothing to register for a token that can never be canceled.
          if (cancellationToken.CanBeCanceled)
          {
              // The type arguments are explicit on purpose: inference would build a
              // Tuple<CancellableTaskCompletionSource<T>, IDisposable>, which CancelCallback
              // cannot cast to Tuple<ICancellableTaskCompletionSource, IDisposable>
              // (generic tuple types are not variant), so cancellation would throw.
              cancellationToken.Register(Callback, Tuple.Create<ICancellableTaskCompletionSource, IDisposable>(tcs, disposable.Disposable), false);
          }

          return tcs.Task;
      }

      /// <summary>
      /// Allows awaiting a reactive property directly; equivalent to awaiting
      /// <see cref="WaitUntilValueChangedAsync{T}"/> with no cancellation.
      /// </summary>
      public static System.Runtime.CompilerServices.TaskAwaiter<T> GetAwaiter<T>(this IReadOnlyReactiveProperty<T> source)
      {
          return source.WaitUntilValueChangedAsync(CancellationToken.None).GetAwaiter();
      }
#endif

      /// <summary>
      /// Create ReadOnlyReactiveProperty with distinctUntilChanged: false.
      /// </summary>
      public static ReadOnlyReactiveProperty<T> ToSequentialReadOnlyReactiveProperty<T>(this IObservable<T> source)
      {
          return new ReadOnlyReactiveProperty<T>(source, distinctUntilChanged: false);
      }

      /// <summary>
      /// Create ReadOnlyReactiveProperty that starts with <paramref name="initialValue"/>.
      /// </summary>
      public static ReadOnlyReactiveProperty<T> ToReadOnlyReactiveProperty<T>(this IObservable<T> source, T initialValue)
      {
          return new ReadOnlyReactiveProperty<T>(source, initialValue);
      }

      /// <summary>
      /// Create ReadOnlyReactiveProperty with distinctUntilChanged: false.
      /// </summary>
      public static ReadOnlyReactiveProperty<T> ToSequentialReadOnlyReactiveProperty<T>(this IObservable<T> source, T initialValue)
      {
          return new ReadOnlyReactiveProperty<T>(source, initialValue, distinctUntilChanged: false);
      }

      /// <summary>
      /// Returns the sequence without the value that is replayed on subscribe:
      /// skips one element when the property has a value, otherwise returns the source unchanged.
      /// </summary>
      public static IObservable<T> SkipLatestValueOnSubscribe<T>(this IReadOnlyReactiveProperty<T> source)
      {
          return source.HasValue ? source.Skip(1) : source;
      }

      // for multiple toggle or etc..

      /// <summary>
      /// Latest values of each sequence are all true.
      /// </summary>
      public static IObservable<bool> CombineLatestValuesAreAllTrue(this IEnumerable<IObservable<bool>> sources)
      {
          return sources.CombineLatest().Select(xs =>
          {
              foreach (var item in xs)
              {
                  if (!item)
                  {
                      return false;
                  }
              }

              return true;
          });
      }

      /// <summary>
      /// Latest values of each sequence are all false.
      /// </summary>
      public static IObservable<bool> CombineLatestValuesAreAllFalse(this IEnumerable<IObservable<bool>> sources)
      {
          return sources.CombineLatest().Select(xs =>
          {
              foreach (var item in xs)
              {
                  if (item)
                  {
                      return false;
                  }
              }

              return true;
          });
      }
  }
  519. }