// GroupBy.cs (6.2 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. using UniRx.Operators;
  4. namespace UniRx.Operators
  5. {
  /// <summary>
  /// Observable representing one group: it carries the group's key and forwards
  /// subscriptions to the subject it was constructed with.
  /// </summary>
  /// <typeparam name="TKey">Type of the group key.</typeparam>
  /// <typeparam name="TElement">Type of the elements delivered to group subscribers.</typeparam>
  internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
  {
    readonly TKey groupKey;
    readonly IObservable<TElement> elements;
    readonly RefCountDisposable sharedRefCount;

    /// <summary>Stores the key, the subject to subscribe through, and the ref-count handle source.</summary>
    /// <param name="key">Key reported by <see cref="Key"/>.</param>
    /// <param name="subject">Subject that observers of this group are subscribed to.</param>
    /// <param name="refCount">Ref-count disposable from which a handle is taken on every subscription.</param>
    public GroupedObservable(TKey key, ISubject<TElement> subject, RefCountDisposable refCount)
    {
      this.groupKey = key;
      this.elements = subject;
      this.sharedRefCount = refCount;
    }

    /// <summary>The key this group was created for.</summary>
    public TKey Key
    {
      get { return this.groupKey; }
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the group's subject.
    /// </summary>
    /// <param name="observer">Observer that receives the group's elements.</param>
    /// <returns>
    /// A disposable combining the ref-count handle and the subject subscription.
    /// </returns>
    public IDisposable Subscribe(IObserver<TElement> observer)
    {
      // The ref-count handle is acquired first, then the subject subscription is made;
      // both are handed back to the caller as one disposable.
      var refCountHandle = this.sharedRefCount.GetDisposable();
      var subjectSubscription = this.elements.Subscribe(observer);

      return StableCompositeDisposable.Create(refCountHandle, subjectSubscription);
    }
  }
  /// <summary>
  /// Operator observable that partitions a source sequence into groups by key and
  /// emits one <see cref="IGroupedObservable{TKey, TElement}"/> per distinct key.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TKey">Type of the key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements pushed into each group.</typeparam>
  internal class GroupByObservable<TSource, TKey, TElement> : OperatorObservableBase<IGroupedObservable<TKey, TElement>>
  {
    readonly IObservable<TSource> source;
    readonly Func<TSource, TKey> keySelector;
    readonly Func<TSource, TElement> elementSelector;
    readonly int? capacity;
    readonly IEqualityComparer<TKey> comparer;

    /// <summary>Captures the source and the functions used to key and project its elements.</summary>
    /// <param name="source">Sequence whose elements are grouped.</param>
    /// <param name="keySelector">Computes the group key for a source element.</param>
    /// <param name="elementSelector">Computes the value pushed into the group for a source element.</param>
    /// <param name="capacity">Optional initial capacity for the key-to-subject dictionary.</param>
    /// <param name="comparer">Equality comparer handed to the key-to-subject dictionary.</param>
    public GroupByObservable(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
      : base(source.IsRequiredSubscribeOnCurrentThread())
    {
      this.source = source;
      this.keySelector = keySelector;
      this.elementSelector = elementSelector;
      this.capacity = capacity;
      this.comparer = comparer;
    }

    /// <summary>Creates a fresh <see cref="GroupBy"/> observer for this subscription and starts it.</summary>
    protected override IDisposable SubscribeCore(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
    {
      var sink = new GroupBy(this, observer, cancel);
      return sink.Run();
    }

    /// <summary>
    /// Per-subscription observer: routes each source element to the subject of its key,
    /// publishing a new group downstream the first time a key is seen.
    /// </summary>
    class GroupBy : OperatorObserverBase<TSource, IGroupedObservable<TKey, TElement>>
    {
      readonly GroupByObservable<TSource, TKey, TElement> parent;

      // Subjects for non-null keys; a null key cannot be a dictionary key, so it has its own slot.
      readonly Dictionary<TKey, ISubject<TElement>> subjectsByKey;
      ISubject<TElement> nullKeySubject;

      CompositeDisposable groupDisposable;
      RefCountDisposable refCountDisposable;

      /// <summary>Builds the key-to-subject dictionary, honouring the parent's optional capacity.</summary>
      public GroupBy(GroupByObservable<TSource, TKey, TElement> parent, IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
        : base(observer, cancel)
      {
        this.parent = parent;
        this.subjectsByKey = parent.capacity.HasValue
          ? new Dictionary<TKey, ISubject<TElement>>(parent.capacity.Value, parent.comparer)
          : new Dictionary<TKey, ISubject<TElement>>(parent.comparer);
      }

      /// <summary>
      /// Subscribes to the source and returns the ref-count disposable wrapping that subscription.
      /// </summary>
      public IDisposable Run()
      {
        // Both disposables must exist before subscribing: OnNext reads refCountDisposable
        // and the source may start pushing values during Subscribe.
        groupDisposable = new CompositeDisposable();
        refCountDisposable = new RefCountDisposable(groupDisposable);

        var upstream = parent.source.Subscribe(this);
        groupDisposable.Add(upstream);

        return refCountDisposable;
      }

      /// <summary>
      /// Computes the key, finds or creates the matching subject, announces a new group
      /// if one was created, then pushes the projected element into the subject.
      /// Any exception from the selectors or the lookup is routed to <see cref="Error"/>.
      /// </summary>
      public override void OnNext(TSource value)
      {
        TKey groupKey;
        try
        {
          groupKey = parent.keySelector(value);
        }
        catch (Exception ex)
        {
          Error(ex);
          return;
        }

        ISubject<TElement> target;
        bool isNewGroup;
        try
        {
          isNewGroup = ResolveSubject(groupKey, out target);
        }
        catch (Exception ex)
        {
          Error(ex);
          return;
        }

        // The group is published before the element is projected, so downstream
        // sees the group even if the element selector fails afterwards.
        if (isNewGroup)
        {
          observer.OnNext(new GroupedObservable<TKey, TElement>(groupKey, target, refCountDisposable));
        }

        TElement projected;
        try
        {
          projected = parent.elementSelector(value);
        }
        catch (Exception ex)
        {
          Error(ex);
          return;
        }

        target.OnNext(projected);
      }

      /// <summary>Forwards a source error to every group and to the downstream observer.</summary>
      public override void OnError(Exception error)
      {
        Error(error);
      }

      /// <summary>
      /// Completes the null-key subject (if any), then every keyed subject, then the
      /// downstream observer; always calls Dispose afterwards.
      /// </summary>
      public override void OnCompleted()
      {
        try
        {
          if (nullKeySubject != null)
          {
            nullKeySubject.OnCompleted();
          }

          foreach (var groupSubject in subjectsByKey.Values)
          {
            groupSubject.OnCompleted();
          }

          observer.OnCompleted();
        }
        finally
        {
          Dispose();
        }
      }

      /// <summary>
      /// Looks up the subject for <paramref name="groupKey"/>, creating and registering it when absent.
      /// </summary>
      /// <param name="groupKey">Key produced by the key selector; may be null.</param>
      /// <param name="target">Receives the subject that elements for this key are pushed into.</param>
      /// <returns><c>true</c> when a new subject was created by this call.</returns>
      bool ResolveSubject(TKey groupKey, out ISubject<TElement> target)
      {
        if (groupKey == null)
        {
          var created = nullKeySubject == null;
          if (created)
          {
            nullKeySubject = new Subject<TElement>();
          }

          target = nullKeySubject;
          return created;
        }

        if (subjectsByKey.TryGetValue(groupKey, out target))
        {
          return false;
        }

        target = new Subject<TElement>();
        subjectsByKey.Add(groupKey, target);
        return true;
      }

      /// <summary>
      /// Sends <paramref name="exception"/> to the null-key subject (if any), every keyed
      /// subject, and the downstream observer, in that order; always calls Dispose afterwards.
      /// </summary>
      void Error(Exception exception)
      {
        try
        {
          if (nullKeySubject != null)
          {
            nullKeySubject.OnError(exception);
          }

          foreach (var groupSubject in subjectsByKey.Values)
          {
            groupSubject.OnError(exception);
          }

          observer.OnError(exception);
        }
        finally
        {
          Dispose();
        }
      }
    }
  }
  170. }