Browse Source

still doesn`t work because we need to use functors or take the messaging outside the class

logicp 5 years ago
parent
commit
d6ac81ed43
3 changed files with 78 additions and 31 deletions
  1. 1 1
      headers/listen_interface.h
  2. 2 1
      headers/socket_listener.h
  3. 75 29
      socket_listener.cpp

+ 1 - 1
headers/listen_interface.h

@@ -5,7 +5,7 @@
 
 class ListenInterface {
  public:
-  virtual void onMessageReceived(int socket_id, std::string message) = 0;
+  virtual static void onMessageReceived(int socket_id, std::string message) = 0;
 };
 
 #endif  // __LISTEN_INTERFACE_H__

+ 2 - 1
headers/socket_listener.h

@@ -42,7 +42,8 @@ class SocketListener : public ListenInterface, public SendInterface {
    */
   void cleanup();
 
-  virtual void onMessageReceived(int socket_id, std::string message) override;
+  virtual static void onMessageReceived(int socket_id,
+                                        std::string message) override;
 
  private:
   // private methods

+ 75 - 29
socket_listener.cpp

@@ -10,8 +10,77 @@
 #include <sys/types.h>
 #include <unistd.h>
 // C++ Libraries
+#include <atomic>
+#include <condition_variable>
+#include <functional>
 #include <iostream>
+#include <queue>
 #include <string>
+#include <thread>
+#include <vector>
+
+std::queue<std::function<void()>> task_queue{};
+std::mutex g_mutex_lock;
+std::condition_variable pool_condition;
+std::atomic<bool> accepting_tasks{true};
+std::vector<std::thread> thread_pool{};
+
+void push_to_queue(std::function<void()> fn) {
+  std::unique_lock<std::mutex> lock(g_mutex_lock);
+  task_queue.push(fn);
+  lock.unlock();
+  pool_condition.notify_on();
+}
+
+void handle_loop() {
+  std::function<void()> fn;
+  for (;;) {
+    {
+      std::unique_lock<std::mutex> lock(g_mutex_lock);
+      pool_condition.wait(lock, [this]() {
+        return !accepting_tasks || !task_queue.empty();
+      }
+      if (!accepting_tasks && task_queue.empty() {
+        return;  // we are done here
+      }
+      fn = task_queue.front();
+      task_queue.pop();
+    }
+    fn();
+  }
+}
+
+void handle_client_socket(int client_socket_fd,
+                          std::function<void(int, std::string)> cb) {
+  char buf[MAX_BUFFER_SIZE] =
+      {};  // Declare, define and initialize a character buffer
+  std::string buffer_string{};  // Initialize a string buffer
+  while (true) {
+    memset(buf, 0, MAX_BUFFER_SIZE);  // Zero the character buffer
+    int bytes_received = 0;
+    // Receive and write incoming data to buffer and return the number of
+    // bytes received
+    bytes_received =
+        recv(client_socket_fd, buf,
+             MAX_BUFFER_SIZE - 2,  // Leave room for null-termination
+             0);
+    buf[MAX_BUFFER_SIZE - 1] = 0;  // Null-terminate the character buffer
+    if (bytes_received > 0) {
+      buffer_string += buf;
+      std::cout << "Bytes received: " << bytes_received << "\nData: " << buf
+                << std::endl;
+      // Handle incoming message
+      cb(client_socket_fd, std::string(buf));
+    } else {
+      std::cout << "client disconnected" << std::endl;
+      break;
+    }
+  }
+  // Zero the buffer again before closing
+  memset(buf, 0, MAX_BUFFER_SIZE);
+  // TODO: Determine if we should free memory, or handle as class member
+  close(client_socket_fd);  // Destroy client socket and deallocate its fd
+}
 
 /**
  * Constructor
@@ -52,8 +121,8 @@ bool SocketListener::init() {
  * TODO: Implement multithreading
  */
 void SocketListener::run() {
-  // Declare, define and initialize a character buffer
-  char buf[MAX_BUFFER_SIZE] = {};
+  // Begin handling thread pool and task queue
+  handle_loop();
   // Begin listening loop
   while (true) {
     // Call system to open a listening socket, and return its file descriptor
@@ -70,32 +139,8 @@ void SocketListener::run() {
       // Destroy listening socket and deallocate its file descriptor. Only use
       // the client socket now.
       close(listening_socket_fd);
-      std::string buffer_string{};  // Initialize a string buffer
-      while (true) {
-        memset(buf, 0, MAX_BUFFER_SIZE);  // Zero the character buffer
-        int bytes_received = 0;
-        // Receive and write incoming data to buffer and return the number of
-        // bytes received
-        bytes_received =
-            recv(client_socket_fd, buf,
-                 MAX_BUFFER_SIZE - 2,  // Leave room for null-termination
-                 0);
-        buf[MAX_BUFFER_SIZE - 1] = 0;  // Null-terminate the character buffer
-        if (bytes_received > 0) {
-          buffer_string += buf;
-          std::cout << "Bytes received: " << bytes_received << "\nData: " << buf
-                    << std::endl;
-          // Handle incoming message
-          onMessageReceived(client_socket_fd, std::string(buf));
-        } else {
-          std::cout << "client disconnected" << std::endl;
-          break;
-        }
-      }
-      // Zero the buffer again before closing
-      memset(buf, 0, MAX_BUFFER_SIZE);
-      // TODO: Determine if we should free memory, or handle as class member
-      close(client_socket_fd);  // Destroy client socket and deallocate its fd
+      push_to_queue(
+          std::bind(handle_client_socket, client_socket_fd, onMessageReceived));
     }
   }
 }
@@ -157,7 +202,8 @@ int SocketListener::waitForConnection(int listening_socket) {
  * @override
  * Handle messages successfully received from a client socket
  */
-void SocketListener::onMessageReceived(int socket_id, std::string message) {
+static void SocketListener::onMessageReceived(int socket_id,
+                                              std::string message) {
   sendMessage(socket_id, message);
 }