1
0

ContinueWith.cs 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. using System;
  namespace UniRx.Operators
  {
      /// <summary>
      /// Observable that waits for <c>source</c> to complete and then continues with the
      /// observable that <c>selector</c> builds from the last value the source produced.
      /// If the source completes without producing any value, completion is forwarded as-is.
      /// </summary>
      /// <typeparam name="TSource">Element type of the upstream sequence.</typeparam>
      /// <typeparam name="TResult">Element type of the continuation sequence.</typeparam>
      internal class ContinueWithObservable<TSource, TResult> : OperatorObservableBase<TResult>
      {
          readonly IObservable<TSource> source;
          readonly Func<TSource, IObservable<TResult>> selector;

          /// <summary>
          /// Stores the upstream sequence and the continuation factory. The result of
          /// <c>source.IsRequiredSubscribeOnCurrentThread()</c> is passed to the base constructor.
          /// </summary>
          /// <param name="source">Upstream sequence whose last value feeds the continuation.</param>
          /// <param name="selector">Builds the continuation sequence from the last upstream value.</param>
          public ContinueWithObservable(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
              : base(source.IsRequiredSubscribeOnCurrentThread())
          {
              this.source = source;
              this.selector = selector;
          }

          /// <summary>
          /// Creates a fresh <see cref="ContinueWith"/> observer for this subscription and starts it.
          /// </summary>
          /// <param name="observer">Downstream observer receiving the continuation's notifications.</param>
          /// <param name="cancel">Disposable handed through to the observer base class.</param>
          /// <returns>The disposable returned by <see cref="ContinueWith.Run"/>.</returns>
          protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel)
          {
              var continueWith = new ContinueWith(this, observer, cancel);
              return continueWith.Run();
          }

          /// <summary>
          /// Observer that remembers the most recent upstream value and, on upstream completion,
          /// subscribes the downstream observer to the continuation sequence.
          /// </summary>
          class ContinueWith : OperatorObserverBase<TSource, TResult>
          {
              readonly ContinueWithObservable<TSource, TResult> parent;

              // Holds the upstream subscription first and the continuation subscription afterwards.
              readonly SerialDisposable serialDisposable = new SerialDisposable();

              // True once at least one upstream value has been observed.
              bool seenValue;

              // Most recent upstream value; only meaningful when seenValue is true.
              TSource lastValue;

              /// <summary>
              /// Binds this observer to its owning observable and the downstream observer.
              /// </summary>
              public ContinueWith(ContinueWithObservable<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                  : base(observer, cancel)
              {
                  this.parent = parent;
              }

              /// <summary>
              /// Subscribes to the upstream sequence.
              /// </summary>
              /// <returns>The serial disposable that tracks the currently active subscription.</returns>
              public IDisposable Run()
              {
                  // The slot is registered with the serial disposable BEFORE subscribing, so the
                  // statement order below must be kept as-is.
                  var upstreamSubscription = new SingleAssignmentDisposable();
                  serialDisposable.Disposable = upstreamSubscription;
                  upstreamSubscription.Disposable = parent.source.Subscribe(this);

                  return serialDisposable;
              }

              /// <summary>
              /// Records the value as the latest one seen; nothing is forwarded downstream here.
              /// </summary>
              public override void OnNext(TSource value)
              {
                  lastValue = value;
                  seenValue = true;
              }

              /// <summary>
              /// Forwards the error downstream and then disposes this observer, even if the
              /// downstream handler throws.
              /// </summary>
              public override void OnError(Exception error)
              {
                  try
                  {
                      observer.OnError(error);
                  }
                  finally
                  {
                      Dispose();
                  }
              }

              /// <summary>
              /// On upstream completion: with no value seen, forwards completion and disposes;
              /// otherwise builds the continuation from the last value and subscribes the
              /// downstream observer to it. Any exception raised while doing so is routed
              /// through <see cref="OnError"/>.
              /// </summary>
              public override void OnCompleted()
              {
                  if (!seenValue)
                  {
                      try
                      {
                          observer.OnCompleted();
                      }
                      finally
                      {
                          Dispose();
                      }

                      return;
                  }

                  try
                  {
                      IObservable<TResult> continuation = parent.selector(lastValue);

                      // Swapping the serial disposable's content replaces the source subscription
                      // (the original author's note: "dispose source subscription").
                      serialDisposable.Disposable = continuation.Subscribe(observer);
                  }
                  catch (Exception ex)
                  {
                      OnError(ex);
                  }
              }
          }
      }
  }