using System; using System.Collections.Generic; namespace UniRx.Operators { internal class DistinctObservable : OperatorObservableBase { readonly IObservable source; readonly IEqualityComparer comparer; public DistinctObservable(IObservable source, IEqualityComparer comparer) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.comparer = comparer; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { return source.Subscribe(new Distinct(this, observer, cancel)); } class Distinct : OperatorObserverBase { readonly HashSet hashSet; public Distinct(DistinctObservable parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { hashSet = (parent.comparer == null) ? new HashSet() : new HashSet(parent.comparer); } public override void OnNext(T value) { var key = default(T); var isAdded = false; try { key = value; isAdded = hashSet.Add(key); } catch (Exception exception) { try { observer.OnError(exception); } finally { Dispose(); } return; } if (isAdded) { observer.OnNext(value); } } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } } } internal class DistinctObservable : OperatorObservableBase { readonly IObservable source; readonly IEqualityComparer comparer; readonly Func keySelector; public DistinctObservable(IObservable source, Func keySelector, IEqualityComparer comparer) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.comparer = comparer; this.keySelector = keySelector; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { return source.Subscribe(new Distinct(this, observer, cancel)); } class Distinct : OperatorObserverBase { readonly DistinctObservable parent; readonly HashSet hashSet; public Distinct(DistinctObservable parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { this.parent = parent; hashSet = (parent.comparer == null) ? new HashSet() : new HashSet(parent.comparer); } public override void OnNext(T value) { var key = default(TKey); var isAdded = false; try { key = parent.keySelector(value); isAdded = hashSet.Add(key); } catch (Exception exception) { try { observer.OnError(exception); } finally { Dispose(); } return; } if (isAdded) { observer.OnNext(value); } } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } } } }