// TakeLast.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using UniRx.Operators;
  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable that buffers the elements of <c>source</c> and replays only the
      /// trailing ones once <c>source</c> completes. "Trailing" is defined either by a
      /// maximum element count or by a time window measured on a scheduler's clock,
      /// depending on which constructor was used.
      /// </summary>
      /// <typeparam name="T">Element type of the sequence.</typeparam>
      internal class TakeLastObservable<T> : OperatorObservableBase<T>
      {
          readonly IObservable<T> source;

          // count mode
          readonly int count;

          // duration mode (scheduler stays null in count mode and selects the mode in SubscribeCore)
          readonly TimeSpan duration;
          readonly IScheduler scheduler;

          /// <summary>
          /// Creates the count-based variant: at most <paramref name="count"/> trailing
          /// elements are kept.
          /// </summary>
          /// <param name="source">Upstream sequence to buffer.</param>
          /// <param name="count">Maximum number of trailing elements to retain. Not validated here.</param>
          public TakeLastObservable(IObservable<T> source, int count)
              : base(source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.count = count;
          }

          /// <summary>
          /// Creates the time-based variant: elements are kept while they are younger than
          /// <paramref name="duration"/> according to <paramref name="scheduler"/>'s clock.
          /// </summary>
          /// <param name="source">Upstream sequence to buffer.</param>
          /// <param name="duration">Length of the trailing time window.</param>
          /// <param name="scheduler">
          /// Clock source (<c>Now</c>). Must be non-null: a null scheduler makes
          /// <see cref="SubscribeCore"/> fall back to the count-based observer.
          /// </param>
          public TakeLastObservable(IObservable<T> source, TimeSpan duration, IScheduler scheduler)
              : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.duration = duration;
              this.scheduler = scheduler;
          }

          /// <summary>
          /// Subscribes <paramref name="observer"/> through the count-based or the
          /// time-based buffering observer, chosen by whether a scheduler was supplied.
          /// </summary>
          /// <param name="observer">Downstream observer that receives the trailing elements.</param>
          /// <param name="cancel">Disposable handed to the inner observer's base class.</param>
          /// <returns>The subscription to <c>source</c>.</returns>
          protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
          {
              if (scheduler == null)
              {
                  return new TakeLast(this, observer, cancel).Run();
              }
              else
              {
                  return new TakeLast_(this, observer, cancel).Run();
              }
          }

          // count
          /// <summary>
          /// Observer for count mode: keeps a sliding window of the most recent
          /// <c>parent.count</c> values and flushes it on completion.
          /// </summary>
          class TakeLast : OperatorObserverBase<T, T>
          {
              readonly TakeLastObservable<T> parent;
              readonly Queue<T> q;

              public TakeLast(TakeLastObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
              {
                  this.parent = parent;
                  this.q = new Queue<T>();
              }

              /// <summary>Subscribes this observer to the parent's source.</summary>
              public IDisposable Run()
              {
                  return parent.source.Subscribe(this);
              }

              /// <summary>Buffers <paramref name="value"/>, evicting the oldest entry once the window overflows.</summary>
              public override void OnNext(T value)
              {
                  q.Enqueue(value);
                  if (q.Count > parent.count)
                  {
                      q.Dequeue();
                  }
              }

              /// <summary>Forwards the error downstream; buffered values are not emitted.</summary>
              public override void OnError(Exception error)
              {
                  try { observer.OnError(error); } finally { Dispose(); }
              }

              /// <summary>Emits the buffered values in arrival order, then completes downstream.</summary>
              public override void OnCompleted()
              {
                  // The flush is inside the try so that Dispose() still runs if the
                  // downstream observer throws while receiving a buffered value.
                  try
                  {
                      foreach (var item in q)
                      {
                          observer.OnNext(item);
                      }
                      observer.OnCompleted();
                  }
                  finally
                  {
                      Dispose();
                  }
              }
          }

          // time
          /// <summary>
          /// Observer for duration mode: stamps every value with the time elapsed since
          /// <see cref="Run"/> and keeps only values younger than <c>parent.duration</c>.
          /// </summary>
          class TakeLast_ : OperatorObserverBase<T, T>
          {
              DateTimeOffset startTime;
              readonly TakeLastObservable<T> parent;

              // Interval holds the elapsed time since startTime at which the value arrived.
              readonly Queue<TimeInterval<T>> q;

              public TakeLast_(TakeLastObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
              {
                  this.parent = parent;
                  this.q = new Queue<TimeInterval<T>>();
              }

              /// <summary>Records the scheduler's current time as the origin, then subscribes to the source.</summary>
              public IDisposable Run()
              {
                  startTime = parent.scheduler.Now;
                  return parent.source.Subscribe(this);
              }

              /// <summary>Buffers <paramref name="value"/> with its arrival offset and drops entries that left the window.</summary>
              public override void OnNext(T value)
              {
                  var now = parent.scheduler.Now;
                  var elapsed = now - startTime;
                  q.Enqueue(new TimeInterval<T>(value, elapsed));
                  Trim(elapsed);
              }

              /// <summary>Forwards the error downstream; buffered values are not emitted.</summary>
              public override void OnError(Exception error)
              {
                  try { observer.OnError(error); } finally { Dispose(); }
              }

              /// <summary>
              /// Re-trims the buffer against the completion time, emits what remains in
              /// arrival order, then completes downstream.
              /// </summary>
              public override void OnCompleted()
              {
                  var now = parent.scheduler.Now;
                  var elapsed = now - startTime;
                  Trim(elapsed);

                  // The flush is inside the try so that Dispose() still runs if the
                  // downstream observer throws while receiving a buffered value.
                  try
                  {
                      foreach (var item in q)
                      {
                          observer.OnNext(item.Value);
                      }
                      observer.OnCompleted();
                  }
                  finally
                  {
                      Dispose();
                  }
              }

              /// <summary>
              /// Removes entries from the front of the queue whose age
              /// (<paramref name="elapsed"/> minus their arrival offset) is at least
              /// <c>parent.duration</c>.
              /// </summary>
              /// <param name="elapsed">Time elapsed since <c>startTime</c> at the moment of trimming.</param>
              void Trim(TimeSpan elapsed)
              {
                  while (q.Count > 0 && elapsed - q.Peek().Interval >= parent.duration)
                  {
                      q.Dequeue();
                  }
              }
          }
      }
  }