ToObservable.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class ToObservableObservable<T> : OperatorObservableBase<T>
  6. {
  7. readonly IEnumerable<T> source;
  8. readonly IScheduler scheduler;
  9. public ToObservableObservable(IEnumerable<T> source, IScheduler scheduler)
  10. : base(scheduler == Scheduler.CurrentThread)
  11. {
  12. this.source = source;
  13. this.scheduler = scheduler;
  14. }
  15. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  16. {
  17. return new ToObservable(this, observer, cancel).Run();
  18. }
  19. class ToObservable : OperatorObserverBase<T, T>
  20. {
  21. readonly ToObservableObservable<T> parent;
  22. public ToObservable(ToObservableObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  23. {
  24. this.parent = parent;
  25. }
  26. public IDisposable Run()
  27. {
  28. var e = default(IEnumerator<T>);
  29. try
  30. {
  31. e = parent.source.GetEnumerator();
  32. }
  33. catch (Exception exception)
  34. {
  35. OnError(exception);
  36. return Disposable.Empty;
  37. }
  38. if (parent.scheduler == Scheduler.Immediate)
  39. {
  40. while (true)
  41. {
  42. bool hasNext;
  43. var current = default(T);
  44. try
  45. {
  46. hasNext = e.MoveNext();
  47. if (hasNext) current = e.Current;
  48. }
  49. catch (Exception ex)
  50. {
  51. e.Dispose();
  52. try { observer.OnError(ex); }
  53. finally { Dispose(); }
  54. break;
  55. }
  56. if (hasNext)
  57. {
  58. observer.OnNext(current);
  59. }
  60. else
  61. {
  62. e.Dispose();
  63. try { observer.OnCompleted(); }
  64. finally { Dispose(); }
  65. break;
  66. }
  67. }
  68. return Disposable.Empty;
  69. }
  70. var flag = new SingleAssignmentDisposable();
  71. flag.Disposable = parent.scheduler.Schedule(self =>
  72. {
  73. if (flag.IsDisposed)
  74. {
  75. e.Dispose();
  76. return;
  77. }
  78. bool hasNext;
  79. var current = default(T);
  80. try
  81. {
  82. hasNext = e.MoveNext();
  83. if (hasNext) current = e.Current;
  84. }
  85. catch (Exception ex)
  86. {
  87. e.Dispose();
  88. try { observer.OnError(ex); }
  89. finally { Dispose(); }
  90. return;
  91. }
  92. if (hasNext)
  93. {
  94. observer.OnNext(current);
  95. self();
  96. }
  97. else
  98. {
  99. e.Dispose();
  100. try { observer.OnCompleted(); }
  101. finally { Dispose(); }
  102. }
  103. });
  104. return flag;
  105. }
  106. public override void OnNext(T value)
  107. {
  108. // do nothing
  109. }
  110. public override void OnError(Exception error)
  111. {
  112. try { observer.OnError(error); }
  113. finally { Dispose(); }
  114. }
  115. public override void OnCompleted()
  116. {
  117. try { observer.OnCompleted(); }
  118. finally { Dispose(); }
  119. }
  120. }
  121. }
  122. }