ThreadSafeQueueWorker.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. using System;
  2. namespace UniRx.InternalUtil
  3. {
  4. public class ThreadSafeQueueWorker
  5. {
  6. const int MaxArrayLength = 0X7FEFFFFF;
  7. const int InitialSize = 16;
  8. object gate = new object();
  9. bool dequing = false;
  10. int actionListCount = 0;
  11. Action<object>[] actionList = new Action<object>[InitialSize];
  12. object[] actionStates = new object[InitialSize];
  13. int waitingListCount = 0;
  14. Action<object>[] waitingList = new Action<object>[InitialSize];
  15. object[] waitingStates = new object[InitialSize];
  16. public void Enqueue(Action<object> action, object state)
  17. {
  18. lock (gate)
  19. {
  20. if (dequing)
  21. {
  22. // Ensure Capacity
  23. if (waitingList.Length == waitingListCount)
  24. {
  25. var newLength = waitingListCount * 2;
  26. if ((uint)newLength > MaxArrayLength) newLength = MaxArrayLength;
  27. var newArray = new Action<object>[newLength];
  28. var newArrayState = new object[newLength];
  29. Array.Copy(waitingList, newArray, waitingListCount);
  30. Array.Copy(waitingStates, newArrayState, waitingListCount);
  31. waitingList = newArray;
  32. waitingStates = newArrayState;
  33. }
  34. waitingList[waitingListCount] = action;
  35. waitingStates[waitingListCount] = state;
  36. waitingListCount++;
  37. }
  38. else
  39. {
  40. // Ensure Capacity
  41. if (actionList.Length == actionListCount)
  42. {
  43. var newLength = actionListCount * 2;
  44. if ((uint)newLength > MaxArrayLength) newLength = MaxArrayLength;
  45. var newArray = new Action<object>[newLength];
  46. var newArrayState = new object[newLength];
  47. Array.Copy(actionList, newArray, actionListCount);
  48. Array.Copy(actionStates, newArrayState, actionListCount);
  49. actionList = newArray;
  50. actionStates = newArrayState;
  51. }
  52. actionList[actionListCount] = action;
  53. actionStates[actionListCount] = state;
  54. actionListCount++;
  55. }
  56. }
  57. }
  58. public void ExecuteAll(Action<Exception> unhandledExceptionCallback)
  59. {
  60. lock (gate)
  61. {
  62. if (actionListCount == 0) return;
  63. dequing = true;
  64. }
  65. for (int i = 0; i < actionListCount; i++)
  66. {
  67. var action = actionList[i];
  68. var state = actionStates[i];
  69. try
  70. {
  71. action(state);
  72. }
  73. catch (Exception ex)
  74. {
  75. unhandledExceptionCallback(ex);
  76. }
  77. finally
  78. {
  79. // Clear
  80. actionList[i] = null;
  81. actionStates[i] = null;
  82. }
  83. }
  84. lock (gate)
  85. {
  86. dequing = false;
  87. var swapTempActionList = actionList;
  88. var swapTempActionStates = actionStates;
  89. actionListCount = waitingListCount;
  90. actionList = waitingList;
  91. actionStates = waitingStates;
  92. waitingListCount = 0;
  93. waitingList = swapTempActionList;
  94. waitingStates = swapTempActionStates;
  95. }
  96. }
  97. }
  98. }