TakeUntil.cs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. using System;
  2. namespace UniRx.Operators
  3. {
  /// <summary>
  /// Observable that forwards notifications from <c>source</c> to the subscriber
  /// until <c>other</c> produces any notification.
  /// </summary>
  /// <remarks>
  /// A value, an error, or a completion from <c>other</c> all terminate the
  /// downstream sequence: values and completion result in <c>OnCompleted</c>,
  /// an error is forwarded as <c>OnError</c>.
  /// </remarks>
  /// <typeparam name="T">Element type of the forwarded sequence.</typeparam>
  /// <typeparam name="TOther">Element type of the terminating sequence; its values are never inspected.</typeparam>
  internal class TakeUntilObservable<T, TOther> : OperatorObservableBase<T>
  {
    readonly IObservable<T> source;
    readonly IObservable<TOther> other;

    /// <summary>Stores both sequences; nothing is subscribed until <c>SubscribeCore</c> runs.</summary>
    /// <param name="source">Sequence whose notifications are forwarded.</param>
    /// <param name="other">Sequence whose first notification ends the forwarding.</param>
    public TakeUntilObservable(IObservable<T> source, IObservable<TOther> other)
      // The flag passed to the base is set when either input reports it.
      // NOTE(review): the meaning of the flag is defined by OperatorObservableBase, not visible here.
      : base(source.IsRequiredSubscribeOnCurrentThread() || other.IsRequiredSubscribeOnCurrentThread())
    {
      this.source = source;
      this.other = other;
    }

    /// <summary>Creates a fresh per-subscription observer and starts it.</summary>
    /// <returns>The disposable returned by <c>TakeUntil.Run</c>.</returns>
    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
      return new TakeUntil(this, observer, cancel).Run();
    }

    /// <summary>
    /// Per-subscription observer of <c>source</c>. Every downstream call is made
    /// while holding <c>gate</c>, the same lock used by <c>TakeUntilOther</c>.
    /// </summary>
    class TakeUntil : OperatorObserverBase<T, T>
    {
      readonly TakeUntilObservable<T, TOther> parent;

      // Lock target shared with TakeUntilOther. It is readonly so the instance
      // being locked can never be swapped while another thread holds it.
      readonly object gate = new object();

      public TakeUntil(TakeUntilObservable<T, TOther> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
      {
        this.parent = parent;
      }

      /// <summary>Subscribes to <c>other</c> first and then to <c>source</c>.</summary>
      /// <returns>A disposable combining both subscriptions.</returns>
      public IDisposable Run()
      {
        // The holder is created before subscribing so that TakeUntilOther can
        // dispose its own subscription from inside its callbacks.
        var otherSubscription = new SingleAssignmentDisposable();
        var otherObserver = new TakeUntilOther(this, otherSubscription);

        otherSubscription.Disposable = parent.other.Subscribe(otherObserver);

        // source is subscribed even when other has already signalled during the
        // call above; this method performs no check in between.
        var sourceSubscription = parent.source.Subscribe(this);

        return StableCompositeDisposable.Create(otherSubscription, sourceSubscription);
      }

      /// <summary>Forwards a source value downstream under the lock.</summary>
      public override void OnNext(T value)
      {
        lock (gate)
        {
          observer.OnNext(value);
        }
      }

      /// <summary>Forwards a source error downstream, then disposes this observer.</summary>
      public override void OnError(Exception error)
      {
        lock (gate)
        {
          // finally: Dispose() runs even if the downstream handler throws.
          try { observer.OnError(error); } finally { Dispose(); }
        }
      }

      /// <summary>Forwards source completion downstream, then disposes this observer.</summary>
      public override void OnCompleted()
      {
        lock (gate)
        {
          // finally: Dispose() runs even if the downstream handler throws.
          try { observer.OnCompleted(); } finally { Dispose(); }
        }
      }

      /// <summary>
      /// Observer of <c>other</c>. Its first notification of any kind terminates
      /// the downstream sequence and disposes both the source observer and the
      /// subscription to <c>other</c>.
      /// </summary>
      class TakeUntilOther : IObserver<TOther>
      {
        readonly TakeUntil sourceObserver;
        readonly IDisposable subscription;

        /// <param name="sourceObserver">Owner whose lock and downstream observer are used.</param>
        /// <param name="subscription">This observer's own subscription to <c>other</c>.</param>
        public TakeUntilOther(TakeUntil sourceObserver, IDisposable subscription)
        {
          this.sourceObserver = sourceObserver;
          this.subscription = subscription;
        }

        /// <summary>A value from <c>other</c> completes the downstream sequence; the value is ignored.</summary>
        public void OnNext(TOther value)
        {
          CompleteAndDispose();
        }

        /// <summary>An error from <c>other</c> is forwarded downstream, then everything is disposed.</summary>
        public void OnError(Exception error)
        {
          lock (sourceObserver.gate)
          {
            try
            {
              sourceObserver.observer.OnError(error);
            }
            finally
            {
              sourceObserver.Dispose();
              subscription.Dispose();
            }
          }
        }

        /// <summary>
        /// Completion of <c>other</c> is handled exactly like a value: the
        /// downstream sequence is completed even if <c>other</c> never emitted.
        /// </summary>
        public void OnCompleted()
        {
          CompleteAndDispose();
        }

        // Shared path for OnNext and OnCompleted: signal completion under the
        // shared lock, then dispose the source observer followed by the
        // subscription to other, whether or not the downstream handler throws.
        void CompleteAndDispose()
        {
          lock (sourceObserver.gate)
          {
            try
            {
              sourceObserver.observer.OnCompleted();
            }
            finally
            {
              sourceObserver.Dispose();
              subscription.Dispose();
            }
          }
        }
      }
    }
  }
  112. }