ConnectableObservable.cs 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. using System;
  2. namespace UniRx
  3. {
  /// <summary>
  /// An observable sequence that additionally exposes an explicit <see cref="Connect"/> step.
  /// </summary>
  /// <typeparam name="T">Type of the elements in the sequence.</typeparam>
  public interface IConnectableObservable<T> : IObservable<T>
  {
      /// <summary>
      /// Connects the observable.
      /// </summary>
      /// <returns>A disposable handle representing the connection.</returns>
      IDisposable Connect();
  }
  public static partial class Observable
  {
      /// <summary>
      /// Couples a source sequence with a subject. Observers are attached to the subject
      /// through <see cref="Subscribe"/>; the source is subscribed to the subject only
      /// when <see cref="Connect"/> is called.
      /// </summary>
      /// <typeparam name="T">Type of the elements in the sequence.</typeparam>
      class ConnectableObservable<T> : IConnectableObservable<T>
      {
          readonly IObservable<T> source;
          readonly ISubject<T> subject;

          // Taken by Connect and by Connection.Dispose around every access to 'connection'.
          readonly object gate = new object();

          // The current connection, or null while no connection is active.
          Connection connection;

          /// <summary>
          /// Stores the subject and the result of calling <c>AsObservable()</c> on the source.
          /// </summary>
          /// <param name="source">Sequence that will feed the subject once connected.</param>
          /// <param name="subject">Subject that observers subscribe to.</param>
          public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
          {
              this.source = source.AsObservable();
              this.subject = subject;
          }

          /// <summary>
          /// Subscribes the subject to the source unless a connection already exists.
          /// </summary>
          /// <returns>
          /// The current connection; disposing it releases the source subscription and
          /// allows a later call to connect again.
          /// </returns>
          public IDisposable Connect()
          {
              lock (gate)
              {
                  if (connection != null)
                  {
                      // Already connected: reuse the existing connection instead of
                      // subscribing to the source a second time.
                      return connection;
                  }

                  var upstream = source.Subscribe(subject);
                  connection = new Connection(this, upstream);
                  return connection;
              }
          }

          /// <summary>
          /// Attaches an observer to the subject; the source itself is not touched here.
          /// </summary>
          /// <param name="observer">Observer to register with the subject.</param>
          /// <returns>The disposable returned by the subject's <c>Subscribe</c>.</returns>
          public IDisposable Subscribe(IObserver<T> observer)
          {
              return subject.Subscribe(observer);
          }

          /// <summary>
          /// Handle returned by <see cref="Connect"/>; wraps the subscription of the
          /// subject to the source.
          /// </summary>
          class Connection : IDisposable
          {
              readonly ConnectableObservable<T> parent;

              // Set to null once disposed, which makes further Dispose calls do nothing.
              IDisposable subscription;

              public Connection(ConnectableObservable<T> parent, IDisposable subscription)
              {
                  this.parent = parent;
                  this.subscription = subscription;
              }

              /// <summary>
              /// Disposes the source subscription and clears the parent's connection slot.
              /// Calls after the first successful one have no effect.
              /// </summary>
              public void Dispose()
              {
                  lock (parent.gate)
                  {
                      if (subscription == null)
                      {
                          // Nothing left to release.
                          return;
                      }

                      subscription.Dispose();
                      subscription = null;

                      // Clearing the slot lets the next Connect() subscribe again.
                      parent.connection = null;
                  }
              }
          }
      }
  }
  62. }