// ToArray.cs (1.9 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class ToArrayObservable<TSource> : OperatorObservableBase<TSource[]>
  6. {
  7. readonly IObservable<TSource> source;
  8. public ToArrayObservable(IObservable<TSource> source)
  9. : base(source.IsRequiredSubscribeOnCurrentThread())
  10. {
  11. this.source = source;
  12. }
  13. protected override IDisposable SubscribeCore(IObserver<TSource[]> observer, IDisposable cancel)
  14. {
  15. return source.Subscribe(new ToArray(observer, cancel));
  16. }
  17. class ToArray : OperatorObserverBase<TSource, TSource[]>
  18. {
  19. readonly List<TSource> list = new List<TSource>();
  20. public ToArray(IObserver<TSource[]> observer, IDisposable cancel)
  21. : base(observer, cancel)
  22. {
  23. }
  24. public override void OnNext(TSource value)
  25. {
  26. try
  27. {
  28. list.Add(value); // sometimes cause error on multithread
  29. }
  30. catch (Exception ex)
  31. {
  32. try { observer.OnError(ex); } finally { Dispose(); }
  33. return;
  34. }
  35. }
  36. public override void OnError(Exception error)
  37. {
  38. try { observer.OnError(error); } finally { Dispose(); }
  39. }
  40. public override void OnCompleted()
  41. {
  42. TSource[] result;
  43. try
  44. {
  45. result = list.ToArray();
  46. }
  47. catch (Exception ex)
  48. {
  49. try { observer.OnError(ex); } finally { Dispose(); }
  50. return;
  51. }
  52. base.observer.OnNext(result);
  53. try { observer.OnCompleted(); } finally { Dispose(); };
  54. }
  55. }
  56. }
  57. }