WebRequestExtensions.cs 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net;
  6. using System.Text;
  7. using System.Threading;
  8. namespace UniRx
  9. {
  /// <summary>
  /// Extension methods that expose the Begin/End asynchronous operations of
  /// <see cref="WebRequest"/> as observable sequences.
  /// </summary>
  public static class WebRequestExtensions
  {
      /// <summary>
      /// Wraps a Begin/End pair in an observable whose subscription, when disposed before the
      /// End callback has started, unsubscribes and aborts <paramref name="request"/>.
      /// </summary>
      /// <typeparam name="TResult">Type produced by <paramref name="end"/>.</typeparam>
      /// <param name="begin">Delegate that starts the asynchronous operation.</param>
      /// <param name="end">Delegate that completes the operation and yields its result.</param>
      /// <param name="request">Request that is aborted on early disposal.</param>
      /// <returns>An observable built with <c>Observable.Create</c> around the Begin/End pair.</returns>
      private static IObservable<TResult> AbortableDeferredAsyncRequest<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end, WebRequest request)
      {
          var result = Observable.Create<TResult>(observer =>
          {
              // Starts at -1. Both the End callback and the disposal increment it; the
              // disposal aborts only when its own increment yields 0, i.e. when the End
              // callback has not incremented the counter yet.
              var isCompleted = -1;
              var subscription = Observable.FromAsyncPattern<TResult>(begin,
                  ar =>
                  {
                      try
                      {
                          // Mark the End callback as started before calling end.
                          Interlocked.Increment(ref isCompleted);
                          return end(ar);
                      }
                      catch (WebException ex)
                      {
                          // A canceled request yields default(TResult) instead of an error;
                          // every other WebException is rethrown unchanged.
                          if (ex.Status == WebExceptionStatus.RequestCanceled)
                          {
                              return default(TResult);
                          }

                          throw;
                      }
                  })()
                  .Subscribe(observer);

              return Disposable.Create(() =>
              {
                  if (Interlocked.Increment(ref isCompleted) == 0)
                  {
                      // NOTE(review): unsubscribing before Abort presumably keeps the
                      // default(TResult) produced by the cancellation path away from the
                      // observer - confirm against the Observable implementation.
                      subscription.Dispose();
                      request.Abort();
                  }
              });
          });

          return result;
      }

      /// <summary>
      /// Exposes <see cref="WebRequest.BeginGetResponse"/> / <see cref="WebRequest.EndGetResponse"/>
      /// as an observable of <see cref="WebResponse"/>.
      /// </summary>
      /// <param name="request">The request to execute.</param>
      /// <returns>An observable for the response of <paramref name="request"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
      public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
      {
          if (request == null)
          {
              throw new ArgumentNullException("request");
          }

          return AbortableDeferredAsyncRequest<WebResponse>(request.BeginGetResponse, request.EndGetResponse, request);
      }

      /// <summary>
      /// Same as the <see cref="WebRequest"/> overload, but casts the result of
      /// <c>EndGetResponse</c> to <see cref="HttpWebResponse"/>.
      /// </summary>
      /// <param name="request">The HTTP request to execute.</param>
      /// <returns>An observable for the HTTP response of <paramref name="request"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
      public static IObservable<HttpWebResponse> GetResponseAsObservable(this HttpWebRequest request)
      {
          if (request == null)
          {
              throw new ArgumentNullException("request");
          }

          return AbortableDeferredAsyncRequest<HttpWebResponse>(request.BeginGetResponse, ar => (HttpWebResponse)request.EndGetResponse(ar), request);
      }

      /// <summary>
      /// Exposes <see cref="WebRequest.BeginGetRequestStream"/> / <see cref="WebRequest.EndGetRequestStream"/>
      /// as an observable of <see cref="Stream"/>.
      /// </summary>
      /// <param name="request">The request whose request stream is obtained.</param>
      /// <returns>An observable for the request stream of <paramref name="request"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
      public static IObservable<Stream> GetRequestStreamAsObservable(this WebRequest request)
      {
          if (request == null)
          {
              throw new ArgumentNullException("request");
          }

          return AbortableDeferredAsyncRequest<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, request);
      }
  }
  56. }