Wait.cs 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. using System;
  2. using UniRx.InternalUtil;
  3. namespace UniRx.Operators
  4. {
  5. internal class Wait<T> : IObserver<T>
  6. {
  7. static readonly TimeSpan InfiniteTimeSpan = new TimeSpan(0, 0, 0, 0, -1); // from .NET 4.5
  8. readonly IObservable<T> source;
  9. readonly TimeSpan timeout;
  10. System.Threading.ManualResetEvent semaphore;
  11. bool seenValue = false;
  12. T value = default(T);
  13. Exception ex = default(Exception);
  14. public Wait(IObservable<T> source, TimeSpan timeout)
  15. {
  16. this.source = source;
  17. this.timeout = timeout;
  18. }
  19. public T Run()
  20. {
  21. semaphore = new System.Threading.ManualResetEvent(false);
  22. using (source.Subscribe(this))
  23. {
  24. var waitComplete = (timeout == InfiniteTimeSpan)
  25. ? semaphore.WaitOne()
  26. : semaphore.WaitOne(timeout);
  27. if (!waitComplete)
  28. {
  29. throw new TimeoutException("OnCompleted not fired.");
  30. }
  31. }
  32. if (ex != null) ex.Throw();
  33. if (!seenValue) throw new InvalidOperationException("No Elements.");
  34. return value;
  35. }
  36. public void OnNext(T value)
  37. {
  38. seenValue = true;
  39. this.value = value;
  40. }
  41. public void OnError(Exception error)
  42. {
  43. this.ex = error;
  44. semaphore.Set();
  45. }
  46. public void OnCompleted()
  47. {
  48. semaphore.Set();
  49. }
  50. }
  51. }