SubjectExtensions.cs 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. using System;
  2. namespace UniRx
  3. {
  4. public static class SubjectExtensions
  5. {
  6. public static ISubject<T> Synchronize<T>(this ISubject<T> subject)
  7. {
  8. return new AnonymousSubject<T>((subject as IObserver<T>).Synchronize(), subject);
  9. }
  10. public static ISubject<T> Synchronize<T>(this ISubject<T> subject, object gate)
  11. {
  12. return new AnonymousSubject<T>((subject as IObserver<T>).Synchronize(gate), subject);
  13. }
  14. class AnonymousSubject<T, U> : ISubject<T, U>
  15. {
  16. readonly IObserver<T> observer;
  17. readonly IObservable<U> observable;
  18. public AnonymousSubject(IObserver<T> observer, IObservable<U> observable)
  19. {
  20. this.observer = observer;
  21. this.observable = observable;
  22. }
  23. public void OnCompleted()
  24. {
  25. observer.OnCompleted();
  26. }
  27. public void OnError(Exception error)
  28. {
  29. if (error == null) throw new ArgumentNullException("error");
  30. observer.OnError(error);
  31. }
  32. public void OnNext(T value)
  33. {
  34. observer.OnNext(value);
  35. }
  36. public IDisposable Subscribe(IObserver<U> observer)
  37. {
  38. if (observer == null) throw new ArgumentNullException("observer");
  39. return observable.Subscribe(observer);
  40. }
  41. }
  42. class AnonymousSubject<T> : AnonymousSubject<T, T>, ISubject<T>
  43. {
  44. public AnonymousSubject(IObserver<T> observer, IObservable<T> observable)
  45. : base(observer, observable)
  46. {
  47. }
  48. }
  49. }
  50. }