// Take.cs (4.3 KB)
  using System;
  using UniRx.Operators;

  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable that relays notifications from a source sequence until either a
      /// fixed number of elements has been forwarded (count mode) or a timer scheduled
      /// for a given duration fires (duration mode). Which mode is active is decided by
      /// the constructor that was used: the duration constructor stores a scheduler,
      /// the count constructor leaves it null.
      /// </summary>
      /// <typeparam name="T">Element type of the source sequence.</typeparam>
      internal class TakeObservable<T> : OperatorObservableBase<T>
      {
          readonly IObservable<T> source;
          readonly int count;
          readonly TimeSpan duration;

          // Deliberately wider than private; the original note says this is for an
          // "optimization check" performed outside this class (caller not visible here).
          internal readonly IScheduler scheduler;

          /// <summary>
          /// Creates a count-limited take over <paramref name="source"/>.
          /// </summary>
          /// <param name="source">Sequence to take elements from.</param>
          /// <param name="count">Maximum number of elements to forward.</param>
          public TakeObservable(IObservable<T> source, int count)
              : base(source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.count = count;
          }

          /// <summary>
          /// Creates a time-limited take over <paramref name="source"/>.
          /// </summary>
          /// <param name="source">Sequence to take elements from.</param>
          /// <param name="duration">Delay after which the completion timer fires.</param>
          /// <param name="scheduler">Scheduler on which the completion timer is scheduled.</param>
          public TakeObservable(IObservable<T> source, TimeSpan duration, IScheduler scheduler)
              : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.duration = duration;
              this.scheduler = scheduler;
          }

          /// <summary>
          /// Folds a second count-based take into this one so that chained takes do not
          /// stack operators.
          /// </summary>
          /// <param name="count">Element limit requested by the outer take.</param>
          /// <returns>
          /// This instance when its own limit is already the smaller (or equal) one;
          /// otherwise a new instance over the same source with the smaller limit.
          /// </returns>
          public IObservable<T> Combine(int count)
          {
              // xs = 6
              // xs.Take(5) = 5         | xs.Take(3) = 3
              // xs.Take(5).Take(3) = 3 | xs.Take(3).Take(5) = 3
              // => the minimum of the two limits always wins.
              if (this.count <= count)
              {
                  return this;
              }

              return new TakeObservable<T>(source, count);
          }

          /// <summary>
          /// Folds a second duration-based take into this one so that chained takes do
          /// not stack operators.
          /// </summary>
          /// <param name="duration">Time limit requested by the outer take.</param>
          /// <returns>
          /// This instance when its own duration is already the shorter (or equal) one;
          /// otherwise a new instance over the same source and scheduler with the
          /// shorter duration.
          /// </returns>
          public IObservable<T> Combine(TimeSpan duration)
          {
              // xs = 6s
              // xs.Take(5s) = 5s           | xs.Take(3s) = 3s
              // xs.Take(5s).Take(3s) = 3s  | xs.Take(3s).Take(5s) = 3s
              // => the minimum of the two durations always wins.
              if (this.duration <= duration)
              {
                  return this;
              }

              return new TakeObservable<T>(source, duration, scheduler);
          }

          /// <summary>
          /// Wires <paramref name="observer"/> to the source through the observer that
          /// matches the active mode.
          /// </summary>
          /// <param name="observer">Downstream observer receiving the taken elements.</param>
          /// <param name="cancel">Disposable handed on to the inner observer.</param>
          /// <returns>The disposable representing the subscription.</returns>
          protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
          {
              // A scheduler is only ever stored by the duration constructor.
              if (scheduler != null)
              {
                  return new Take_(this, observer, cancel).Run();
              }

              return source.Subscribe(new Take(this, observer, cancel));
          }

          /// <summary>
          /// Count mode: forwards elements while the remaining budget is positive and
          /// completes downstream right after the element that exhausts it. With a
          /// starting budget of zero or less nothing is forwarded and completion only
          /// arrives when the source itself terminates.
          /// </summary>
          class Take : OperatorObserverBase<T, T>
          {
              int remaining;

              public Take(TakeObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
                  remaining = parent.count;
              }

              public override void OnNext(T value)
              {
                  // Budget already used up (or never positive): drop the element.
                  if (remaining <= 0)
                  {
                      return;
                  }

                  remaining--;
                  observer.OnNext(value);

                  if (remaining != 0)
                  {
                      return;
                  }

                  // That was the last permitted element: finish and release.
                  try
                  {
                      observer.OnCompleted();
                  }
                  finally
                  {
                      Dispose();
                  }
              }

              public override void OnError(Exception error)
              {
                  try
                  {
                      observer.OnError(error);
                  }
                  finally
                  {
                      Dispose();
                  }
              }

              public override void OnCompleted()
              {
                  try
                  {
                      observer.OnCompleted();
                  }
                  finally
                  {
                      Dispose();
                  }
              }
          }

          /// <summary>
          /// Duration mode: forwards every element until the scheduled timer fires, at
          /// which point it completes downstream. The timer callback and all source
          /// notifications take the same lock before touching the downstream observer.
          /// </summary>
          class Take_ : OperatorObserverBase<T, T>
          {
              readonly TakeObservable<T> parent;
              readonly object sync = new object();

              public Take_(TakeObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
                  this.parent = parent;
              }

              /// <summary>
              /// Schedules the completion timer first, then subscribes to the source.
              /// </summary>
              /// <returns>A disposable combining the timer and the source subscription.</returns>
              public IDisposable Run()
              {
                  var timer = parent.scheduler.Schedule(parent.duration, Tick);
                  var subscription = parent.source.Subscribe(this);
                  return StableCompositeDisposable.Create(timer, subscription);
              }

              // Timer callback: the time window is over.
              void Tick()
              {
                  CompleteAndDispose();
              }

              public override void OnNext(T value)
              {
                  lock (sync)
                  {
                      observer.OnNext(value);
                  }
              }

              public override void OnError(Exception error)
              {
                  lock (sync)
                  {
                      try
                      {
                          observer.OnError(error);
                      }
                      finally
                      {
                          Dispose();
                      }
                  }
              }

              public override void OnCompleted()
              {
                  CompleteAndDispose();
              }

              // Shared by the timer callback and source completion: signal completion
              // downstream under the lock, then dispose even if the observer throws.
              void CompleteAndDispose()
              {
                  lock (sync)
                  {
                      try
                      {
                          observer.OnCompleted();
                      }
                      finally
                      {
                          Dispose();
                      }
                  }
              }
          }
      }
  }