Browse Source

threadpool implementation seems to be working

logicp 5 years ago
parent
commit
f2437f0c53
1 changed files with 44 additions and 13 deletions
  1. 44 13
      socket_listener.cpp

+ 44 - 13
socket_listener.cpp

@@ -2,7 +2,6 @@
 #include "headers/socket_listener.h"
 
 #include "headers/constants.h"
-#include "headers/listen_interface.h"
 // System libraries
 #include <arpa/inet.h>
 #include <netdb.h>
@@ -12,6 +11,7 @@
 #include <unistd.h>
 // C++ Libraries
 #include <atomic>
+#include <chrono>
 #include <condition_variable>
 #include <functional>
 #include <iostream>
@@ -21,6 +21,8 @@
 #include <thread>
 #include <vector>
 
+int num_threads = std::thread::hardware_concurrency();
+
 /**
  * Constructor
  * Initialize with ip_address, port and message_handler
@@ -71,15 +73,21 @@ void SocketListener::push_to_queue(std::function<void()> fn) {
 }
 
 void SocketListener::handle_loop() {
+  std::string accepting_str = accepting_tasks == 0
+                                  ? std::string("Not accepting tasks")
+                                  : std::string("Accepting tasks");
+  std::cout << accepting_str << std::endl;
   std::function<void()> fn;
   for (;;) {
     {
       std::unique_lock<std::mutex> lock(m_mutex_lock);
       pool_condition.wait(
           lock, [this]() { return !accepting_tasks || !task_queue.empty(); });
+      std::cout << "Wait condition met" << std::endl;
       if (!accepting_tasks && task_queue.empty()) {
         return;  // we are done here
       }
+      std::cout << "Taking task" << std::endl;
       fn = task_queue.front();
       task_queue.pop();
     }
@@ -88,18 +96,29 @@ void SocketListener::handle_loop() {
 }
 
 void SocketListener::loop_check() {
-  if (!m_loop_switch) {
-    m_loop_switch = true;
-    m_loop_thread = std::thread(&SocketListener::handle_loop, this);
-  }
+    for (int i = 0; i < task_queue.size() && i < (num_threads - 1); i++) {
+      thread_pool.push_back(std::thread([this]() { handle_loop(); }));
+    }
+    done();
+    std::this_thread::sleep_for(std::chrono::milliseconds(400));
+    detachThreads();
+    size_t task_num = task_queue.size();
+    std::cout << "Task num: " << task_num << std::endl;
+    accepting_tasks = true;
+}
+
+void SocketListener::done() {
+  std::unique_lock<std::mutex> lock(m_mutex_lock);
+  accepting_tasks = false;
+  lock.unlock();
+  // when we send the notification immediately, the consumer will try to get the
+  // lock , so unlock asap
+  pool_condition.notify_all();
 }
 
 void SocketListener::handle_client_socket(
     int client_socket_fd, SocketListener::MessageHandler message_handler,
     std::shared_ptr<char[]> buf) {
-  // char buf[MAX_BUFFER_SIZE] =
-  //     {};  // Declare, define and initialize a character buffer
-  // std::string buffer_string{};  // Initialize a string buffer
   while (true) {
     memset(buf.get(), 0, MAX_BUFFER_SIZE);  // Zero the character buffer
     int bytes_received = 0;
@@ -111,12 +130,13 @@ void SocketListener::handle_client_socket(
              0);
     buf.get()[MAX_BUFFER_SIZE - 1] = 0;  // Null-terminate the character buffer
     if (bytes_received > 0) {
-      std::cout << "Bytes received: " << bytes_received
+      std::cout << "Client " << client_socket_fd
+                << "\nBytes received: " << bytes_received
                 << "\nData: " << buf.get() << std::endl;
       // Handle incoming message
       message_handler();
     } else {
-      std::cout << "client disconnected" << std::endl;
+      std::cout << "Client " << client_socket_fd << " disconnected" << std::endl;
       break;
     }
   }
@@ -160,12 +180,22 @@ void SocketListener::run() {
       std::cout << "Pushing client to queue" << std::endl;
       push_to_queue(std::bind(&SocketListener::handle_client_socket, this,
                               client_socket_fd, message_handler, s_ptr));
-      loop_check();
+      m_loop_thread = std::thread([this]() { loop_check(); });
+      m_loop_thread.detach();
+      accepting_tasks = false;
       std::cout << "At the end" << std::endl;
     }
   }
 }
 
+void SocketListener::detachThreads() {
+  for (std::thread& t : thread_pool) {
+      if (t.joinable()) {
+        t.detach();
+      }
+    }
+}
+
 /**
  * cleanUp
  * @method
@@ -197,13 +227,14 @@ int SocketListener::createSocket() {
               &socket_struct.sin_addr);
 
     int socket_option = 1;
+    // Free up the port to begin listening again
     setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &socket_option,
                sizeof(socket_option));
 
     // Bind local socket address to socket file descriptor
     int bind_result = bind(
-        listening_socket_fd,         // TODO: Use C++ cast on next line?
-        (sockaddr *)&socket_struct,  // cast socket_struct to more generic type
+        listening_socket_fd,        // TODO: Use C++ cast on next line?
+        (sockaddr*)&socket_struct,  // cast socket_struct to more generic type
         sizeof(socket_struct));
     if (bind_result != SOCKET_ERROR) {
       // Listen for connections to socket and allow up to max number of