// SkipUntil.cs (4.0 KB)

  1. using System;
  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable that relays items from <c>source</c> only after <c>other</c> has
      /// produced its first value. Until then, source items are sent to
      /// <c>EmptyObserver&lt;T&gt;.Instance</c> (presumably a no-op sink - confirm against
      /// its definition).
      /// </summary>
      /// <typeparam name="T">Element type of the source sequence.</typeparam>
      /// <typeparam name="TOther">Element type of the gate-opening sequence; its values are ignored.</typeparam>
      internal class SkipUntilObservable<T, TOther> : OperatorObservableBase<T>
      {
          readonly IObservable<T> source;
          readonly IObservable<TOther> other;

          /// <summary>
          /// Stores both sequences. The flag handed to the base constructor is true when
          /// either sequence reports <c>IsRequiredSubscribeOnCurrentThread()</c>.
          /// </summary>
          /// <param name="source">Sequence whose items are relayed once the gate is open.</param>
          /// <param name="other">Sequence whose first item opens the gate.</param>
          public SkipUntilObservable(IObservable<T> source, IObservable<TOther> other)
              : base(source.IsRequiredSubscribeOnCurrentThread() || other.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.other = other;
          }

          /// <summary>
          /// Creates the coordinating observer for one subscription and starts it.
          /// </summary>
          /// <returns>The disposable produced by <c>SkipUntilOuterObserver.Run</c>.</returns>
          protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
          {
              return new SkipUntilOuterObserver(this, observer, cancel).Run();
          }

          /// <summary>
          /// Holds the downstream observer (via the base class) and wires up the two inner
          /// observers. It is never itself subscribed to a sequence in <c>Run</c>.
          /// </summary>
          class SkipUntilOuterObserver : OperatorObserverBase<T, T>
          {
              readonly SkipUntilObservable<T, TOther> parent;

              public SkipUntilOuterObserver(SkipUntilObservable<T, TOther> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
              {
                  this.parent = parent;
              }

              /// <summary>
              /// Subscribes to the source first and then to the gate sequence.
              /// </summary>
              /// <returns>A composite disposable covering both subscriptions.</returns>
              public IDisposable Run()
              {
                  var sourceSubscription = new SingleAssignmentDisposable();
                  var sourceObserver = new SkipUntil(this, sourceSubscription);

                  var otherSubscription = new SingleAssignmentDisposable();
                  var otherObserver = new SkipUntilOther(this, sourceObserver, otherSubscription);

                  // Order matters: the source is subscribed before the gate sequence.
                  sourceSubscription.Disposable = parent.source.Subscribe(sourceObserver);
                  otherSubscription.Disposable = parent.other.Subscribe(otherObserver);

                  return StableCompositeDisposable.Create(otherSubscription, sourceSubscription);
              }

              // The three overrides below are intentionally empty: Run subscribes the nested
              // observers, not this instance, so these are not used for notification flow here.
              public override void OnNext(T value)
              {
              }

              public override void OnError(Exception error)
              {
              }

              public override void OnCompleted()
              {
              }

              /// <summary>
              /// Observer attached to the source. Items and completion go through the
              /// swappable <c>observer</c> field, which starts as the empty observer and is
              /// replaced with the downstream observer by <c>SkipUntilOther.OnNext</c>.
              /// </summary>
              class SkipUntil : IObserver<T>
              {
                  // Written by SkipUntilOther.OnNext; volatile so the swap is observed by
                  // whichever thread delivers source notifications.
                  public volatile IObserver<T> observer;
                  readonly SkipUntilOuterObserver parent;
                  readonly IDisposable subscription;

                  public SkipUntil(SkipUntilOuterObserver parent, IDisposable subscription)
                  {
                      this.parent = parent;
                      observer = UniRx.InternalUtil.EmptyObserver<T>.Instance;
                      this.subscription = subscription;
                  }

                  public void OnNext(T value)
                  {
                      observer.OnNext(value);
                  }

                  /// <summary>
                  /// Reports a source failure to the downstream observer and disposes the
                  /// outer observer.
                  /// </summary>
                  public void OnError(Exception error)
                  {
                      // Use parent.observer rather than the gated field: before the gate
                      // opens the gated field is still the empty observer, which would lose
                      // the error while the subscription is torn down anyway. This mirrors
                      // how SkipUntilOther.OnError reports failures.
                      try { parent.observer.OnError(error); }
                      finally { parent.Dispose(); }
                  }

                  /// <summary>
                  /// Forwards completion through the gated observer, then disposes only the
                  /// source subscription (the gate subscription is left untouched).
                  /// </summary>
                  public void OnCompleted()
                  {
                      try { observer.OnCompleted(); }
                      finally { subscription.Dispose(); }
                  }
              }

              /// <summary>
              /// Observer attached to the gate sequence. Its first item opens the gate.
              /// </summary>
              class SkipUntilOther : IObserver<TOther>
              {
                  readonly SkipUntilOuterObserver parent;
                  readonly SkipUntil sourceObserver;
                  readonly IDisposable subscription;

                  public SkipUntilOther(SkipUntilOuterObserver parent, SkipUntil sourceObserver, IDisposable subscription)
                  {
                      this.parent = parent;
                      this.sourceObserver = sourceObserver;
                      this.subscription = subscription;
                  }

                  /// <summary>
                  /// Opens the gate by pointing the source observer at the downstream
                  /// observer, then disposes the gate subscription since no further gate
                  /// items are needed.
                  /// </summary>
                  public void OnNext(TOther value)
                  {
                      sourceObserver.observer = parent.observer;
                      subscription.Dispose();
                  }

                  /// <summary>
                  /// Reports a gate failure to the downstream observer and disposes the
                  /// outer observer.
                  /// </summary>
                  public void OnError(Exception error)
                  {
                      try { parent.observer.OnError(error); } finally { parent.Dispose(); }
                  }

                  /// <summary>
                  /// Gate completion only releases the gate subscription; nothing is sent
                  /// downstream.
                  /// </summary>
                  public void OnCompleted()
                  {
                      subscription.Dispose();
                  }
              }
          }
      }
  }