AsyncSubject.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. using System;
  2. using System.Collections.Generic;
  3. using UniRx.InternalUtil;
  4. #if (NET_4_6 || NET_STANDARD_2_0)
  5. using System.Runtime.CompilerServices;
  6. using System.Threading;
  7. #endif
  8. namespace UniRx
  9. {
  10. public sealed class AsyncSubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
  11. #if (NET_4_6 || NET_STANDARD_2_0)
  12. , INotifyCompletion
  13. #endif
  14. {
  15. object observerLock = new object();
  16. T lastValue;
  17. bool hasValue;
  18. bool isStopped;
  19. bool isDisposed;
  20. Exception lastError;
  21. IObserver<T> outObserver = EmptyObserver<T>.Instance;
  22. public T Value
  23. {
  24. get
  25. {
  26. ThrowIfDisposed();
  27. if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet");
  28. if (lastError != null) lastError.Throw();
  29. return lastValue;
  30. }
  31. }
  32. public bool HasObservers
  33. {
  34. get
  35. {
  36. return !(outObserver is EmptyObserver<T>) && !isStopped && !isDisposed;
  37. }
  38. }
  39. public bool IsCompleted { get { return isStopped; } }
  40. public void OnCompleted()
  41. {
  42. IObserver<T> old;
  43. T v;
  44. bool hv;
  45. lock (observerLock)
  46. {
  47. ThrowIfDisposed();
  48. if (isStopped) return;
  49. old = outObserver;
  50. outObserver = EmptyObserver<T>.Instance;
  51. isStopped = true;
  52. v = lastValue;
  53. hv = hasValue;
  54. }
  55. if (hv)
  56. {
  57. old.OnNext(v);
  58. old.OnCompleted();
  59. }
  60. else
  61. {
  62. old.OnCompleted();
  63. }
  64. }
  65. public void OnError(Exception error)
  66. {
  67. if (error == null) throw new ArgumentNullException("error");
  68. IObserver<T> old;
  69. lock (observerLock)
  70. {
  71. ThrowIfDisposed();
  72. if (isStopped) return;
  73. old = outObserver;
  74. outObserver = EmptyObserver<T>.Instance;
  75. isStopped = true;
  76. lastError = error;
  77. }
  78. old.OnError(error);
  79. }
  80. public void OnNext(T value)
  81. {
  82. lock (observerLock)
  83. {
  84. ThrowIfDisposed();
  85. if (isStopped) return;
  86. this.hasValue = true;
  87. this.lastValue = value;
  88. }
  89. }
  90. public IDisposable Subscribe(IObserver<T> observer)
  91. {
  92. if (observer == null) throw new ArgumentNullException("observer");
  93. var ex = default(Exception);
  94. var v = default(T);
  95. var hv = false;
  96. lock (observerLock)
  97. {
  98. ThrowIfDisposed();
  99. if (!isStopped)
  100. {
  101. var listObserver = outObserver as ListObserver<T>;
  102. if (listObserver != null)
  103. {
  104. outObserver = listObserver.Add(observer);
  105. }
  106. else
  107. {
  108. var current = outObserver;
  109. if (current is EmptyObserver<T>)
  110. {
  111. outObserver = observer;
  112. }
  113. else
  114. {
  115. outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
  116. }
  117. }
  118. return new Subscription(this, observer);
  119. }
  120. ex = lastError;
  121. v = lastValue;
  122. hv = hasValue;
  123. }
  124. if (ex != null)
  125. {
  126. observer.OnError(ex);
  127. }
  128. else if (hv)
  129. {
  130. observer.OnNext(v);
  131. observer.OnCompleted();
  132. }
  133. else
  134. {
  135. observer.OnCompleted();
  136. }
  137. return Disposable.Empty;
  138. }
  139. public void Dispose()
  140. {
  141. lock (observerLock)
  142. {
  143. isDisposed = true;
  144. outObserver = DisposedObserver<T>.Instance;
  145. lastError = null;
  146. lastValue = default(T);
  147. }
  148. }
  149. void ThrowIfDisposed()
  150. {
  151. if (isDisposed) throw new ObjectDisposedException("");
  152. }
  153. public bool IsRequiredSubscribeOnCurrentThread()
  154. {
  155. return false;
  156. }
  157. class Subscription : IDisposable
  158. {
  159. readonly object gate = new object();
  160. AsyncSubject<T> parent;
  161. IObserver<T> unsubscribeTarget;
  162. public Subscription(AsyncSubject<T> parent, IObserver<T> unsubscribeTarget)
  163. {
  164. this.parent = parent;
  165. this.unsubscribeTarget = unsubscribeTarget;
  166. }
  167. public void Dispose()
  168. {
  169. lock (gate)
  170. {
  171. if (parent != null)
  172. {
  173. lock (parent.observerLock)
  174. {
  175. var listObserver = parent.outObserver as ListObserver<T>;
  176. if (listObserver != null)
  177. {
  178. parent.outObserver = listObserver.Remove(unsubscribeTarget);
  179. }
  180. else
  181. {
  182. parent.outObserver = EmptyObserver<T>.Instance;
  183. }
  184. unsubscribeTarget = null;
  185. parent = null;
  186. }
  187. }
  188. }
  189. }
  190. }
  191. #if (NET_4_6 || NET_STANDARD_2_0)
  192. /// <summary>
  193. /// Gets an awaitable object for the current AsyncSubject.
  194. /// </summary>
  195. /// <returns>Object that can be awaited.</returns>
  196. public AsyncSubject<T> GetAwaiter()
  197. {
  198. return this;
  199. }
  200. /// <summary>
  201. /// Specifies a callback action that will be invoked when the subject completes.
  202. /// </summary>
  203. /// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
  204. /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
  205. public void OnCompleted(Action continuation)
  206. {
  207. if (continuation == null)
  208. throw new ArgumentNullException("continuation");
  209. OnCompleted(continuation, true);
  210. }
  211. void OnCompleted(Action continuation, bool originalContext)
  212. {
  213. //
  214. // [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.
  215. //
  216. this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
  217. }
  218. class AwaitObserver : IObserver<T>
  219. {
  220. private readonly SynchronizationContext _context;
  221. private readonly Action _callback;
  222. public AwaitObserver(Action callback, bool originalContext)
  223. {
  224. if (originalContext)
  225. _context = SynchronizationContext.Current;
  226. _callback = callback;
  227. }
  228. public void OnCompleted()
  229. {
  230. InvokeOnOriginalContext();
  231. }
  232. public void OnError(Exception error)
  233. {
  234. InvokeOnOriginalContext();
  235. }
  236. public void OnNext(T value)
  237. {
  238. }
  239. private void InvokeOnOriginalContext()
  240. {
  241. if (_context != null)
  242. {
  243. //
  244. // No need for OperationStarted and OperationCompleted calls here;
  245. // this code is invoked through await support and will have a way
  246. // to observe its start/complete behavior, either through returned
  247. // Task objects or the async method builder's interaction with the
  248. // SynchronizationContext object.
  249. //
  250. _context.Post(c => ((Action)c)(), _callback);
  251. }
  252. else
  253. {
  254. _callback();
  255. }
  256. }
  257. }
  258. /// <summary>
  259. /// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
  260. /// </summary>
  261. /// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>
  262. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  263. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
  264. public T GetResult()
  265. {
  266. if (!isStopped)
  267. {
  268. var e = new ManualResetEvent(false);
  269. OnCompleted(() => e.Set(), false);
  270. e.WaitOne();
  271. }
  272. if (lastError != null)
  273. {
  274. lastError.Throw();
  275. }
  276. if (!hasValue)
  277. throw new InvalidOperationException("NO_ELEMENTS");
  278. return lastValue;
  279. }
  280. #endif
  281. }
  282. }