using System;

namespace UniRx
{
    /// <summary>
    /// An observable whose subscription to its underlying source is
    /// established explicitly by calling <see cref="Connect"/>.
    /// </summary>
    public interface IConnectableObservable : IObservable
    {
        /// <summary>
        /// Establishes the connection to the underlying source.
        /// </summary>
        /// <returns>A handle that tears the connection down when disposed.</returns>
        IDisposable Connect();
    }

    public static partial class Observable
    {
        /// <summary>
        /// Connectable observable that routes a source through a subject:
        /// observers subscribe to the subject, and the subject is subscribed
        /// to the source only while a connection is active.
        /// </summary>
        class ConnectableObservable : IConnectableObservable
        {
            readonly IObservable source;
            readonly ISubject subject;

            // Guards reads and writes of `connection` in Connect() and in
            // Connection.Dispose().
            readonly object gate = new object();

            // The active connection, or null when not connected.
            Connection connection;

            /// <summary>
            /// Creates a connectable observable over <paramref name="source"/>
            /// that publishes through <paramref name="subject"/>.
            /// </summary>
            /// <param name="source">The sequence to connect to on demand.</param>
            /// <param name="subject">The subject observers subscribe to.</param>
            public ConnectableObservable(IObservable source, ISubject subject)
            {
                // The source is stored via AsObservable() rather than directly;
                // presumably this hides the concrete source type - confirm
                // against the AsObservable() definition.
                this.source = source.AsObservable();
                this.subject = subject;
            }

            /// <summary>
            /// Subscribes the subject to the source if no connection is
            /// currently active, and returns the active connection.
            /// </summary>
            /// <returns>
            /// The current connection. Calls made while a connection is active
            /// return that same instance; disposing it disconnects and allows
            /// a later call to connect again.
            /// </returns>
            public IDisposable Connect()
            {
                lock (gate)
                {
                    // don't subscribe twice
                    if (connection == null)
                    {
                        var subscription = source.Subscribe(subject);
                        connection = new Connection(this, subscription);
                    }

                    return connection;
                }
            }

            /// <summary>
            /// Subscribes <paramref name="observer"/> to the subject. This does
            /// not itself connect to the source; see <see cref="Connect"/>.
            /// </summary>
            /// <param name="observer">The observer to attach to the subject.</param>
            /// <returns>The subscription handle returned by the subject.</returns>
            public IDisposable Subscribe(IObserver observer)
            {
                return subject.Subscribe(observer);
            }

            /// <summary>
            /// Handle for one active source subscription. Disposing it
            /// unsubscribes from the source and resets the parent so that it
            /// can be connected again.
            /// </summary>
            class Connection : IDisposable
            {
                readonly ConnectableObservable parent;

                // Set to null once disposed, which makes Dispose() idempotent.
                IDisposable subscription;

                /// <summary>
                /// Creates a connection handle for <paramref name="parent"/>.
                /// </summary>
                /// <param name="parent">The owning connectable observable.</param>
                /// <param name="subscription">The subject's subscription to the source.</param>
                public Connection(ConnectableObservable parent, IDisposable subscription)
                {
                    this.parent = parent;
                    this.subscription = subscription;
                }

                /// <summary>
                /// Disposes the source subscription and clears the parent's
                /// active connection. Second and later calls do nothing.
                /// </summary>
                public void Dispose()
                {
                    // Same lock as Connect(), so disconnecting cannot
                    // interleave with a concurrent connect.
                    lock (parent.gate)
                    {
                        if (subscription != null)
                        {
                            subscription.Dispose();
                            subscription = null;

                            // Lets the next Connect() subscribe again.
                            parent.connection = null;
                        }
                    }
                }
            }
        }
    }
}