Observable.Concurrency.cs 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using UniRx.Operators;
  5. namespace UniRx
  6. {
  7. public static partial class Observable
  8. {
  9. public static IObservable<T> Synchronize<T>(this IObservable<T> source)
  10. {
  11. return new SynchronizeObservable<T>(source, new object());
  12. }
  13. public static IObservable<T> Synchronize<T>(this IObservable<T> source, object gate)
  14. {
  15. return new SynchronizeObservable<T>(source, gate);
  16. }
  17. public static IObservable<T> ObserveOn<T>(this IObservable<T> source, IScheduler scheduler)
  18. {
  19. return new ObserveOnObservable<T>(source, scheduler);
  20. }
  21. public static IObservable<T> SubscribeOn<T>(this IObservable<T> source, IScheduler scheduler)
  22. {
  23. return new SubscribeOnObservable<T>(source, scheduler);
  24. }
  25. public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, TimeSpan dueTime)
  26. {
  27. return new DelaySubscriptionObservable<T>(source, dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
  28. }
  29. public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
  30. {
  31. return new DelaySubscriptionObservable<T>(source, dueTime, scheduler);
  32. }
  33. public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, DateTimeOffset dueTime)
  34. {
  35. return new DelaySubscriptionObservable<T>(source, dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
  36. }
  37. public static IObservable<T> DelaySubscription<T>(this IObservable<T> source, DateTimeOffset dueTime, IScheduler scheduler)
  38. {
  39. return new DelaySubscriptionObservable<T>(source, dueTime, scheduler);
  40. }
  41. public static IObservable<T> Amb<T>(params IObservable<T>[] sources)
  42. {
  43. return Amb((IEnumerable<IObservable<T>>)sources);
  44. }
  45. public static IObservable<T> Amb<T>(IEnumerable<IObservable<T>> sources)
  46. {
  47. var result = Observable.Never<T>();
  48. foreach (var item in sources)
  49. {
  50. var second = item;
  51. result = result.Amb(second);
  52. }
  53. return result;
  54. }
  55. public static IObservable<T> Amb<T>(this IObservable<T> source, IObservable<T> second)
  56. {
  57. return new AmbObservable<T>(source, second);
  58. }
  59. }
  60. }