2
0

TimeInterval.cs 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal class TimeIntervalObservable<T> : OperatorObservableBase<UniRx.TimeInterval<T>>
  5. {
  6. readonly IObservable<T> source;
  7. readonly IScheduler scheduler;
  8. public TimeIntervalObservable(IObservable<T> source, IScheduler scheduler)
  9. : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
  10. {
  11. this.source = source;
  12. this.scheduler = scheduler;
  13. }
  14. protected override IDisposable SubscribeCore(IObserver<UniRx.TimeInterval<T>> observer, IDisposable cancel)
  15. {
  16. return source.Subscribe(new TimeInterval(this, observer, cancel));
  17. }
  18. class TimeInterval : OperatorObserverBase<T, UniRx.TimeInterval<T>>
  19. {
  20. readonly TimeIntervalObservable<T> parent;
  21. DateTimeOffset lastTime;
  22. public TimeInterval(TimeIntervalObservable<T> parent, IObserver<UniRx.TimeInterval<T>> observer, IDisposable cancel)
  23. : base(observer, cancel)
  24. {
  25. this.parent = parent;
  26. this.lastTime = parent.scheduler.Now;
  27. }
  28. public override void OnNext(T value)
  29. {
  30. var now = parent.scheduler.Now;
  31. var span = now.Subtract(lastTime);
  32. lastTime = now;
  33. base.observer.OnNext(new UniRx.TimeInterval<T>(value, span));
  34. }
  35. public override void OnError(Exception error)
  36. {
  37. try { observer.OnError(error); }
  38. finally { Dispose(); }
  39. }
  40. public override void OnCompleted()
  41. {
  42. try { observer.OnCompleted(); }
  43. finally { Dispose(); }
  44. }
  45. }
  46. }
  47. }