1
0

PairWise.cs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable that combines each source element with the element that
  /// preceded it, using a caller-supplied selector, and emits the result.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  /// <typeparam name="TR">Type produced by the selector.</typeparam>
  internal class PairwiseObservable<T, TR> : OperatorObservableBase<TR>
  {
      readonly IObservable<T> source;
      readonly Func<T, T, TR> selector;

      /// <summary>
      /// Stores the source and selector. The base constructor receives the
      /// result of <c>source.IsRequiredSubscribeOnCurrentThread()</c>.
      /// </summary>
      /// <param name="source">Sequence whose consecutive elements are combined.</param>
      /// <param name="selector">Called as <c>selector(previous, current)</c>.</param>
      public PairwiseObservable(IObservable<T> source, Func<T, T, TR> selector)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
          this.selector = selector;
      }

      /// <summary>
      /// Subscribes a new pairing observer to the source and returns the
      /// resulting subscription.
      /// </summary>
      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          Pairwise pairing = new Pairwise(this, observer, cancel);
          return source.Subscribe(pairing);
      }

      /// <summary>
      /// Observer that remembers the most recent source value and, from the
      /// second value onwards, pushes the selector result downstream.
      /// </summary>
      class Pairwise : OperatorObserverBase<T, TR>
      {
          // The parent's selector field is readonly, so caching the delegate
          // here is equivalent to reading it through the parent on each call.
          readonly Func<T, T, TR> selector;

          // Most recently received source value; meaningful only once hasPrevious is true.
          T previous;
          bool hasPrevious;

          public Pairwise(PairwiseObservable<T, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.selector = parent.selector;
          }

          /// <summary>
          /// Buffers the very first value; for every later value invokes the
          /// selector with (previous, current) and forwards the result.
          /// An exception thrown by the selector is sent to the downstream
          /// observer via OnError, after which this observer is disposed.
          /// </summary>
          public override void OnNext(T value)
          {
              if (!hasPrevious)
              {
                  // Nothing to pair with yet: just remember the value.
                  previous = value;
                  hasPrevious = true;
                  return;
              }

              TR projected;
              try
              {
                  projected = selector(previous, value);
              }
              catch (Exception ex)
              {
                  ForwardErrorAndDispose(ex);
                  return;
              }

              previous = value;
              observer.OnNext(projected);
          }

          /// <summary>Forwards the error downstream, then disposes.</summary>
          public override void OnError(Exception error)
          {
              ForwardErrorAndDispose(error);
          }

          /// <summary>Forwards completion downstream, then disposes.</summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }

          // Sends the error to the downstream observer; Dispose runs even if
          // the downstream OnError itself throws.
          void ForwardErrorAndDispose(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// Observable that emits a <c>Pair&lt;T&gt;</c> built from each source
  /// element and the element that preceded it.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  internal class PairwiseObservable<T> : OperatorObservableBase<Pair<T>>
  {
      readonly IObservable<T> source;

      /// <summary>
      /// Stores the source. The base constructor receives the result of
      /// <c>source.IsRequiredSubscribeOnCurrentThread()</c>.
      /// </summary>
      /// <param name="source">Sequence whose consecutive elements are paired.</param>
      public PairwiseObservable(IObservable<T> source)
          : base(source.IsRequiredSubscribeOnCurrentThread())
      {
          this.source = source;
      }

      /// <summary>
      /// Subscribes a new pairing observer to the source and returns the
      /// resulting subscription.
      /// </summary>
      protected override IDisposable SubscribeCore(IObserver<Pair<T>> observer, IDisposable cancel)
      {
          Pairwise pairing = new Pairwise(observer, cancel);
          return source.Subscribe(pairing);
      }

      /// <summary>
      /// Observer that remembers the most recent source value and, from the
      /// second value onwards, pushes (previous, current) pairs downstream.
      /// </summary>
      class Pairwise : OperatorObserverBase<T, Pair<T>>
      {
          // Most recently received source value; meaningful only once hasPrevious is true.
          T previous;
          bool hasPrevious;

          public Pairwise(IObserver<Pair<T>> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Buffers the very first value; every later value is emitted
          /// together with its predecessor as a <c>Pair&lt;T&gt;</c>.
          /// </summary>
          public override void OnNext(T value)
          {
              if (hasPrevious)
              {
                  // Build the pair before overwriting the stored value.
                  Pair<T> consecutive = new Pair<T>(previous, value);
                  previous = value;
                  observer.OnNext(consecutive);
              }
              else
              {
                  // Nothing to pair with yet: just remember the value.
                  previous = value;
                  hasPrevious = true;
              }
          }

          /// <summary>Forwards the error downstream, then disposes.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Forwards completion downstream, then disposes.</summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  101. }