Scan.cs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. using System;
  2. using UniRx.Operators;
  3. namespace UniRx.Operators
  4. {
  5. internal class ScanObservable<TSource> : OperatorObservableBase<TSource>
  6. {
  7. readonly IObservable<TSource> source;
  8. readonly Func<TSource, TSource, TSource> accumulator;
  9. public ScanObservable(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  10. : base(source.IsRequiredSubscribeOnCurrentThread())
  11. {
  12. this.source = source;
  13. this.accumulator = accumulator;
  14. }
  15. protected override IDisposable SubscribeCore(IObserver<TSource> observer, IDisposable cancel)
  16. {
  17. return source.Subscribe(new Scan(this, observer, cancel));
  18. }
  19. class Scan : OperatorObserverBase<TSource, TSource>
  20. {
  21. readonly ScanObservable<TSource> parent;
  22. TSource accumulation;
  23. bool isFirst;
  24. public Scan(ScanObservable<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel)
  25. {
  26. this.parent = parent;
  27. this.isFirst = true;
  28. }
  29. public override void OnNext(TSource value)
  30. {
  31. if (isFirst)
  32. {
  33. isFirst = false;
  34. accumulation = value;
  35. }
  36. else
  37. {
  38. try
  39. {
  40. accumulation = parent.accumulator(accumulation, value);
  41. }
  42. catch (Exception ex)
  43. {
  44. try { observer.OnError(ex); }
  45. finally { Dispose(); }
  46. return;
  47. }
  48. }
  49. observer.OnNext(accumulation);
  50. }
  51. public override void OnError(Exception error)
  52. {
  53. try { observer.OnError(error); }
  54. finally { Dispose(); }
  55. }
  56. public override void OnCompleted()
  57. {
  58. try { observer.OnCompleted(); }
  59. finally { Dispose(); }
  60. }
  61. }
  62. }
  63. internal class ScanObservable<TSource, TAccumulate> : OperatorObservableBase<TAccumulate>
  64. {
  65. readonly IObservable<TSource> source;
  66. readonly TAccumulate seed;
  67. readonly Func<TAccumulate, TSource, TAccumulate> accumulator;
  68. public ScanObservable(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  69. : base(source.IsRequiredSubscribeOnCurrentThread())
  70. {
  71. this.source = source;
  72. this.seed = seed;
  73. this.accumulator = accumulator;
  74. }
  75. protected override IDisposable SubscribeCore(IObserver<TAccumulate> observer, IDisposable cancel)
  76. {
  77. return source.Subscribe(new Scan(this, observer, cancel));
  78. }
  79. class Scan : OperatorObserverBase<TSource, TAccumulate>
  80. {
  81. readonly ScanObservable<TSource, TAccumulate> parent;
  82. TAccumulate accumulation;
  83. bool isFirst;
  84. public Scan(ScanObservable<TSource, TAccumulate> parent, IObserver<TAccumulate> observer, IDisposable cancel) : base(observer, cancel)
  85. {
  86. this.parent = parent;
  87. this.isFirst = true;
  88. }
  89. public override void OnNext(TSource value)
  90. {
  91. if (isFirst)
  92. {
  93. isFirst = false;
  94. accumulation = parent.seed;
  95. }
  96. try
  97. {
  98. accumulation = parent.accumulator(accumulation, value);
  99. }
  100. catch (Exception ex)
  101. {
  102. try { observer.OnError(ex); }
  103. finally { Dispose(); }
  104. return;
  105. }
  106. observer.OnNext(accumulation);
  107. }
  108. public override void OnError(Exception error)
  109. {
  110. try { observer.OnError(error); }
  111. finally { Dispose(); }
  112. }
  113. public override void OnCompleted()
  114. {
  115. try { observer.OnCompleted(); }
  116. finally { Dispose(); }
  117. }
  118. }
  119. }
  120. }