
Optimizing thread pool

logicp committed 5 years ago
parent commit e11256ff04
5 changed files with 150 additions and 39 deletions
  1. headers/task_queue.hpp (+75 −6)
  2. headers/thread_pool.hpp (+0 −1)
  3. src/socket_listener.cpp (+0 −1)
  4. src/task_queue.cpp (+75 −31)
  5. src/thread_pool.cpp (+0 −0)

headers/task_queue.hpp (+75 −6)

@@ -5,26 +5,95 @@
 #include <queue>
 #include <thread>
 
+/**
+ * TaskQueue
+ *
+ * A task queue employing a pool of worker threads that can execute arbitrary tasks
+ * @class
+ */
 class TaskQueue {
  public:
+ /**
+ * @constructor
+ * Nothing fancy
+ */
   TaskQueue();
+/**
+ * @destructor
+ * Make sure all worker threads detach or join before the TaskQueue instance
+ * is destroyed
+ */
+  ~TaskQueue();
+
+/** PUBLIC METHODS **/
+
+/**
+ * initialize
+ *
+ * To be called after an instance of TaskQueue is created.
+ * @method
+ */
   void initialize();
-  void notifyPool();
+/**
+ * pushToQueue
+ *
+ * Add a task to the queue
+ *
+ * @method
+ * @param[in] {std::function<void()>} fn A fully encapsulated callable
+ * carrying its own internal state
+ */
   void pushToQueue(std::function<void()> fn);
 
  private:
-  void handleLoop();
-  void tasksReady();
-  void done();
+
+/** PRIVATE METHODS **/
+/**
+ * workerLoop
+ *
+ * The loop that constitutes the essential lifecycle of a worker thread
+ * @method
+ */
+  void workerLoop();
+/**
+ * deployWorkers
+ *
+ * Procures worker threads and sets them to work by executing the workerLoop
+ * function
+ * @method
+ */
+  void deployWorkers();
+/**
+ * detachThreads
+ *
+ * Detaches worker threads, allowing them to terminate independently.
+ * @method
+ * @cleanup
+ */
   void detachThreads();
 
+/** PRIVATE MEMBERS **/
+
+  /**
+   * FIFO queue of type-erased std::function tasks awaiting execution
+   */
   std::queue<std::function<void()>> m_task_queue;
 
-  // ThreadPool m_thread_pool;
+  /**
+   * vector of worker threads
+   */
   std::vector<std::thread> m_thread_pool;
-  std::thread m_loop_thread;
 
+  /**
+   * mutex for locking resources
+   */
   std::mutex m_mutex_lock;
+  /**
+   * condition variable for controlling work execution
+   */
   std::condition_variable pool_condition;
+  /**
+   * atomic flag used with the condition variable to coordinate the workers
+   */
   std::atomic<bool> accepting_tasks;
 };
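
For orientation, here is a minimal usage sketch against the interface declared above. Only the construct → initialize() → pushToQueue() flow comes from this header; the printed message and the closing sleep are illustrative additions so a detached worker has time to run in a toy program.

    #include "task_queue.hpp"

    #include <chrono>
    #include <iostream>
    #include <thread>

    int main() {
      TaskQueue task_queue{};
      task_queue.initialize();  // deploys the worker threads

      // Any callable that fully captures its own state can be queued
      task_queue.pushToQueue([]() {
        std::cout << "task executed by a worker thread" << std::endl;
      });

      // Give a detached worker a moment to pick up the task (demo only)
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      return 0;
    }  // ~TaskQueue() detaches any remaining workers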

headers/thread_pool.hpp (+0 −1)

@@ -1 +0,0 @@
-

src/socket_listener.cpp (+0 −1)

@@ -157,7 +157,6 @@ void SocketListener::run() {
             std::bind(&SocketListener::handleClientSocket, this,
                       client_socket_fd, message_handler,
                       std::forward<std::shared_ptr<char[]>>(s_buffer_ptr)));
-        u_task_queue_ptr->notifyPool();
       }
     }
   }
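
With workers deployed up front by initialize() and pushToQueue() waking one of them, callers such as SocketListener::run() no longer follow a push with a notification. Below is a self-contained sketch of that caller-side pattern; the Handler type and its member are illustrative stand-ins, not code from this repository.

    #include <functional>
    #include <iostream>
    #include <memory>

    // Illustrative stand-in for the real caller (SocketListener)
    struct Handler {
      void handleClient(int fd, std::shared_ptr<char[]> buffer) {
        std::cout << "handling fd " << fd
                  << (buffer ? " with a buffer" : " without a buffer")
                  << std::endl;
      }
    };

    int main() {
      Handler handler{};
      std::shared_ptr<char[]> buffer(new char[1024]);

      // Bind the member function and its arguments into a std::function<void()>
      std::function<void()> task =
          std::bind(&Handler::handleClient, &handler, 42, buffer);

      // With the optimized TaskQueue, handing this to pushToQueue() is all a
      // caller needs to do; here the task is simply invoked directly.
      task();
      return 0;
    }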

src/task_queue.cpp (+75 −31)

@@ -2,62 +2,106 @@
 
 #include <iostream>
 
+/**
+ * @global
+ * {int} num_threads A rough estimate of the number of threads that can run
+ * concurrently
+ */
 int num_threads = std::thread::hardware_concurrency();
 
+/**
+ * @constructor
+ * Nothing fancy
+ */
 TaskQueue::TaskQueue() {}
+/**
+ * @destructor
+ * Make sure all worker threads detach or join before the TaskQueue instance
+ * is destroyed
+ */
+TaskQueue::~TaskQueue() { detachThreads(); }
 
+/**
+ * pushToQueue
+ *
+ * Add a task to the queue
+ *
+ * @method
+ * @param[in] {std::function<void()>} fn A fully encapsulated callable
+ * carrying its own internal state
+ */
 void TaskQueue::pushToQueue(std::function<void()> fn) {
-  std::unique_lock<std::mutex> lock(m_mutex_lock);
-  m_task_queue.push(fn);
+  std::unique_lock<std::mutex> lock(m_mutex_lock);  // obtain mutex
+  m_task_queue.push(fn);                            // add work to queue
   lock.unlock();
-  pool_condition.notify_one();
+  pool_condition.notify_one();  // wake one waiting worker to perform the work
 }
 
-void TaskQueue::handleLoop() {
+/**
+ * workerLoop
+ *
+ * The loop that constitutes the essential lifecycle of a worker thread
+ * @method
+ */
+void TaskQueue::workerLoop() {
   std::function<void()> fn;
   for (;;) {
     {  // encapsulate atomic management of queue
-      std::unique_lock<std::mutex> lock(m_mutex_lock);
-      pool_condition.wait(
-          lock, [this]() { return !accepting_tasks || !m_task_queue.empty(); });
+      std::unique_lock<std::mutex> lock(m_mutex_lock);  // obtain mutex
+      pool_condition
+          .wait(  // condition: not accepting tasks or queue is not empty
+              lock,
+              [this]() { return !accepting_tasks || !m_task_queue.empty(); });
       std::cout << "Wait condition met" << std::endl;
       if (!accepting_tasks && m_task_queue.empty()) {
-        return;  // we are done here
+        // If the queue is empty, it's safe to begin accepting tasks
+        accepting_tasks = true;
+        continue;
       }
       std::cout << "Taking task" << std::endl;
-      fn = m_task_queue.front();
+      fn = m_task_queue.front();  // obtain task from FIFO container
       m_task_queue.pop();
-    }      // queue management complete
-    fn();  // work
+      accepting_tasks = true;  // begin accepting before lock expires
+    }                          // queue management complete (lock expires)
+    fn();                      // work
   }
 }
 
-void TaskQueue::done() {
-  std::unique_lock<std::mutex> lock(m_mutex_lock);
-  accepting_tasks = false;
+/**
+ * deployWorkers
+ *
+ * Procures worker threads and sets them to work by executing the workerLoop
+ * function
+ * @method
+ */
+void TaskQueue::deployWorkers() {
+  for (int i = 0; i < (num_threads - 1); i++) {
+    m_thread_pool.push_back(std::thread([this]() { workerLoop(); }));
+  }
+  // TODO: mutex may not be necessary, as accepting_tasks is atomic
+  std::unique_lock<std::mutex> lock(m_mutex_lock);  // obtain mutex
+  accepting_tasks = false;  // allow pool wait condition to be met
   lock.unlock();
  // the consumer will try to acquire the lock as soon as it is notified, so
  // unlock before notifying
   pool_condition.notify_all();
-}
-
-void TaskQueue::notifyPool() {
-  for (int i = 0; i < m_task_queue.size() && i < (num_threads - 1); i++) {
-    m_thread_pool.push_back(std::thread([this]() { handleLoop(); }));
-  }
-  done();
-  std::this_thread::sleep_for(std::chrono::milliseconds(400));
-  detachThreads();
-  size_t task_num = m_task_queue.size();
-  std::cout << "Task num: " << task_num << std::endl;
-  accepting_tasks = true;
-}
+}  // lock expires
 
-void TaskQueue::initialize() {
-  /* m_loop_thread = std::thread([this]() { loopCheck(); }); */
-  /* m_loop_thread.detach(); */
-}
+/**
+ * initialize
+ *
+ * To be called after an instance of TaskQueue is created.
+ * @method
+ */
+void TaskQueue::initialize() { deployWorkers(); }
 
+/**
+ * detachThreads
+ *
+ * Detaches worker threads, allowing them to terminate independently.
+ * @method
+ * @cleanup
+ */
 void TaskQueue::detachThreads() {
   for (std::thread& t : m_thread_pool) {
     if (t.joinable()) {

src/thread_pool.cpp (+0 −0)
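
The unlock-before-notify ordering noted in pushToQueue() and deployWorkers() is a general idiom: releasing the mutex before notifying keeps the woken thread from waking up only to block on a lock the notifier still holds. A standalone sketch of the idiom, independent of this codebase (all names below are illustrative):

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    std::mutex m;
    std::condition_variable cv;
    std::queue<int> work;

    void produce(int value) {
      std::unique_lock<std::mutex> lock(m);
      work.push(value);
      lock.unlock();    // release the mutex first so the woken consumer
      cv.notify_one();  // does not immediately block trying to acquire it
    }

    void consume() {
      std::unique_lock<std::mutex> lock(m);
      cv.wait(lock, [] { return !work.empty(); });  // also guards spurious wakeups
      std::cout << "consumed " << work.front() << std::endl;
      work.pop();
    }

    int main() {
      std::thread consumer(consume);
      produce(42);
      consumer.join();
      return 0;
    }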