using System; namespace UniRx.Operators { internal class SubscribeOnMainThreadObservable : OperatorObservableBase { readonly IObservable source; readonly IObservable subscribeTrigger; public SubscribeOnMainThreadObservable(IObservable source, IObservable subscribeTrigger) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.subscribeTrigger = subscribeTrigger; } protected override IDisposable SubscribeCore(IObserver observer, IDisposable cancel) { var m = new SingleAssignmentDisposable(); var d = new SerialDisposable(); d.Disposable = m; m.Disposable = subscribeTrigger.SubscribeWithState3(observer, d, source, (_, o, disp, s) => { disp.Disposable = s.Subscribe(o); }); return d; } } }