2
0

Select.cs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal interface ISelect<TR>
  5. {
  6. // IObservable<TR2> CombineSelector<TR2>(Func<TR, TR2> selector);
  7. IObservable<TR> CombinePredicate(Func<TR, bool> predicate);
  8. }
  9. internal class SelectObservable<T, TR> : OperatorObservableBase<TR>, ISelect<TR>
  10. {
  11. readonly IObservable<T> source;
  12. readonly Func<T, TR> selector;
  13. readonly Func<T, int, TR> selectorWithIndex;
  14. public SelectObservable(IObservable<T> source, Func<T, TR> selector)
  15. : base(source.IsRequiredSubscribeOnCurrentThread())
  16. {
  17. this.source = source;
  18. this.selector = selector;
  19. }
  20. public SelectObservable(IObservable<T> source, Func<T, int, TR> selector)
  21. : base(source.IsRequiredSubscribeOnCurrentThread())
  22. {
  23. this.source = source;
  24. this.selectorWithIndex = selector;
  25. }
  26. // sometimes cause "which no ahead of time (AOT) code was generated." on IL2CPP...
  27. //public IObservable<TR2> CombineSelector<TR2>(Func<TR, TR2> combineSelector)
  28. //{
  29. // if (this.selector != null)
  30. // {
  31. // return new Select<T, TR2>(source, x => combineSelector(this.selector(x)));
  32. // }
  33. // else
  34. // {
  35. // return new Select<TR, TR2>(this, combineSelector);
  36. // }
  37. //}
  38. public IObservable<TR> CombinePredicate(Func<TR, bool> predicate)
  39. {
  40. if (this.selector != null)
  41. {
  42. return new SelectWhereObservable<T, TR>(this.source, this.selector, predicate);
  43. }
  44. else
  45. {
  46. return new WhereObservable<TR>(this, predicate); // can't combine
  47. }
  48. }
  49. protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel)
  50. {
  51. if (selector != null)
  52. {
  53. return source.Subscribe(new Select(this, observer, cancel));
  54. }
  55. else
  56. {
  57. return source.Subscribe(new Select_(this, observer, cancel));
  58. }
  59. }
  60. class Select : OperatorObserverBase<T, TR>
  61. {
  62. readonly SelectObservable<T, TR> parent;
  63. public Select(SelectObservable<T, TR> parent, IObserver<TR> observer, IDisposable cancel)
  64. : base(observer, cancel)
  65. {
  66. this.parent = parent;
  67. }
  68. public override void OnNext(T value)
  69. {
  70. var v = default(TR);
  71. try
  72. {
  73. v = parent.selector(value);
  74. }
  75. catch (Exception ex)
  76. {
  77. try { observer.OnError(ex); } finally { Dispose(); }
  78. return;
  79. }
  80. observer.OnNext(v);
  81. }
  82. public override void OnError(Exception error)
  83. {
  84. try { observer.OnError(error); } finally { Dispose(); }
  85. }
  86. public override void OnCompleted()
  87. {
  88. try { observer.OnCompleted(); } finally { Dispose(); }
  89. }
  90. }
  91. // with Index
  92. class Select_ : OperatorObserverBase<T, TR>
  93. {
  94. readonly SelectObservable<T, TR> parent;
  95. int index;
  96. public Select_(SelectObservable<T, TR> parent, IObserver<TR> observer, IDisposable cancel)
  97. : base(observer, cancel)
  98. {
  99. this.parent = parent;
  100. this.index = 0;
  101. }
  102. public override void OnNext(T value)
  103. {
  104. var v = default(TR);
  105. try
  106. {
  107. v = parent.selectorWithIndex(value, index++);
  108. }
  109. catch (Exception ex)
  110. {
  111. try { observer.OnError(ex); } finally { Dispose(); }
  112. return;
  113. }
  114. observer.OnNext(v);
  115. }
  116. public override void OnError(Exception error)
  117. {
  118. try { observer.OnError(error); } finally { Dispose(); }
  119. }
  120. public override void OnCompleted()
  121. {
  122. try { observer.OnCompleted(); } finally { Dispose(); }
  123. }
  124. }
  125. }
  126. }