1
0

DistinctUntilChanged.cs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class DistinctUntilChangedObservable<T> : OperatorObservableBase<T>
  6. {
  7. readonly IObservable<T> source;
  8. readonly IEqualityComparer<T> comparer;
  9. public DistinctUntilChangedObservable(IObservable<T> source, IEqualityComparer<T> comparer)
  10. : base(source.IsRequiredSubscribeOnCurrentThread())
  11. {
  12. this.source = source;
  13. this.comparer = comparer;
  14. }
  15. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  16. {
  17. return source.Subscribe(new DistinctUntilChanged(this, observer, cancel));
  18. }
  19. class DistinctUntilChanged : OperatorObserverBase<T, T>
  20. {
  21. readonly DistinctUntilChangedObservable<T> parent;
  22. bool isFirst = true;
  23. T prevKey = default(T);
  24. public DistinctUntilChanged(DistinctUntilChangedObservable<T> parent, IObserver<T> observer, IDisposable cancel)
  25. : base(observer, cancel)
  26. {
  27. this.parent = parent;
  28. }
  29. public override void OnNext(T value)
  30. {
  31. T currentKey;
  32. try
  33. {
  34. currentKey = value;
  35. }
  36. catch (Exception exception)
  37. {
  38. try { observer.OnError(exception); } finally { Dispose(); }
  39. return;
  40. }
  41. var sameKey = false;
  42. if (isFirst)
  43. {
  44. isFirst = false;
  45. }
  46. else
  47. {
  48. try
  49. {
  50. sameKey = parent.comparer.Equals(currentKey, prevKey);
  51. }
  52. catch (Exception ex)
  53. {
  54. try { observer.OnError(ex); } finally { Dispose(); }
  55. return;
  56. }
  57. }
  58. if (!sameKey)
  59. {
  60. prevKey = currentKey;
  61. observer.OnNext(value);
  62. }
  63. }
  64. public override void OnError(Exception error)
  65. {
  66. try { observer.OnError(error); } finally { Dispose(); }
  67. }
  68. public override void OnCompleted()
  69. {
  70. try { observer.OnCompleted(); } finally { Dispose(); }
  71. }
  72. }
  73. }
  74. internal class DistinctUntilChangedObservable<T, TKey> : OperatorObservableBase<T>
  75. {
  76. readonly IObservable<T> source;
  77. readonly IEqualityComparer<TKey> comparer;
  78. readonly Func<T, TKey> keySelector;
  79. public DistinctUntilChangedObservable(IObservable<T> source, Func<T, TKey> keySelector, IEqualityComparer<TKey> comparer)
  80. : base(source.IsRequiredSubscribeOnCurrentThread())
  81. {
  82. this.source = source;
  83. this.comparer = comparer;
  84. this.keySelector = keySelector;
  85. }
  86. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  87. {
  88. return source.Subscribe(new DistinctUntilChanged(this, observer, cancel));
  89. }
  90. class DistinctUntilChanged : OperatorObserverBase<T, T>
  91. {
  92. readonly DistinctUntilChangedObservable<T, TKey> parent;
  93. bool isFirst = true;
  94. TKey prevKey = default(TKey);
  95. public DistinctUntilChanged(DistinctUntilChangedObservable<T, TKey> parent, IObserver<T> observer, IDisposable cancel)
  96. : base(observer, cancel)
  97. {
  98. this.parent = parent;
  99. }
  100. public override void OnNext(T value)
  101. {
  102. TKey currentKey;
  103. try
  104. {
  105. currentKey = parent.keySelector(value);
  106. }
  107. catch (Exception exception)
  108. {
  109. try { observer.OnError(exception); } finally { Dispose(); }
  110. return;
  111. }
  112. var sameKey = false;
  113. if (isFirst)
  114. {
  115. isFirst = false;
  116. }
  117. else
  118. {
  119. try
  120. {
  121. sameKey = parent.comparer.Equals(currentKey, prevKey);
  122. }
  123. catch (Exception ex)
  124. {
  125. try { observer.OnError(ex); } finally { Dispose(); }
  126. return;
  127. }
  128. }
  129. if (!sameKey)
  130. {
  131. prevKey = currentKey;
  132. observer.OnNext(value);
  133. }
  134. }
  135. public override void OnError(Exception error)
  136. {
  137. try { observer.OnError(error); } finally { Dispose(); }
  138. }
  139. public override void OnCompleted()
  140. {
  141. try { observer.OnCompleted(); } finally { Dispose(); }
  142. }
  143. }
  144. }
  145. }