Range.cs 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable source that emits <c>count</c> consecutive integers, the first being
  /// <c>start</c>, and then signals completion. Emission is driven by the scheduler
  /// supplied at construction.
  /// </summary>
  internal class RangeObservable : OperatorObservableBase<int>
  {
      readonly int start;
      readonly int count;
      readonly IScheduler scheduler;

      /// <summary>
      /// Creates a range source.
      /// </summary>
      /// <param name="start">First value to emit.</param>
      /// <param name="count">Number of values to emit; must not be negative.</param>
      /// <param name="scheduler">Scheduler used to drive emission.</param>
      /// <exception cref="ArgumentOutOfRangeException">
      /// <paramref name="count"/> is negative.
      /// </exception>
      public RangeObservable(int start, int count, IScheduler scheduler)
          // The base class receives a flag telling whether the scheduler is
          // Scheduler.CurrentThread. NOTE(review): its exact meaning is defined in
          // OperatorObservableBase, which is not visible from this file.
          : base(scheduler == Scheduler.CurrentThread)
      {
          // The one-string ArgumentOutOfRangeException constructor interprets its
          // argument as the parameter name, so the name and the message are passed
          // separately to get a correct ParamName and a readable Message.
          if (count < 0) throw new ArgumentOutOfRangeException("count", "count < 0");

          this.start = start;
          this.count = count;
          this.scheduler = scheduler;
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> in a <see cref="Range"/> observer and
      /// pushes the sequence into it.
      /// </summary>
      /// <param name="observer">Downstream observer receiving the values.</param>
      /// <param name="cancel">Disposable handed to the wrapping observer.</param>
      /// <returns>
      /// <c>Disposable.Empty</c> when the scheduler is <c>Scheduler.Immediate</c>
      /// (everything has already been emitted on return); otherwise the disposable
      /// returned by the scheduler for the recursive emission work.
      /// </returns>
      protected override IDisposable SubscribeCore(IObserver<int> observer, IDisposable cancel)
      {
          observer = new Range(observer, cancel);

          if (scheduler == Scheduler.Immediate)
          {
              // Immediate scheduler: emit synchronously with a plain loop instead of
              // going through the scheduler at all.
              // NOTE(review): start + i is not overflow-checked here; for a start
              // close to int.MaxValue the value would wrap in an unchecked build.
              for (int i = 0; i < count; i++)
              {
                  int v = start + i;
                  observer.OnNext(v);
              }
              observer.OnCompleted();

              return Disposable.Empty;
          }
          else
          {
              // Any other scheduler: emit one value per scheduled step. The counter
              // lives in the closure and self() requests the next step.
              var i = 0;
              return scheduler.Schedule((Action self) =>
              {
                  if (i < count)
                  {
                      int v = start + i;
                      observer.OnNext(v);
                      i++;
                      self();
                  }
                  else
                  {
                      observer.OnCompleted();
                  }
              });
          }
      }

      /// <summary>
      /// Observer wrapper that forwards notifications to the downstream observer and
      /// calls <c>Dispose</c> when the sequence ends or when forwarding a value throws.
      /// </summary>
      class Range : OperatorObserverBase<int, int>
      {
          public Range(IObserver<int> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Forwards a value downstream; if the downstream observer throws, disposes
          /// and rethrows the same exception.
          /// </summary>
          public override void OnNext(int value)
          {
              try
              {
                  base.observer.OnNext(value);
              }
              catch
              {
                  Dispose();
                  throw;
              }
          }

          /// <summary>
          /// Forwards the error downstream and always disposes afterwards.
          /// </summary>
          public override void OnError(Exception error)
          {
              try { observer.OnError(error); }
              finally { Dispose(); }
          }

          /// <summary>
          /// Forwards completion downstream and always disposes afterwards.
          /// </summary>
          public override void OnCompleted()
          {
              try { observer.OnCompleted(); }
              finally { Dispose(); }
          }
      }
  }
  79. }