2
0

StartWith.cs 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal class StartWithObservable<T> : OperatorObservableBase<T>
  5. {
  6. readonly IObservable<T> source;
  7. readonly T value;
  8. readonly Func<T> valueFactory;
  9. public StartWithObservable(IObservable<T> source, T value)
  10. : base(source.IsRequiredSubscribeOnCurrentThread())
  11. {
  12. this.source = source;
  13. this.value = value;
  14. }
  15. public StartWithObservable(IObservable<T> source, Func<T> valueFactory)
  16. : base(source.IsRequiredSubscribeOnCurrentThread())
  17. {
  18. this.source = source;
  19. this.valueFactory = valueFactory;
  20. }
  21. protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
  22. {
  23. return new StartWith(this, observer, cancel).Run();
  24. }
  25. class StartWith : OperatorObserverBase<T, T>
  26. {
  27. readonly StartWithObservable<T> parent;
  28. public StartWith(StartWithObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
  29. {
  30. this.parent = parent;
  31. }
  32. public IDisposable Run()
  33. {
  34. T t;
  35. if (parent.valueFactory == null)
  36. {
  37. t = parent.value;
  38. }
  39. else
  40. {
  41. try
  42. {
  43. t = parent.valueFactory();
  44. }
  45. catch (Exception ex)
  46. {
  47. try { observer.OnError(ex); }
  48. finally { Dispose(); }
  49. return Disposable.Empty;
  50. }
  51. }
  52. OnNext(t);
  53. return parent.source.Subscribe(base.observer); // good bye StartWithObserver
  54. }
  55. public override void OnNext(T value)
  56. {
  57. base.observer.OnNext(value);
  58. }
  59. public override void OnError(Exception error)
  60. {
  61. try { observer.OnError(error); }
  62. finally { Dispose(); }
  63. }
  64. public override void OnCompleted()
  65. {
  66. try { observer.OnCompleted(); }
  67. finally { Dispose(); }
  68. }
  69. }
  70. }
  71. }