FromCoroutine.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. using System;
  2. using System.Collections;
  3. using System.Threading;
  4. namespace UniRx.Operators
  5. {
  6. internal class FromCoroutineObservable<T> : OperatorObservableBase<T>
  7. {
  8. readonly Func<IObserver<T>, CancellationToken, IEnumerator> coroutine;
  9. public FromCoroutineObservable(Func<IObserver<T>, CancellationToken, IEnumerator> coroutine)
  10. : base(false)
  11. {
  12. this.coroutine = coroutine;
  13. }
  14. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  15. {
  16. var fromCoroutineObserver = new FromCoroutine(observer, cancel);
  17. #if (NETFX_CORE || NET_4_6 || NET_STANDARD_2_0 || UNITY_WSA_10_0)
  18. var moreCancel = new CancellationDisposable();
  19. var token = moreCancel.Token;
  20. #else
  21. var moreCancel = new BooleanDisposable();
  22. var token = new CancellationToken(moreCancel);
  23. #endif
  24. MainThreadDispatcher.SendStartCoroutine(coroutine(fromCoroutineObserver, token));
  25. return moreCancel;
  26. }
  27. class FromCoroutine : OperatorObserverBase<T, T>
  28. {
  29. public FromCoroutine(IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  30. {
  31. }
  32. public override void OnNext(T value)
  33. {
  34. try
  35. {
  36. base.observer.OnNext(value);
  37. }
  38. catch
  39. {
  40. Dispose();
  41. throw;
  42. }
  43. }
  44. public override void OnError(Exception error)
  45. {
  46. try { observer.OnError(error); }
  47. finally { Dispose(); }
  48. }
  49. public override void OnCompleted()
  50. {
  51. try { observer.OnCompleted(); }
  52. finally { Dispose(); }
  53. }
  54. }
  55. }
  56. internal class FromMicroCoroutineObservable<T> : OperatorObservableBase<T>
  57. {
  58. readonly Func<IObserver<T>, CancellationToken, IEnumerator> coroutine;
  59. readonly FrameCountType frameCountType;
  60. public FromMicroCoroutineObservable(Func<IObserver<T>, CancellationToken, IEnumerator> coroutine, FrameCountType frameCountType)
  61. : base(false)
  62. {
  63. this.coroutine = coroutine;
  64. this.frameCountType = frameCountType;
  65. }
  66. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  67. {
  68. var microCoroutineObserver = new FromMicroCoroutine(observer, cancel);
  69. #if (NETFX_CORE || NET_4_6 || NET_STANDARD_2_0 || UNITY_WSA_10_0)
  70. var moreCancel = new CancellationDisposable();
  71. var token = moreCancel.Token;
  72. #else
  73. var moreCancel = new BooleanDisposable();
  74. var token = new CancellationToken(moreCancel);
  75. #endif
  76. switch (frameCountType)
  77. {
  78. case FrameCountType.Update:
  79. MainThreadDispatcher.StartUpdateMicroCoroutine(coroutine(microCoroutineObserver, token));
  80. break;
  81. case FrameCountType.FixedUpdate:
  82. MainThreadDispatcher.StartFixedUpdateMicroCoroutine(coroutine(microCoroutineObserver, token));
  83. break;
  84. case FrameCountType.EndOfFrame:
  85. MainThreadDispatcher.StartEndOfFrameMicroCoroutine(coroutine(microCoroutineObserver, token));
  86. break;
  87. default:
  88. throw new ArgumentException("Invalid FrameCountType:" + frameCountType);
  89. }
  90. return moreCancel;
  91. }
  92. class FromMicroCoroutine : OperatorObserverBase<T, T>
  93. {
  94. public FromMicroCoroutine(IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  95. {
  96. }
  97. public override void OnNext(T value)
  98. {
  99. try
  100. {
  101. base.observer.OnNext(value);
  102. }
  103. catch
  104. {
  105. Dispose();
  106. throw;
  107. }
  108. }
  109. public override void OnError(Exception error)
  110. {
  111. try { observer.OnError(error); }
  112. finally { Dispose(); }
  113. }
  114. public override void OnCompleted()
  115. {
  116. try { observer.OnCompleted(); }
  117. finally { Dispose(); }
  118. }
  119. }
  120. }
  121. }