// WhereSelect.cs (2.3 KB)
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Single-operator replacement for a <c>.Where(predicate).Select(selector)</c> chain:
  /// each source value is tested with <c>predicate</c> and, when it passes, mapped with
  /// <c>selector</c> before being handed to the downstream observer.
  /// </summary>
  /// <typeparam name="T">Element type produced by the source sequence.</typeparam>
  /// <typeparam name="TR">Element type delivered to the downstream observer.</typeparam>
  internal class WhereSelectObservable<T, TR> : OperatorObservableBase<TR>
  {
      readonly IObservable<T> source;
      readonly Func<T, bool> predicate;
      readonly Func<T, TR> selector;

      /// <summary>
      /// Captures the upstream sequence together with the filter and projection delegates.
      /// </summary>
      /// <param name="source">Upstream sequence that will be subscribed to.</param>
      /// <param name="predicate">Filter invoked once per source value.</param>
      /// <param name="selector">Projection invoked only for values the filter accepted.</param>
      public WhereSelectObservable(IObservable<T> source, Func<T, bool> predicate, Func<T, TR> selector)
          : base(source.IsRequiredSubscribeOnCurrentThread()) // flag is taken from the upstream source
      {
          this.selector = selector;
          this.predicate = predicate;
          this.source = source;
      }

      /// <summary>
      /// Subscribes a filtering/projecting observer to the upstream source.
      /// </summary>
      /// <param name="observer">Downstream observer that receives projected values.</param>
      /// <param name="cancel">Disposable passed through to the inner observer's base class.</param>
      /// <returns>The disposable returned by the upstream <c>Subscribe</c> call.</returns>
      protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
      {
          var filterAndProject = new WhereSelect(this, observer, cancel);
          return source.Subscribe(filterAndProject);
      }

      /// <summary>
      /// Observer placed between the source and the downstream observer; it runs the
      /// parent's predicate and selector for every incoming value.
      /// </summary>
      class WhereSelect : OperatorObserverBase<T, TR>
      {
          readonly WhereSelectObservable<T, TR> parent;

          /// <summary>
          /// Creates the observer for one subscription.
          /// </summary>
          /// <param name="parent">Operator instance that owns the predicate and selector.</param>
          /// <param name="observer">Downstream observer, forwarded to the base class.</param>
          /// <param name="cancel">Disposable forwarded to the base class.</param>
          public WhereSelect(WhereSelectObservable<T, TR> parent, IObserver<TR> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>
          /// Filters <paramref name="value"/> and, if accepted, forwards its projection.
          /// An exception thrown by either delegate is reported through the downstream
          /// <c>OnError</c>, followed by <c>Dispose()</c>; nothing else is emitted for that value.
          /// </summary>
          /// <param name="value">Value received from the source.</param>
          public override void OnNext(T value)
          {
              bool accepted;
              try
              {
                  accepted = parent.predicate(value);
              }
              catch (Exception ex)
              {
                  ForwardErrorAndDispose(ex);
                  return;
              }

              if (!accepted)
              {
                  // Rejected by the filter: the selector is never invoked for this value.
                  return;
              }

              TR projected;
              try
              {
                  projected = parent.selector(value);
              }
              catch (Exception ex)
              {
                  ForwardErrorAndDispose(ex);
                  return;
              }

              // Deliberately outside the try blocks: exceptions thrown by the downstream
              // observer itself are not caught here.
              observer.OnNext(projected);
          }

          /// <summary>
          /// Relays an upstream error downstream, then disposes this observer.
          /// </summary>
          /// <param name="error">Error reported by the source.</param>
          public override void OnError(Exception error)
          {
              ForwardErrorAndDispose(error);
          }

          /// <summary>
          /// Relays completion downstream, then disposes this observer.
          /// </summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }

          // Sends the error downstream; Dispose() runs in a finally block so it still
          // executes if the downstream OnError handler throws.
          void ForwardErrorAndDispose(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  66. }