1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- using System;
- namespace UniRx
- {
- public interface IConnectableObservable<T> : IObservable<T>
- {
- IDisposable Connect();
- }
- public static partial class Observable
- {
- class ConnectableObservable<T> : IConnectableObservable<T>
- {
- readonly IObservable<T> source;
- readonly ISubject<T> subject;
- readonly object gate = new object();
- Connection connection;
- public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
- {
- this.source = source.AsObservable();
- this.subject = subject;
- }
- public IDisposable Connect()
- {
- lock (gate)
- {
- // don't subscribe twice
- if (connection == null)
- {
- var subscription = source.Subscribe(subject);
- connection = new Connection(this, subscription);
- }
- return connection;
- }
- }
- public IDisposable Subscribe(IObserver<T> observer)
- {
- return subject.Subscribe(observer);
- }
- class Connection : IDisposable
- {
- readonly ConnectableObservable<T> parent;
- IDisposable subscription;
- public Connection(ConnectableObservable<T> parent, IDisposable subscription)
- {
- this.parent = parent;
- this.subscription = subscription;
- }
- public void Dispose()
- {
- lock (parent.gate)
- {
- if (subscription != null)
- {
- subscription.Dispose();
- subscription = null;
- parent.connection = null;
- }
- }
- }
- }
- }
- }
- }
|