task_queue.cpp

#include <task_queue.hpp>
#include <iostream>
#include <thread> // std::thread::hardware_concurrency (may also arrive transitively via task_queue.hpp)
/**
 * @global
 * {int} num_threads A rough estimate of the number of threads we can run
 * concurrently
 */
int num_threads = std::thread::hardware_concurrency();
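// Caveat: std::thread::hardware_concurrency() may return 0 when the value
// cannot be determined; in that case deployWorkers() below would spawn no
// workers, so a fallback default may be worth adding.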
/**
 * @constructor
 * Nothing fancy
 */
TaskQueue::TaskQueue() {}
/**
 * @destructor
 * Make sure all worker threads detach or join before destroying the TaskQueue
 * instance
 */
TaskQueue::~TaskQueue() { detachThreads(); }
/**
 * pushToQueue
 *
 * Add a task to the queue
 *
 * @method
 * @param[in] {std::function<void()>} fn A fully encapsulated callable with
 * its own internal state
 */
void TaskQueue::pushToQueue(std::function<void()> fn) {
  std::unique_lock<std::mutex> lock(m_mutex_lock); // obtain mutex
  m_task_queue.push(fn);                           // add work to queue
  lock.unlock();                                   // release before notifying
  pool_condition.notify_one(); // wake one waiting worker to perform the work
}
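// Illustrative call site (a minimal sketch; "queue" and "request_id" are
// hypothetical names): the task's state travels inside the lambda capture,
// so pushToQueue never needs to know about the caller's data.
//
//   int request_id = 42;
//   queue.pushToQueue([request_id]() {
//     std::cout << "handling request " << request_id << std::endl;
//   });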
/**
 * workerLoop
 *
 * The essential lifecycle of a worker: wait for work, take a task, run it
 * @method
 */
void TaskQueue::workerLoop() {
  std::function<void()> fn;
  for (;;) {
    { // encapsulate atomic management of queue
      std::unique_lock<std::mutex> lock(m_mutex_lock); // obtain mutex
      pool_condition.wait( // wake when not accepting tasks or the queue is not empty
          lock,            // (the predicate also guards against spurious wakeups)
          [this]() { return !accepting_tasks || !m_task_queue.empty(); });
      std::cout << "Wait condition met" << std::endl;
      if (!accepting_tasks && m_task_queue.empty()) {
        // Woken with nothing to do: the queue has drained, so it's safe to
        // begin accepting tasks again and resume waiting
        accepting_tasks = true;
        continue;
      }
      std::cout << "Taking task" << std::endl;
      fn = m_task_queue.front(); // obtain task from FIFO container
      m_task_queue.pop();
      accepting_tasks = true; // begin accepting before lock expires
    } // queue management complete (lock expires)
    fn(); // work
  }
}
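// Note on lifetime: workerLoop() has no break or return, so each worker runs
// for the remainder of the process. detachThreads() below therefore detaches
// the workers rather than joining them, letting the destructor return without
// blocking.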
/**
 * deployWorkers
 *
 * Procures workers and brings them into existence by executing the workerLoop
 * function
 * @method
 */
void TaskQueue::deployWorkers() {
  // Spawn num_threads - 1 workers, presumably leaving one hardware thread
  // free for the calling thread
  for (int i = 0; i < (num_threads - 1); i++) {
    m_thread_pool.push_back(std::thread([this]() { workerLoop(); }));
  }
  // TODO: mutex may not be necessary, as accepting_tasks is atomic
  std::unique_lock<std::mutex> lock(m_mutex_lock); // obtain mutex
  accepting_tasks = false; // allow pool wait condition to be met
  lock.unlock();
  // Notify only after unlocking: a woken consumer immediately tries to acquire
  // the lock, so release it as soon as possible
  pool_condition.notify_all();
} // lock expires
/**
 * initialize
 *
 * To be called after an instance of TaskQueue is created.
 * @method
 */
void TaskQueue::initialize() { deployWorkers(); }
/**
 * detachThreads
 *
 * Detaches worker threads so the TaskQueue can be destroyed without joining
 * them.
 * @method
 * @cleanup
 */
void TaskQueue::detachThreads() {
  for (std::thread& t : m_thread_pool) {
    if (t.joinable()) {
      t.detach();
    }
  }
}
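/*
 * Illustrative usage, assuming the TaskQueue interface exercised above
 * (a minimal sketch, not a hardened example):
 *
 *   TaskQueue queue;
 *   queue.initialize();            // deploy the worker pool
 *   queue.pushToQueue([]() {
 *     std::cout << "task ran" << std::endl;
 *   });
 *   // Workers are detached in the destructor, so the caller must keep the
 *   // process alive long enough for queued work to finish.
 */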