// Timeout.cs

  using System;
  namespace UniRx.Operators
  {
      /// <summary>
      /// Forwards notifications from <c>source</c> and reports a <see cref="TimeoutException"/>
      /// through <c>OnError</c> when a deadline elapses.
      /// Two modes exist, selected by the constructor used:
      /// a relative <see cref="TimeSpan"/> (the timer is re-armed after every value) or an
      /// absolute <see cref="DateTimeOffset"/> (a single timer for the whole subscription).
      /// </summary>
      internal class TimeoutObservable<T> : OperatorObservableBase<T>
      {
          readonly IObservable<T> source;
          // Assigned only by the TimeSpan constructor; null selects the absolute-deadline mode.
          readonly TimeSpan? dueTime;
          // Assigned only by the DateTimeOffset constructor.
          readonly DateTimeOffset? dueTimeDT;
          readonly IScheduler scheduler;

          /// <summary>Creates a timeout that is measured relative to the last activity.</summary>
          /// <param name="source">Sequence to observe.</param>
          /// <param name="dueTime">Delay handed to the scheduler for each timer.</param>
          /// <param name="scheduler">Scheduler the timers are scheduled on.</param>
          /// <remarks>
          /// The flag passed to the base constructor is true when <paramref name="scheduler"/> is
          /// <c>Scheduler.CurrentThread</c> or the source reports <c>IsRequiredSubscribeOnCurrentThread()</c>.
          /// </remarks>
          public TimeoutObservable(IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
              : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.dueTime = dueTime;
              this.scheduler = scheduler;
          }

          /// <summary>Creates a timeout with a single absolute deadline.</summary>
          /// <param name="source">Sequence to observe.</param>
          /// <param name="dueTime">Point in time handed to the scheduler for the one timer.</param>
          /// <param name="scheduler">Scheduler the timer is scheduled on.</param>
          /// <remarks>
          /// The flag passed to the base constructor is computed the same way as in the
          /// <see cref="TimeSpan"/> overload.
          /// </remarks>
          public TimeoutObservable(IObservable<T> source, DateTimeOffset dueTime, IScheduler scheduler)
              : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.dueTimeDT = dueTime;
              this.scheduler = scheduler;
          }

          /// <summary>
          /// Starts the observer matching the configured mode: <c>Timeout</c> when a relative
          /// <c>dueTime</c> was supplied, otherwise <c>Timeout_</c>.
          /// </summary>
          protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
          {
              if (dueTime != null)
              {
                  return new Timeout(this, observer, cancel).Run();
              }
              else
              {
                  return new Timeout_(this, observer, cancel).Run();
              }
          }

          /// <summary>
          /// Relative-timeout observer. A timer is scheduled on subscription and again after every
          /// forwarded value; each timer carries the value of <c>objectId</c> it was created for and
          /// only fires the timeout if that id is still current.
          /// </summary>
          class Timeout : OperatorObserverBase<T, T>
          {
              readonly TimeoutObservable<T> parent;
              readonly object gate = new object();
              // Generation counter, incremented under gate by every source notification so that
              // timers created for an earlier generation become no-ops.
              ulong objectId = 0ul;
              // Set under gate once a timer has won; later source notifications are dropped.
              bool isTimeout = false;
              SingleAssignmentDisposable sourceSubscription;
              SerialDisposable timerSubscription;

              public Timeout(TimeoutObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
              {
                  this.parent = parent;
              }

              /// <summary>
              /// Arms the first timer, subscribes to the source and returns a disposable that
              /// combines the timer and source subscriptions.
              /// </summary>
              public IDisposable Run()
              {
                  sourceSubscription = new SingleAssignmentDisposable();
                  timerSubscription = new SerialDisposable();

                  // The timer is armed before subscribing, so the initial wait is covered too.
                  timerSubscription.Disposable = RunTimer(objectId);
                  sourceSubscription.Disposable = parent.source.Subscribe(this);

                  return StableCompositeDisposable.Create(timerSubscription, sourceSubscription);
              }

              /// <summary>
              /// Schedules a timer after <c>parent.dueTime</c> that emits a
              /// <see cref="TimeoutException"/> if <paramref name="timerId"/> is still the current
              /// <c>objectId</c> when it runs.
              /// </summary>
              IDisposable RunTimer(ulong timerId)
              {
                  return parent.scheduler.Schedule(parent.dueTime.Value, () =>
                  {
                      // Decide under the lock, and remember the decision locally. Re-reading the
                      // shared isTimeout field afterwards would let a stale timer (id mismatch)
                      // observe a flag set by a newer timer and emit a second OnError.
                      bool fire = false;
                      lock (gate)
                      {
                          if (objectId == timerId)
                          {
                              isTimeout = true;
                              fire = true;
                          }
                      }

                      if (fire)
                      {
                          // observer / Dispose() come from the base class (not visible here);
                          // presumably Dispose() releases the downstream subscription - confirm.
                          try { observer.OnError(new TimeoutException()); } finally { Dispose(); }
                      }
                  });
              }

              /// <summary>
              /// Forwards <paramref name="value"/> unless the timeout already fired, cancelling the
              /// pending timer before the downstream call and arming a new one after it.
              /// </summary>
              public override void OnNext(T value)
              {
                  ulong useObjectId;
                  bool timeout;
                  lock (gate)
                  {
                      timeout = isTimeout;
                      objectId++; // invalidates any timer that is already running its callback
                      useObjectId = objectId;
                  }
                  if (timeout) return;

                  timerSubscription.Disposable = Disposable.Empty; // cancel old timer
                  observer.OnNext(value);
                  timerSubscription.Disposable = RunTimer(useObjectId);
              }

              /// <summary>Forwards the source error unless the timeout already fired.</summary>
              public override void OnError(Exception error)
              {
                  bool timeout;
                  lock (gate)
                  {
                      timeout = isTimeout;
                      objectId++; // a pending timer must no longer match
                  }
                  if (timeout) return;

                  timerSubscription.Dispose();
                  try { observer.OnError(error); } finally { Dispose(); }
              }

              /// <summary>Forwards completion unless the timeout already fired.</summary>
              public override void OnCompleted()
              {
                  bool timeout;
                  lock (gate)
                  {
                      timeout = isTimeout;
                      objectId++; // a pending timer must no longer match
                  }
                  if (timeout) return;

                  timerSubscription.Dispose();
                  try { observer.OnCompleted(); } finally { Dispose(); }
              }
          }

          /// <summary>
          /// Absolute-deadline observer. One timer is scheduled for <c>parent.dueTimeDT</c>;
          /// whichever of the timer, <c>OnError</c> or <c>OnCompleted</c> sets <c>isFinished</c>
          /// first delivers the terminal notification.
          /// </summary>
          class Timeout_ : OperatorObserverBase<T, T>
          {
              readonly TimeoutObservable<T> parent;
              readonly object gate = new object();
              // True once a terminal notification (timeout, error or completion) has been claimed.
              bool isFinished = false;
              SingleAssignmentDisposable sourceSubscription;
              IDisposable timerSubscription;

              public Timeout_(TimeoutObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
              {
                  this.parent = parent;
              }

              /// <summary>
              /// Schedules the deadline timer, subscribes to the source and returns a disposable
              /// that combines both.
              /// </summary>
              public IDisposable Run()
              {
                  sourceSubscription = new SingleAssignmentDisposable();

                  // The parameterless OnNext below is the timer callback.
                  timerSubscription = parent.scheduler.Schedule(parent.dueTimeDT.Value, OnNext);
                  sourceSubscription.Disposable = parent.source.Subscribe(this);

                  return StableCompositeDisposable.Create(timerSubscription, sourceSubscription);
              }

              // in timer
              /// <summary>
              /// Timer callback: claims the terminal state, disposes the source subscription and
              /// reports a <see cref="TimeoutException"/>. Does nothing if already finished.
              /// </summary>
              void OnNext()
              {
                  lock (gate)
                  {
                      if (isFinished) return;
                      isFinished = true;
                  }

                  sourceSubscription.Dispose();
                  try { observer.OnError(new TimeoutException()); } finally { Dispose(); }
              }

              /// <summary>
              /// Forwards <paramref name="value"/> while not finished. The downstream call is made
              /// while holding <c>gate</c>, so the timer callback cannot claim the terminal state
              /// in the middle of it.
              /// </summary>
              public override void OnNext(T value)
              {
                  lock (gate)
                  {
                      if (!isFinished) observer.OnNext(value);
                  }
              }

              /// <summary>Forwards the source error unless a terminal notification was already claimed.</summary>
              public override void OnError(Exception error)
              {
                  lock (gate)
                  {
                      if (isFinished) return;
                      isFinished = true;
                      timerSubscription.Dispose();
                  }

                  try { observer.OnError(error); } finally { Dispose(); }
              }

              /// <summary>Forwards completion unless a terminal notification was already claimed.</summary>
              public override void OnCompleted()
              {
                  lock (gate)
                  {
                      // Same shape as OnError: if the timer (or an error) already finished the
                      // sequence, completion must not be forwarded after it.
                      if (isFinished) return;
                      isFinished = true;
                      timerSubscription.Dispose();
                  }

                  // Notify outside the lock, matching OnError.
                  try { observer.OnCompleted(); } finally { Dispose(); }
              }
          }
      }
  }