ToList.cs 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. using System;
  2. using System.Collections.Generic;
  3. namespace UniRx.Operators
  4. {
  5. internal class ToListObservable<TSource> : OperatorObservableBase<IList<TSource>>
  6. {
  7. readonly IObservable<TSource> source;
  8. public ToListObservable(IObservable<TSource> source)
  9. : base(source.IsRequiredSubscribeOnCurrentThread())
  10. {
  11. this.source = source;
  12. }
  13. protected override IDisposable SubscribeCore(IObserver<IList<TSource>> observer, IDisposable cancel)
  14. {
  15. return source.Subscribe(new ToList(observer, cancel));
  16. }
  17. class ToList : OperatorObserverBase<TSource, IList<TSource>>
  18. {
  19. readonly List<TSource> list = new List<TSource>();
  20. public ToList(IObserver<IList<TSource>> observer, IDisposable cancel)
  21. : base(observer, cancel)
  22. {
  23. }
  24. public override void OnNext(TSource value)
  25. {
  26. try
  27. {
  28. list.Add(value); // sometimes cause error on multithread
  29. }
  30. catch (Exception ex)
  31. {
  32. try { observer.OnError(ex); } finally { Dispose(); }
  33. return;
  34. }
  35. }
  36. public override void OnError(Exception error)
  37. {
  38. try { observer.OnError(error); } finally { Dispose(); }
  39. }
  40. public override void OnCompleted()
  41. {
  42. base.observer.OnNext(list);
  43. try { observer.OnCompleted(); } finally { Dispose(); };
  44. }
  45. }
  46. }
  47. }