ThrottleFirst.cs 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal class ThrottleFirstObservable<T> : OperatorObservableBase<T>
  5. {
  6. readonly IObservable<T> source;
  7. readonly TimeSpan dueTime;
  8. readonly IScheduler scheduler;
  9. public ThrottleFirstObservable(IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
  10. : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
  11. {
  12. this.source = source;
  13. this.dueTime = dueTime;
  14. this.scheduler = scheduler;
  15. }
  16. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  17. {
  18. return new ThrottleFirst(this, observer, cancel).Run();
  19. }
  20. class ThrottleFirst : OperatorObserverBase<T, T>
  21. {
  22. readonly ThrottleFirstObservable<T> parent;
  23. readonly object gate = new object();
  24. bool open = true;
  25. SerialDisposable cancelable;
  26. public ThrottleFirst(ThrottleFirstObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  27. {
  28. this.parent = parent;
  29. }
  30. public IDisposable Run()
  31. {
  32. cancelable = new SerialDisposable();
  33. var subscription = parent.source.Subscribe(this);
  34. return StableCompositeDisposable.Create(cancelable, subscription);
  35. }
  36. void OnNext()
  37. {
  38. lock (gate)
  39. {
  40. open = true;
  41. }
  42. }
  43. public override void OnNext(T value)
  44. {
  45. lock (gate)
  46. {
  47. if (!open) return;
  48. observer.OnNext(value);
  49. open = false;
  50. }
  51. var d = new SingleAssignmentDisposable();
  52. cancelable.Disposable = d;
  53. d.Disposable = parent.scheduler.Schedule(parent.dueTime, OnNext);
  54. }
  55. public override void OnError(Exception error)
  56. {
  57. cancelable.Dispose();
  58. lock (gate)
  59. {
  60. try { observer.OnError(error); } finally { Dispose(); }
  61. }
  62. }
  63. public override void OnCompleted()
  64. {
  65. cancelable.Dispose();
  66. lock (gate)
  67. {
  68. try { observer.OnCompleted(); } finally { Dispose(); }
  69. }
  70. }
  71. }
  72. }
  73. }