Throttle.cs 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. namespace UniRx.Operators
  6. {
  7. internal class ThrottleObservable<T> : OperatorObservableBase<T>
  8. {
  9. readonly IObservable<T> source;
  10. readonly TimeSpan dueTime;
  11. readonly IScheduler scheduler;
  12. public ThrottleObservable(IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
  13. : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
  14. {
  15. this.source = source;
  16. this.dueTime = dueTime;
  17. this.scheduler = scheduler;
  18. }
  19. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  20. {
  21. return new Throttle(this, observer, cancel).Run();
  22. }
  23. class Throttle : OperatorObserverBase<T, T>
  24. {
  25. readonly ThrottleObservable<T> parent;
  26. readonly object gate = new object();
  27. T latestValue = default(T);
  28. bool hasValue = false;
  29. SerialDisposable cancelable;
  30. ulong id = 0;
  31. public Throttle(ThrottleObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  32. {
  33. this.parent = parent;
  34. }
  35. public IDisposable Run()
  36. {
  37. cancelable = new SerialDisposable();
  38. var subscription = parent.source.Subscribe(this);
  39. return StableCompositeDisposable.Create(cancelable, subscription);
  40. }
  41. void OnNext(ulong currentid)
  42. {
  43. lock (gate)
  44. {
  45. if (hasValue && id == currentid)
  46. {
  47. observer.OnNext(latestValue);
  48. }
  49. hasValue = false;
  50. }
  51. }
  52. public override void OnNext(T value)
  53. {
  54. ulong currentid;
  55. lock (gate)
  56. {
  57. hasValue = true;
  58. latestValue = value;
  59. id = unchecked(id + 1);
  60. currentid = id;
  61. }
  62. var d = new SingleAssignmentDisposable();
  63. cancelable.Disposable = d;
  64. d.Disposable = parent.scheduler.Schedule(parent.dueTime, () => OnNext(currentid));
  65. }
  66. public override void OnError(Exception error)
  67. {
  68. cancelable.Dispose();
  69. lock (gate)
  70. {
  71. hasValue = false;
  72. id = unchecked(id + 1);
  73. try { observer.OnError(error); } finally { Dispose(); }
  74. }
  75. }
  76. public override void OnCompleted()
  77. {
  78. cancelable.Dispose();
  79. lock (gate)
  80. {
  81. if (hasValue)
  82. {
  83. observer.OnNext(latestValue);
  84. }
  85. hasValue = false;
  86. id = unchecked(id + 1);
  87. try { observer.OnCompleted(); } finally { Dispose(); }
  88. }
  89. }
  90. }
  91. }
  92. }