Observable.Awaiter.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #if (NET_4_6 || NET_STANDARD_2_0)
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace UniRx
  9. {
  10. public static partial class Observable
  11. {
  12. /// <summary>
  13. /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
  14. /// This operation subscribes to the observable sequence, making it hot.
  15. /// </summary>
  16. /// <param name="source">Source sequence to await.</param>
  17. public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source)
  18. {
  19. if (source == null) throw new ArgumentNullException("source");
  20. return RunAsync(source, CancellationToken.None);
  21. }
  22. /// <summary>
  23. /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
  24. /// This operation subscribes to the observable sequence, making it hot.
  25. /// </summary>
  26. /// <param name="source">Source sequence to await.</param>
  27. public static AsyncSubject<TSource> GetAwaiter<TSource>(this IConnectableObservable<TSource> source)
  28. {
  29. if (source == null) throw new ArgumentNullException("source");
  30. return RunAsync(source, CancellationToken.None);
  31. }
  32. /// <summary>
  33. /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
  34. /// This operation subscribes to the observable sequence, making it hot.
  35. /// </summary>
  36. /// <param name="source">Source sequence to await.</param>
  37. /// <param name="cancellationToken">Cancellation token.</param>
  38. public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source, CancellationToken cancellationToken)
  39. {
  40. if (source == null) throw new ArgumentNullException("source");
  41. return RunAsync(source, cancellationToken);
  42. }
  43. /// <summary>
  44. /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
  45. /// This operation subscribes to the observable sequence, making it hot.
  46. /// </summary>
  47. /// <param name="source">Source sequence to await.</param>
  48. /// <param name="cancellationToken">Cancellation token.</param>
  49. public static AsyncSubject<TSource> GetAwaiter<TSource>(this IConnectableObservable<TSource> source, CancellationToken cancellationToken)
  50. {
  51. if (source == null) throw new ArgumentNullException("source");
  52. return RunAsync(source, cancellationToken);
  53. }
  54. static AsyncSubject<TSource> RunAsync<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
  55. {
  56. var s = new AsyncSubject<TSource>();
  57. if (cancellationToken.IsCancellationRequested)
  58. {
  59. return Cancel(s, cancellationToken);
  60. }
  61. var d = source.Subscribe(s);
  62. if (cancellationToken.CanBeCanceled)
  63. {
  64. RegisterCancelation(s, d, cancellationToken);
  65. }
  66. return s;
  67. }
  68. static AsyncSubject<TSource> RunAsync<TSource>(IConnectableObservable<TSource> source, CancellationToken cancellationToken)
  69. {
  70. var s = new AsyncSubject<TSource>();
  71. if (cancellationToken.IsCancellationRequested)
  72. {
  73. return Cancel(s, cancellationToken);
  74. }
  75. var d = source.Subscribe(s);
  76. var c = source.Connect();
  77. if (cancellationToken.CanBeCanceled)
  78. {
  79. RegisterCancelation(s, StableCompositeDisposable.Create(d, c), cancellationToken);
  80. }
  81. return s;
  82. }
  83. static AsyncSubject<T> Cancel<T>(AsyncSubject<T> subject, CancellationToken cancellationToken)
  84. {
  85. subject.OnError(new OperationCanceledException(cancellationToken));
  86. return subject;
  87. }
  88. static void RegisterCancelation<T>(AsyncSubject<T> subject, IDisposable subscription, CancellationToken token)
  89. {
  90. //
  91. // Separate method used to avoid heap allocation of closure when no cancellation is needed,
  92. // e.g. when CancellationToken.None is provided to the RunAsync overloads.
  93. //
  94. var ctr = token.Register(() =>
  95. {
  96. subscription.Dispose();
  97. Cancel(subject, token);
  98. });
  99. //
  100. // No null-check for ctr is needed:
  101. //
  102. // - CancellationTokenRegistration is a struct
  103. // - Registration will succeed 99% of the time, no warranting an attempt to avoid spurious Subscribe calls
  104. //
  105. subject.Subscribe(Stubs<T>.Ignore, _ => ctr.Dispose(), ctr.Dispose);
  106. }
  107. }
  108. }
  109. #endif