// OfType.cs
  using System;

  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable that relays only those elements of a source sequence which
      /// are of type <c>TResult</c>; every other element is dropped.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TResult">Type an element must have to be forwarded.</typeparam>
      internal class OfTypeObservable<TSource, TResult> : OperatorObservableBase<TResult>
      {
          readonly IObservable<TSource> upstream;

          /// <summary>
          /// Creates the operator over <paramref name="source"/>.
          /// </summary>
          /// <param name="source">Sequence whose elements are filtered by type.</param>
          public OfTypeObservable(IObservable<TSource> source)
              // The source's IsRequiredSubscribeOnCurrentThread() result is handed to the base class.
              // NOTE(review): what the base does with this flag is defined outside this file.
              : base(source.IsRequiredSubscribeOnCurrentThread())
          {
              upstream = source;
          }

          /// <summary>
          /// Subscribes a type-filtering observer to the source sequence.
          /// </summary>
          /// <param name="observer">Observer that receives the elements of type <c>TResult</c>.</param>
          /// <param name="cancel">Disposable passed through to the filtering observer's base class.</param>
          /// <returns>The subscription returned by the source.</returns>
          protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel)
          {
              var filter = new OfType(observer, cancel);
              return upstream.Subscribe(filter);
          }

          /// <summary>
          /// Observer placed between the source and the downstream observer; it
          /// performs the per-element type test.
          /// </summary>
          class OfType : OperatorObserverBase<TSource, TResult>
          {
              public OfType(IObserver<TResult> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
              }

              /// <summary>
              /// Forwards <paramref name="value"/> downstream when it is a <c>TResult</c>;
              /// otherwise ignores it. A null reference never satisfies the <c>is</c>
              /// test, so null elements are dropped as well.
              /// </summary>
              public override void OnNext(TSource value)
              {
                  if (!(value is TResult))
                  {
                      return;
                  }

                  // Going through object lets the compiler accept the conversion
                  // between two unrelated generic type parameters.
                  observer.OnNext((TResult)(object)value);
              }

              /// <summary>
              /// Passes the error downstream, then calls Dispose() even if the
              /// downstream handler throws.
              /// </summary>
              public override void OnError(Exception error)
              {
                  try
                  {
                      observer.OnError(error);
                  }
                  finally
                  {
                      Dispose();
                  }
              }

              /// <summary>
              /// Passes completion downstream, then calls Dispose() even if the
              /// downstream handler throws.
              /// </summary>
              public override void OnCompleted()
              {
                  try
                  {
                      observer.OnCompleted();
                  }
                  finally
                  {
                      Dispose();
                  }
              }
          }
      }
  }