Create.cs 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable whose subscription behavior is supplied by a delegate that
  /// receives the (wrapped) observer and returns an <see cref="IDisposable"/>.
  /// </summary>
  /// <typeparam name="T">Element type of the sequence.</typeparam>
  internal class CreateObservable<T> : OperatorObservableBase<T>
  {
      readonly Func<IObserver<T>, IDisposable> subscribe;

      /// <summary>
      /// Creates the observable, passing <c>true</c> for
      /// <c>isRequiredSubscribeOnCurrentThread</c>.
      /// </summary>
      /// <param name="subscribe">Delegate invoked for every subscription.</param>
      public CreateObservable(Func<IObserver<T>, IDisposable> subscribe)
          : this(subscribe, true) // fail safe
      {
      }

      /// <summary>
      /// Creates the observable with an explicit value for the flag that is
      /// forwarded to the base class constructor.
      /// </summary>
      /// <param name="subscribe">Delegate invoked for every subscription.</param>
      /// <param name="isRequiredSubscribeOnCurrentThread">Value handed to the base constructor unchanged.</param>
      public CreateObservable(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
          : base(isRequiredSubscribeOnCurrentThread)
      {
          this.subscribe = subscribe;
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> in a <see cref="Create"/> observer and
      /// runs the subscribe delegate against that wrapper.
      /// </summary>
      /// <param name="observer">Downstream observer to be wrapped.</param>
      /// <param name="cancel">Disposable passed through to the wrapper.</param>
      /// <returns>
      /// The delegate's result, or <c>Disposable.Empty</c> when the delegate returns null.
      /// </returns>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          var wrapped = new Create(observer, cancel);
          var subscription = subscribe(wrapped);
          if (subscription == null)
          {
              return Disposable.Empty;
          }

          return subscription;
      }

      /// <summary>
      /// Pass-through observer: forwards every notification to the wrapped
      /// observer and calls <c>Dispose()</c> after a terminal notification.
      /// </summary>
      class Create : OperatorObserverBase<T, T>
      {
          public Create(IObserver<T> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>Forwards the value as-is; exceptions are not intercepted here.</summary>
          public override void OnNext(T value)
          {
              observer.OnNext(value);
          }

          /// <summary>Forwards the error, then calls <c>Dispose()</c> even if the forward throws.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Forwards completion, then calls <c>Dispose()</c> even if the forward throws.</summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// Observable whose subscription behavior is supplied by a delegate that
  /// receives a stored state value together with the (wrapped) observer.
  /// </summary>
  /// <typeparam name="T">Element type of the sequence.</typeparam>
  /// <typeparam name="TState">Type of the state value handed to the delegate.</typeparam>
  internal class CreateObservable<T, TState> : OperatorObservableBase<T>
  {
      readonly TState state;
      readonly Func<TState, IObserver<T>, IDisposable> subscribe;

      /// <summary>
      /// Creates the observable, passing <c>true</c> for
      /// <c>isRequiredSubscribeOnCurrentThread</c>.
      /// </summary>
      /// <param name="state">Value passed as the delegate's first argument on each subscription.</param>
      /// <param name="subscribe">Delegate invoked for every subscription.</param>
      public CreateObservable(TState state, Func<TState, IObserver<T>, IDisposable> subscribe)
          : this(state, subscribe, true) // fail safe
      {
      }

      /// <summary>
      /// Creates the observable with an explicit value for the flag that is
      /// forwarded to the base class constructor.
      /// </summary>
      /// <param name="state">Value passed as the delegate's first argument on each subscription.</param>
      /// <param name="subscribe">Delegate invoked for every subscription.</param>
      /// <param name="isRequiredSubscribeOnCurrentThread">Value handed to the base constructor unchanged.</param>
      public CreateObservable(TState state, Func<TState, IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
          : base(isRequiredSubscribeOnCurrentThread)
      {
          this.state = state;
          this.subscribe = subscribe;
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> in a <see cref="Create"/> observer and
      /// runs the subscribe delegate with the stored state and that wrapper.
      /// </summary>
      /// <param name="observer">Downstream observer to be wrapped.</param>
      /// <param name="cancel">Disposable passed through to the wrapper.</param>
      /// <returns>
      /// The delegate's result, or <c>Disposable.Empty</c> when the delegate returns null.
      /// </returns>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          var wrapped = new Create(observer, cancel);
          var subscription = subscribe(state, wrapped);
          if (subscription == null)
          {
              return Disposable.Empty;
          }

          return subscription;
      }

      /// <summary>
      /// Pass-through observer: forwards every notification to the wrapped
      /// observer and calls <c>Dispose()</c> after a terminal notification.
      /// </summary>
      class Create : OperatorObserverBase<T, T>
      {
          public Create(IObserver<T> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>Forwards the value as-is; exceptions are not intercepted here.</summary>
          public override void OnNext(T value)
          {
              observer.OnNext(value);
          }

          /// <summary>Forwards the error, then calls <c>Dispose()</c> even if the forward throws.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Forwards completion, then calls <c>Dispose()</c> even if the forward throws.</summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  /// <summary>
  /// Delegate-driven observable whose wrapper observer also calls
  /// <c>Dispose()</c> when forwarding a value to the downstream observer throws.
  /// </summary>
  /// <typeparam name="T">Element type of the sequence.</typeparam>
  internal class CreateSafeObservable<T> : OperatorObservableBase<T>
  {
      readonly Func<IObserver<T>, IDisposable> subscribe;

      /// <summary>
      /// Creates the observable, passing <c>true</c> for
      /// <c>isRequiredSubscribeOnCurrentThread</c>.
      /// </summary>
      /// <param name="subscribe">Delegate invoked for every subscription.</param>
      public CreateSafeObservable(Func<IObserver<T>, IDisposable> subscribe)
          : this(subscribe, true) // fail safe
      {
      }

      /// <summary>
      /// Creates the observable with an explicit value for the flag that is
      /// forwarded to the base class constructor.
      /// </summary>
      /// <param name="subscribe">Delegate invoked for every subscription.</param>
      /// <param name="isRequiredSubscribeOnCurrentThread">Value handed to the base constructor unchanged.</param>
      public CreateSafeObservable(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
          : base(isRequiredSubscribeOnCurrentThread)
      {
          this.subscribe = subscribe;
      }

      /// <summary>
      /// Wraps <paramref name="observer"/> in a <see cref="CreateSafe"/> observer
      /// and runs the subscribe delegate against that wrapper.
      /// </summary>
      /// <param name="observer">Downstream observer to be wrapped.</param>
      /// <param name="cancel">Disposable passed through to the wrapper.</param>
      /// <returns>
      /// The delegate's result, or <c>Disposable.Empty</c> when the delegate returns null.
      /// </returns>
      protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
      {
          var wrapped = new CreateSafe(observer, cancel);
          var subscription = subscribe(wrapped);
          if (subscription == null)
          {
              return Disposable.Empty;
          }

          return subscription;
      }

      /// <summary>
      /// Forwarding observer that calls <c>Dispose()</c> after a terminal
      /// notification and also when the downstream <c>OnNext</c> throws.
      /// </summary>
      class CreateSafe : OperatorObserverBase<T, T>
      {
          public CreateSafe(IObserver<T> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Forwards the value; if the downstream observer throws, calls
          /// <c>Dispose()</c> and rethrows the same exception.
          /// </summary>
          public override void OnNext(T value)
          {
              try
              {
                  observer.OnNext(value);
              }
              catch
              {
                  // safe: dispose before letting the exception continue to the caller
                  Dispose();
                  throw;
              }
          }

          /// <summary>Forwards the error, then calls <c>Dispose()</c> even if the forward throws.</summary>
          public override void OnError(Exception error)
          {
              try
              {
                  observer.OnError(error);
              }
              finally
              {
                  Dispose();
              }
          }

          /// <summary>Forwards completion, then calls <c>Dispose()</c> even if the forward throws.</summary>
          public override void OnCompleted()
          {
              try
              {
                  observer.OnCompleted();
              }
              finally
              {
                  Dispose();
              }
          }
      }
  }
  132. }