using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;

namespace UniRx
{
    /// <summary>
    /// Extension methods that expose the Begin/End asynchronous operations of
    /// <see cref="WebRequest"/> as observable sequences.
    /// </summary>
    public static class WebRequestExtensions
    {
        /// <summary>
        /// Wraps a Begin/End pair in an observable. The Begin call is issued inside the
        /// subscribe callback handed to <c>Observable.Create</c>, and the disposable returned
        /// from that callback aborts <paramref name="request"/> when it is disposed before the
        /// End callback has started running.
        /// </summary>
        /// <typeparam name="TResult">Type produced by <paramref name="end"/>.</typeparam>
        /// <param name="begin">The Begin half of the asynchronous operation.</param>
        /// <param name="end">The End half of the asynchronous operation; produces the result.</param>
        /// <param name="request">The request to abort on early disposal.</param>
        /// <returns>An observable built from the Begin/End pair.</returns>
        static IObservable<TResult> AbortableDeferredAsyncRequest<TResult>(
            Func<AsyncCallback, object, IAsyncResult> begin,
            Func<IAsyncResult, TResult> end,
            WebRequest request)
        {
            var result = Observable.Create<TResult>(observer =>
            {
                // Starts at -1 so that whichever side increments first (the End callback or
                // the disposal action) observes 0; any later increment observes a value > 0.
                var isCompleted = -1;

                var subscription = Observable.FromAsyncPattern<TResult>(begin,
                    ar =>
                    {
                        try
                        {
                            // Mark the End callback as started before calling end(ar), so a
                            // later disposal no longer aborts the request.
                            Interlocked.Increment(ref isCompleted);
                            return end(ar);
                        }
                        catch (WebException ex)
                        {
                            // A canceled request is reported as a default value instead of an
                            // error; every other WebException is rethrown unchanged.
                            // NOTE(review): only ex.Status is checked here, so this also applies
                            // to cancellations that were not triggered by the disposal below.
                            if (ex.Status == WebExceptionStatus.RequestCanceled)
                            {
                                return default(TResult);
                            }

                            throw;
                        }
                    })()
                    .Subscribe(observer);

                return Disposable.Create(() =>
                {
                    // 0 means the End callback has not started yet: stop forwarding to the
                    // observer first, then abort the in-flight request.
                    if (Interlocked.Increment(ref isCompleted) == 0)
                    {
                        subscription.Dispose();
                        request.Abort();
                    }
                });
            });

            return result;
        }

        /// <summary>
        /// Exposes <c>BeginGetResponse</c>/<c>EndGetResponse</c> of <paramref name="request"/>
        /// as an observable of <see cref="WebResponse"/>.
        /// </summary>
        /// <param name="request">The request whose response is fetched.</param>
        /// <returns>An observable built from the request's Begin/End response pair.</returns>
        public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
        {
            return AbortableDeferredAsyncRequest<WebResponse>(request.BeginGetResponse, request.EndGetResponse, request);
        }

        /// <summary>
        /// Exposes <c>BeginGetResponse</c>/<c>EndGetResponse</c> of <paramref name="request"/>
        /// as an observable, casting the result of <c>EndGetResponse</c> to
        /// <see cref="HttpWebResponse"/>.
        /// </summary>
        /// <param name="request">The HTTP request whose response is fetched.</param>
        /// <returns>An observable built from the request's Begin/End response pair.</returns>
        public static IObservable<HttpWebResponse> GetResponseAsObservable(this HttpWebRequest request)
        {
            return AbortableDeferredAsyncRequest<HttpWebResponse>(request.BeginGetResponse, ar => (HttpWebResponse)request.EndGetResponse(ar), request);
        }

        /// <summary>
        /// Exposes <c>BeginGetRequestStream</c>/<c>EndGetRequestStream</c> of
        /// <paramref name="request"/> as an observable of <see cref="Stream"/>.
        /// </summary>
        /// <param name="request">The request whose request stream is obtained.</param>
        /// <returns>An observable built from the request's Begin/End request-stream pair.</returns>
        public static IObservable<Stream> GetRequestStreamAsObservable(this WebRequest request)
        {
            return AbortableDeferredAsyncRequest<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, request);
        }
    }
}