SelectWhere.cs 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. // Optimize for .Select().Where()
  5. internal class SelectWhereObservable<T, TR> : OperatorObservableBase<TR>
  6. {
  7. readonly IObservable<T> source;
  8. readonly Func<T, TR> selector;
  9. readonly Func<TR, bool> predicate;
  10. public SelectWhereObservable(IObservable<T> source, Func<T, TR> selector, Func<TR, bool> predicate)
  11. : base(source.IsRequiredSubscribeOnCurrentThread())
  12. {
  13. this.source = source;
  14. this.selector = selector;
  15. this.predicate = predicate;
  16. }
  17. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  18. {
  19. return source.Subscribe(new SelectWhere(this, observer, cancel));
  20. }
  21. class SelectWhere : OperatorObserverBase<T, TR>
  22. {
  23. readonly SelectWhereObservable<T, TR> parent;
  24. public SelectWhere(SelectWhereObservable<T, TR> parent, IObserver<TR> observer, IDisposable cancel)
  25. : base(observer, cancel)
  26. {
  27. this.parent = parent;
  28. }
  29. public override void OnNext(T value)
  30. {
  31. var v = default(TR);
  32. try
  33. {
  34. v = parent.selector(value);
  35. }
  36. catch (Exception ex)
  37. {
  38. try { observer.OnError(ex); } finally { Dispose(); }
  39. return;
  40. }
  41. var isPassed = false;
  42. try
  43. {
  44. isPassed = parent.predicate(v);
  45. }
  46. catch (Exception ex)
  47. {
  48. try { observer.OnError(ex); } finally { Dispose(); }
  49. return;
  50. }
  51. if (isPassed)
  52. {
  53. observer.OnNext(v);
  54. }
  55. }
  56. public override void OnError(Exception error)
  57. {
  58. try { observer.OnError(error); } finally { Dispose(); }
  59. }
  60. public override void OnCompleted()
  61. {
  62. try { observer.OnCompleted(); } finally { Dispose(); }
  63. }
  64. }
  65. }
  66. }