using System; using System.Collections.Generic; using UniRx.Operators; namespace UniRx.Operators { internal class GroupedObservable : IGroupedObservable { readonly TKey key; readonly IObservable subject; readonly RefCountDisposable refCount; public TKey Key { get { return key; } } public GroupedObservable(TKey key, ISubject subject, RefCountDisposable refCount) { this.key = key; this.subject = subject; this.refCount = refCount; } public IDisposable Subscribe(IObserver observer) { var release = refCount.GetDisposable(); var subscription = subject.Subscribe(observer); return StableCompositeDisposable.Create(release, subscription); } } internal class GroupByObservable : OperatorObservableBase> { readonly IObservable source; readonly Func keySelector; readonly Func elementSelector; readonly int? capacity; readonly IEqualityComparer comparer; public GroupByObservable(IObservable source, Func keySelector, Func elementSelector, int? capacity, IEqualityComparer comparer) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.keySelector = keySelector; this.elementSelector = elementSelector; this.capacity = capacity; this.comparer = comparer; } protected override IDisposable SubscribeCore(IObserver> observer, IDisposable cancel) { return new GroupBy(this, observer, cancel).Run(); } class GroupBy : OperatorObserverBase> { readonly GroupByObservable parent; readonly Dictionary> map; ISubject nullKeySubject; CompositeDisposable groupDisposable; RefCountDisposable refCountDisposable; public GroupBy(GroupByObservable parent, IObserver> observer, IDisposable cancel) : base(observer, cancel) { this.parent = parent; if (parent.capacity.HasValue) { map = new Dictionary>(parent.capacity.Value, parent.comparer); } else { map = new Dictionary>(parent.comparer); } } public IDisposable Run() { groupDisposable = new CompositeDisposable(); refCountDisposable = new RefCountDisposable(groupDisposable); groupDisposable.Add(parent.source.Subscribe(this)); return refCountDisposable; } public override void OnNext(TSource value) { var key = default(TKey); try { key = parent.keySelector(value); } catch (Exception exception) { Error(exception); return; } var fireNewMapEntry = false; var writer = default(ISubject); try { if (key == null) { if (nullKeySubject == null) { nullKeySubject = new Subject(); fireNewMapEntry = true; } writer = nullKeySubject; } else { if (!map.TryGetValue(key, out writer)) { writer = new Subject(); map.Add(key, writer); fireNewMapEntry = true; } } } catch (Exception exception) { Error(exception); return; } if (fireNewMapEntry) { var group = new GroupedObservable(key, writer, refCountDisposable); observer.OnNext(group); } var element = default(TElement); try { element = parent.elementSelector(value); } catch (Exception exception) { Error(exception); return; } writer.OnNext(element); } public override void OnError(Exception error) { Error(error); } public override void OnCompleted() { try { if (nullKeySubject != null) nullKeySubject.OnCompleted(); foreach (var s in map.Values) { s.OnCompleted(); } observer.OnCompleted(); } finally { Dispose(); } } void Error(Exception exception) { try { if (nullKeySubject != null) nullKeySubject.OnError(exception); foreach (var s in map.Values) { s.OnError(exception); } observer.OnError(exception); } finally { Dispose(); } } } } }