Cast.cs 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. using System;
  2. namespace UniRx.Operators
  3. {
  4. internal class CastObservable<TSource, TResult> : OperatorObservableBase<TResult>
  5. {
  6. readonly IObservable<TSource> source;
  7. public CastObservable(IObservable<TSource> source)
  8. : base(source.IsRequiredSubscribeOnCurrentThread())
  9. {
  10. this.source = source;
  11. }
  12. protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel)
  13. {
  14. return source.Subscribe(new Cast(observer, cancel));
  15. }
  16. class Cast : OperatorObserverBase<TSource, TResult>
  17. {
  18. public Cast(IObserver<TResult> observer, IDisposable cancel)
  19. : base(observer, cancel)
  20. {
  21. }
  22. public override void OnNext(TSource value)
  23. {
  24. var castValue = default(TResult);
  25. try
  26. {
  27. castValue = (TResult)(object)value;
  28. }
  29. catch (Exception ex)
  30. {
  31. try { observer.OnError(ex); }
  32. finally { Dispose(); }
  33. return;
  34. }
  35. observer.OnNext(castValue);
  36. }
  37. public override void OnError(Exception error)
  38. {
  39. try { observer.OnError(error); }
  40. finally { Dispose(); }
  41. }
  42. public override void OnCompleted()
  43. {
  44. try { observer.OnCompleted(); }
  45. finally { Dispose(); }
  46. }
  47. }
  48. }
  49. }