dsi_ts_queue.hpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. /*
  2. This software is subject to the license described in the License.txt file
  3. included with this software distribution. You may not use this file except
  4. in compliance with this license.
  5. Copyright (c) Dynastream Innovations Inc. 2016
  6. All rights reserved.
  7. */
  8. #ifndef DSI_TS_QUEUE_HPP
  9. #define DSI_TS_QUEUE_HPP
  10. #include <list>
  11. #include <queue>
  12. #include <deque>
  13. //NOTE: Make sure nobody is still using this queue when it is being destroyed!
  14. template < class T, class Container = std::deque<T> >
  15. class TSQueue //thread-safe queue
  16. {
  17. public:
  18. TSQueue()
  19. {
  20. UCHAR ret;
  21. ret = DSIThread_CondInit(&stEventPush);
  22. if(ret != DSI_THREAD_ENONE)
  23. throw; //!!Need to throw something!
  24. ret = DSIThread_MutexInit(&stMutex);
  25. if(ret != DSI_THREAD_ENONE)
  26. {
  27. DSIThread_CondDestroy(&stEventPush);
  28. throw; //!!Need to throw something!
  29. }
  30. return;
  31. }
  32. ~TSQueue()
  33. {
  34. DSIThread_MutexDestroy(&stMutex);
  35. DSIThread_CondDestroy(&stEventPush);
  36. return;
  37. }
  38. void Push(const T& tElement_)
  39. {
  40. DSIThread_MutexLock(&stMutex);
  41. {
  42. clQueue.push(tElement_);
  43. DSIThread_CondSignal(&stEventPush);
  44. }
  45. DSIThread_MutexUnlock(&stMutex);
  46. return;
  47. }
  48. void PushArray(T* const ptElementArray_, ULONG ulSize_)
  49. {
  50. //!!Need to handle when ptElementArray_ == NULL
  51. DSIThread_MutexLock(&stMutex);
  52. {
  53. for(ULONG i=0; i<ulSize_; i++) //!!would it be faster if we assigned or used an iterator instead of pushing?
  54. clQueue.push(ptElementArray_[i]);
  55. DSIThread_CondSignal(&stEventPush);
  56. }
  57. DSIThread_MutexUnlock(&stMutex);
  58. return;
  59. }
  60. BOOL Pop(T& tElement_, ULONG ulWaitTime_ = 0)
  61. {
  62. DSIThread_MutexLock(&stMutex);
  63. {
  64. if(clQueue.empty() == TRUE)
  65. {
  66. UCHAR ret;
  67. ret = DSIThread_CondTimedWait(&stEventPush, &stMutex, ulWaitTime_);
  68. if(ret != DSI_THREAD_ENONE)
  69. {
  70. DSIThread_MutexUnlock(&stMutex);
  71. return FALSE;
  72. }
  73. }
  74. tElement_ = clQueue.front();
  75. clQueue.pop();
  76. }
  77. DSIThread_MutexUnlock(&stMutex);
  78. return TRUE;
  79. }
  80. ULONG PopArray(T* const ptElementArray_, ULONG ulMaxSize_, ULONG ulWaitTime_ = 0)
  81. {
  82. //!!Need to handle when ptElementArray_ == NULL
  83. ULONG ulFinalSize;
  84. DSIThread_MutexLock(&stMutex);
  85. {
  86. if(clQueue.empty() == TRUE)
  87. {
  88. UCHAR ret;
  89. ret = DSIThread_CondTimedWait(&stEventPush, &stMutex, ulWaitTime_);
  90. if(ret != DSI_THREAD_ENONE)
  91. {
  92. DSIThread_MutexUnlock(&stMutex);
  93. return 0;
  94. }
  95. }
  96. size_t QueueSize = clQueue.size();
  97. if (QueueSize > MAX_ULONG)
  98. QueueSize = MAX_ULONG;
  99. ULONG ulQueueSize = (ULONG)QueueSize;
  100. ULONG ulSize = (ulMaxSize_ < ulQueueSize) ? ulMaxSize_ : ulQueueSize; //MIN(ulMaxSize_, ulQueueSize);
  101. for(ULONG i=0; i<ulSize; i++)
  102. {
  103. ptElementArray_[i] = clQueue.front();
  104. clQueue.pop();
  105. }
  106. ulFinalSize = ulSize;
  107. }
  108. DSIThread_MutexUnlock(&stMutex);
  109. return ulFinalSize;
  110. }
  111. private:
  112. DSI_CONDITION_VAR stEventPush;
  113. DSI_MUTEX stMutex;
  114. std::queue<T, Container> clQueue;
  115. };
  116. #endif //DSI_TS_QUEUE_HPP