#if (NET_4_6 || NET_STANDARD_2_0)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace UniRx
{
    public static partial class Observable
    {
        /// <summary>
        /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
        /// This operation subscribes to the observable sequence, making it hot.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to await.</param>
        /// <returns>The subject that was subscribed to <paramref name="source"/> and acts as the awaiter.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return RunAsync(source, CancellationToken.None);
        }

        /// <summary>
        /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
        /// This operation subscribes and connects to the observable sequence, making it hot.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to await.</param>
        /// <returns>The subject that was subscribed to <paramref name="source"/> and acts as the awaiter.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static AsyncSubject<TSource> GetAwaiter<TSource>(this IConnectableObservable<TSource> source)
        {
            if (source == null) throw new ArgumentNullException("source");

            return RunAsync(source, CancellationToken.None);
        }

        /// <summary>
        /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
        /// This operation subscribes to the observable sequence, making it hot.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to await.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>
        /// The subject that acts as the awaiter. If the token is already canceled, the subject is
        /// signalled with an <see cref="OperationCanceledException"/> and the source is not subscribed to.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException("source");

            return RunAsync(source, cancellationToken);
        }

        /// <summary>
        /// Gets an awaiter that returns the last value of the observable sequence or throws an exception if the sequence is empty.
        /// This operation subscribes and connects to the observable sequence, making it hot.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence to await.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>
        /// The subject that acts as the awaiter. If the token is already canceled, the subject is
        /// signalled with an <see cref="OperationCanceledException"/> and the source is neither
        /// subscribed to nor connected.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static AsyncSubject<TSource> GetAwaiter<TSource>(this IConnectableObservable<TSource> source, CancellationToken cancellationToken)
        {
            if (source == null) throw new ArgumentNullException("source");

            return RunAsync(source, cancellationToken);
        }

        /// <summary>
        /// Subscribes a new subject to <paramref name="source"/> and, when the token can be canceled,
        /// wires cancellation so that the subscription is disposed and the subject is failed.
        /// </summary>
        static AsyncSubject<TSource> RunAsync<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
        {
            var s = new AsyncSubject<TSource>();

            // Already canceled: fail the subject right away without touching the source.
            if (cancellationToken.IsCancellationRequested)
            {
                return Cancel(s, cancellationToken);
            }

            var d = source.Subscribe(s);

            // Skip the registration (and its closure) for tokens that can never be canceled.
            if (cancellationToken.CanBeCanceled)
            {
                RegisterCancelation(s, d, cancellationToken);
            }

            return s;
        }

        /// <summary>
        /// Subscribes a new subject to <paramref name="source"/>, connects the source and, when the
        /// token can be canceled, wires cancellation so that both the subscription and the connection
        /// are disposed and the subject is failed.
        /// </summary>
        static AsyncSubject<TSource> RunAsync<TSource>(IConnectableObservable<TSource> source, CancellationToken cancellationToken)
        {
            var s = new AsyncSubject<TSource>();

            // Already canceled: fail the subject right away without subscribing or connecting.
            if (cancellationToken.IsCancellationRequested)
            {
                return Cancel(s, cancellationToken);
            }

            // Subscribe before connecting so the subject is attached when the connection is made.
            var d = source.Subscribe(s);
            var c = source.Connect();

            if (cancellationToken.CanBeCanceled)
            {
                // Cancellation tears down the subscription and the connection together.
                RegisterCancelation(s, StableCompositeDisposable.Create(d, c), cancellationToken);
            }

            return s;
        }

        /// <summary>
        /// Signals an <see cref="OperationCanceledException"/> carrying <paramref name="cancellationToken"/>
        /// to <paramref name="subject"/> and returns that same subject.
        /// </summary>
        static AsyncSubject<TSource> Cancel<TSource>(AsyncSubject<TSource> subject, CancellationToken cancellationToken)
        {
            subject.OnError(new OperationCanceledException(cancellationToken));
            return subject;
        }

        /// <summary>
        /// Registers a callback on <paramref name="token"/> that disposes <paramref name="subscription"/>
        /// and cancels <paramref name="subject"/>, and disposes that registration once the subject
        /// reports an error or completion.
        /// </summary>
        static void RegisterCancelation<TSource>(AsyncSubject<TSource> subject, IDisposable subscription, CancellationToken token)
        {
            //
            // Separate method used to avoid heap allocation of closure when no cancellation is needed,
            // e.g. when CancellationToken.None is provided to the RunAsync overloads.
            //

            var ctr = token.Register(() =>
            {
                subscription.Dispose();
                Cancel(subject, token);
            });

            //
            // No null-check for ctr is needed:
            //
            // - CancellationTokenRegistration is a struct
            // - Registration will succeed 99% of the time, not warranting an attempt to avoid spurious Subscribe calls
            //
            subject.Subscribe(Stubs<TSource>.Ignore, _ => ctr.Dispose(), ctr.Dispose);
        }
    }
}

#endif