ForEachAsync.cs 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. using System;
  2. using UniRx.Operators;
  3. namespace UniRx.Operators
  4. {
  /// <summary>
  /// Observable that invokes a callback for every element of <c>source</c> and, when the
  /// source completes, emits a single <c>Unit.Default</c> followed by completion.
  /// A source error, or an exception thrown by the callback, is forwarded to the
  /// downstream observer's <c>OnError</c>.
  /// </summary>
  /// <typeparam name="T">Element type of the source sequence.</typeparam>
  internal class ForEachAsyncObservable<T> : OperatorObservableBase<Unit>
  {
      readonly IObservable<T> source;

      // Exactly one of the two callbacks is assigned, depending on which constructor ran;
      // SubscribeCore uses that to choose the observer implementation.
      readonly Action<T> onNext;
      readonly Action<T, int> onNextWithIndex;

      /// <summary>Creates the operator with a per-element callback.</summary>
      /// <param name="source">Sequence whose elements are passed to the callback.</param>
      /// <param name="onNext">Callback invoked with each element.</param>
      public ForEachAsyncObservable(IObservable<T> source, Action<T> onNext)
          : base(source.IsRequiredSubscribeOnCurrentThread()) // the source's flag is handed to the base class unchanged
      {
          this.source = source;
          this.onNext = onNext;
      }

      /// <summary>Creates the operator with a per-element callback that also receives the element index.</summary>
      /// <param name="source">Sequence whose elements are passed to the callback.</param>
      /// <param name="onNext">Callback invoked with each element and its zero-based position.</param>
      public ForEachAsyncObservable(IObservable<T> source, Action<T, int> onNext)
          : base(source.IsRequiredSubscribeOnCurrentThread()) // the source's flag is handed to the base class unchanged
      {
          this.source = source;
          this.onNextWithIndex = onNext;
      }

      /// <summary>
      /// Subscribes to <c>source</c> with the observer matching the configured callback:
      /// the plain variant when <c>onNext</c> is set, the indexed variant otherwise.
      /// </summary>
      /// <param name="observer">Downstream observer receiving the final <c>Unit</c> or an error.</param>
      /// <param name="cancel">Disposable passed through to the inner observer.</param>
      /// <returns>The disposable returned by subscribing to <c>source</c>.</returns>
      protected override IDisposable SubscribeCore(IObserver<Unit> observer, IDisposable cancel)
      {
          if (onNext != null)
          {
              return source.Subscribe(new ForEachAsync(this, observer, cancel));
          }

          return source.Subscribe(new ForEachAsync_(this, observer, cancel));
      }

      /// <summary>Observer that feeds each element to <c>parent.onNext</c>.</summary>
      class ForEachAsync : OperatorObserverBase<T, Unit>
      {
          readonly ForEachAsyncObservable<T> parent;

          public ForEachAsync(ForEachAsyncObservable<T> parent, IObserver<Unit> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Runs the callback; an exception it throws is reported downstream and ends the subscription.</summary>
          public override void OnNext(T value)
          {
              try
              {
                  parent.onNext(value);
              }
              catch (Exception ex)
              {
                  try
                  {
                      observer.OnError(ex);
                  }
                  finally
                  {
                      Dispose();
                  }
              }
          }

          /// <summary>Forwards a source error downstream, then disposes.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Emits the single <c>Unit.Default</c> result, completes downstream, then disposes.</summary>
          public override void OnCompleted()
          {
              // The final OnNext lives inside the try so that Dispose() still runs
              // when the downstream observer throws while handling the result.
              try
              {
                  observer.OnNext(Unit.Default);
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }

      // with index
      /// <summary>Observer that feeds each element and a running index to <c>parent.onNextWithIndex</c>.</summary>
      class ForEachAsync_ : OperatorObserverBase<T, Unit>
      {
          readonly ForEachAsyncObservable<T> parent;

          // Position of the next element; incremented on every OnNext call,
          // including one whose callback throws.
          int index = 0;

          public ForEachAsync_(ForEachAsyncObservable<T> parent, IObserver<Unit> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              this.parent = parent;
          }

          /// <summary>Runs the indexed callback; an exception it throws is reported downstream and ends the subscription.</summary>
          public override void OnNext(T value)
          {
              try
              {
                  parent.onNextWithIndex(value, index++);
              }
              catch (Exception ex)
              {
                  try
                  {
                      observer.OnError(ex);
                  }
                  finally
                  {
                      Dispose();
                  }
              }
          }

          /// <summary>Forwards a source error downstream, then disposes.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Emits the single <c>Unit.Default</c> result, completes downstream, then disposes.</summary>
          public override void OnCompleted()
          {
              // The final OnNext lives inside the try so that Dispose() still runs
              // when the downstream observer throws while handling the result.
              try
              {
                  observer.OnNext(Unit.Default);
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  100. }