Browse Source

buggy working multithreaded version TOTAL OVERKILL
works but you have to wait for each stream to finish before handling concurrent connections

logicp 5 years ago
parent
commit
9a9ce0ec2c
7 changed files with 173 additions and 97 deletions
  1. 4 1
      CMakeLists.txt
  2. 0 11
      headers/listen_interface.h
  3. 4 2
      headers/send_interface.h
  4. 46 7
      headers/socket_listener.h
  5. 16 0
      headers/types.h
  6. 3 1
      main.cpp
  7. 100 75
      socket_listener.cpp

+ 4 - 1
CMakeLists.txt

@@ -1,10 +1,13 @@
 cmake_minimum_required(VERSION 2.8)
 project(ws_server)
 
+set(THREADS_PREFER_PTHREAD_FLAG ON)
+find_package(Threads REQUIRED)
 set(SOURCES "main.cpp" "socket_listener.cpp")
-set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wall -std=c++17")
+set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -std=c++17 -g -pthread")
 add_executable(${PROJECT_NAME} ${SOURCES})
 target_include_directories(${PROJECT_NAME} PRIVATE
   "headers"
 )
+target_link_libraries(${PROJECT_NAME} Threads::Threads)
 

+ 0 - 11
headers/listen_interface.h

@@ -1,11 +0,0 @@
-#ifndef __LISTEN_INTERFACE_H__
-#define __LISTEN_INTERFACE_H__
-
-#include <string>
-
-class ListenInterface {
- public:
-  virtual static void onMessageReceived(int socket_id, std::string message) = 0;
-};
-
-#endif  // __LISTEN_INTERFACE_H__

+ 4 - 2
headers/send_interface.h

@@ -1,11 +1,13 @@
 #ifndef __SEND_INTERFACE_H__
 #define __SEND_INTERFACE_H__
 
+#include <memory>
 #include <string>
 
 class SendInterface {
-  public:
-    virtual void sendMessage(int client_socket_fd, std::string message) = 0;
+ public:
+  virtual void sendMessage(int client_socket_fd,
+                           std::shared_ptr<char[]> message) = 0;
 };
 
 #endif  // __SEND_INTERFACE_H__

+ 46 - 7
headers/socket_listener.h

@@ -1,19 +1,38 @@
 #ifndef __SOCKET_LISTENER_H__
 #define __SOCKET_LISTENER_H__
+
 // Project libraries
-#include "listen_interface.h"
 #include "send_interface.h"
+#include "types.h"
 
 // System libraries
 #include <sys/socket.h>
 
 // C++ Libraries
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <iostream>
+#include <memory>
 #include <string>
+#include <thread>
+#include <queue>
+#include <mutex>
+#include <vector>
 
-#define MAX_BUFFER_SIZE (49152)
-
-class SocketListener : public ListenInterface, public SendInterface {
+class SocketListener : public SendInterface {
  public:
+  class MessageHandler {
+   public:
+    MessageHandler(std::function<void()> cb) : m_cb(cb) {}
+
+    void operator()() { m_cb(); }
+
+   private:
+    //    int m_client_socket_fd;
+    //    std::shared_ptr<char[]> m_char_ptr;
+    std::function<void()> m_cb;
+  };
   // constructor
   SocketListener(std::string ipAddress, int port);
 
@@ -25,8 +44,10 @@ class SocketListener : public ListenInterface, public SendInterface {
    * @param[in] {int} client_socket_fd The client socket file descriptor
    * @param[in] {std::string} The message to be sent
    */
-  virtual void sendMessage(int client_socket_fd, std::string message) override;
+  virtual void sendMessage(int client_socket_fd,
+                           std::shared_ptr<char[]> buffer) override;
 
+  MessageHandler createMessageHandler(std::function<void()> cb);
   /**
    * Perform intialization work
    */
@@ -42,8 +63,7 @@ class SocketListener : public ListenInterface, public SendInterface {
    */
   void cleanup();
 
-  virtual static void onMessageReceived(int socket_id,
-                                        std::string message) override;
+  // virtual void setMessageHandler(MessageHandler message_handler) override;
 
  private:
   // private methods
@@ -51,9 +71,28 @@ class SocketListener : public ListenInterface, public SendInterface {
 
   int waitForConnection(int listening);
 
+  void loop_check();
+
+  void handle_loop();
+
+  void push_to_queue(std::function<void()> fn);
+
+  void handle_client_socket(int client_socket_fd,
+                          SocketListener::MessageHandler message_handler,
+                          std::shared_ptr<char[]> buf);
+
   // private members
   std::string m_ip_address;
   int m_port;
+  std::thread m_loop_thread;
+  std::queue<std::function<void()>> task_queue;
+  std::mutex m_mutex_lock;
+  std::condition_variable pool_condition;
+  std::atomic<bool> accepting_tasks;
+  std::atomic<bool> shutdown_loop;
+  std::atomic<bool> m_loop_switch;
+
+  std::vector<std::thread> thread_pool;
 };
 
 #endif  // __SOCKET_LISTENER_H__

+ 16 - 0
headers/types.h

@@ -0,0 +1,16 @@
+#ifndef __TYPES_H__
+#define __TYPES_H__
+
+#include <string>
+
+#define MAX_BUFFER_SIZE (49152)
+
+template <typename MessageProcessor>
+void MessageHandler(MessageProcessor processor, int client_socket_fd, std::string message) {
+  processor(client_socket_fd, message);
+}
+
+
+
+#endif  //__TYPES_H__
+

+ 3 - 1
main.cpp

@@ -1,11 +1,13 @@
 #include <iostream>
 #include <string>
+
 #include "headers/socket_listener.h"
 
-int main() {
+int main(int argc, char** argv) {
   SocketListener server("0.0.0.0", 9009);
 
   if (server.init()) {
+    std::cout << "Running message loop" << std::endl;
     server.run();
   }
   return 0;

+ 100 - 75
socket_listener.cpp

@@ -2,6 +2,7 @@
 #include "headers/socket_listener.h"
 
 #include "headers/constants.h"
+#include "headers/listen_interface.h"
 // System libraries
 #include <arpa/inet.h>
 #include <netdb.h>
@@ -14,33 +15,74 @@
 #include <condition_variable>
 #include <functional>
 #include <iostream>
+#include <memory>
 #include <queue>
 #include <string>
 #include <thread>
 #include <vector>
 
-std::queue<std::function<void()>> task_queue{};
-std::mutex g_mutex_lock;
-std::condition_variable pool_condition;
-std::atomic<bool> accepting_tasks{true};
-std::vector<std::thread> thread_pool{};
+/**
+ * Constructor
+ * Initialize with ip_address, port and message_handler
+ */
+SocketListener::SocketListener(std::string ip_address, int port)
+    : m_ip_address(ip_address), m_port(port), accepting_tasks(true), shutdown_loop(false) {}
+
+/**
+ * Destructor
+ * TODO: Determine if we should make buffer a class member
+ */
+SocketListener::~SocketListener() { cleanup(); }
+
+SocketListener::MessageHandler SocketListener::createMessageHandler(
+    std::function<void()> cb) {
+  return MessageHandler(cb);
+}
+
+/*
+  **setMessageHandler *@method *Set the function to handle received
+   messages * /
+
+  // SocketListener::setMessageHandler(MessageHandler message_handler) {
+  //   m_message_handler = message_handler;
+  // }
+
+  /**
+   * sendMessage
+   * @method
+   * Send a null-terminated array of characters, supplied as a const char
+   * pointer, to a client socket described by its file descriptor
+   */
+void SocketListener::sendMessage(int client_socket_fd,
+                                 std::shared_ptr<char[]> s_ptr) {
+  send(client_socket_fd, s_ptr.get(), static_cast<size_t>(MAX_BUFFER_SIZE) + 1,
+       0);
+}
 
-void push_to_queue(std::function<void()> fn) {
-  std::unique_lock<std::mutex> lock(g_mutex_lock);
+/**
+ * init
+ * TODO: Initialize buffer memory, if buffer is to be a class member
+ */
+bool SocketListener::init() {
+  std::cout << "Initializing socket listener" << std::endl;
+  return true;
+}
+
+void SocketListener::push_to_queue(std::function<void()> fn) {
+  std::unique_lock<std::mutex> lock(m_mutex_lock);
   task_queue.push(fn);
   lock.unlock();
-  pool_condition.notify_on();
+  pool_condition.notify_one();
 }
 
-void handle_loop() {
+void SocketListener::handle_loop() {
   std::function<void()> fn;
   for (;;) {
     {
-      std::unique_lock<std::mutex> lock(g_mutex_lock);
-      pool_condition.wait(lock, [this]() {
-        return !accepting_tasks || !task_queue.empty();
-      }
-      if (!accepting_tasks && task_queue.empty() {
+      std::unique_lock<std::mutex> lock(m_mutex_lock);
+      pool_condition.wait(
+          lock, [this]() { return !accepting_tasks || !task_queue.empty(); });
+      if (!accepting_tasks && task_queue.empty()) {
         return;  // we are done here
       }
       fn = task_queue.front();
@@ -50,70 +92,46 @@ void handle_loop() {
   }
 }
 
-void handle_client_socket(int client_socket_fd,
-                          std::function<void(int, std::string)> cb) {
-  char buf[MAX_BUFFER_SIZE] =
-      {};  // Declare, define and initialize a character buffer
-  std::string buffer_string{};  // Initialize a string buffer
+void SocketListener::loop_check() {
+  if (!m_loop_switch) {
+    m_loop_switch = true;
+    m_loop_thread = std::thread(&SocketListener::handle_loop, this);
+  }
+}
+
+
+void SocketListener::handle_client_socket(int client_socket_fd,
+                          SocketListener::MessageHandler message_handler,
+                          std::shared_ptr<char[]> buf) {
+  // char buf[MAX_BUFFER_SIZE] =
+  //     {};  // Declare, define and initialize a character buffer
+  // std::string buffer_string{};  // Initialize a string buffer
   while (true) {
-    memset(buf, 0, MAX_BUFFER_SIZE);  // Zero the character buffer
+    memset(buf.get(), 0, MAX_BUFFER_SIZE);  // Zero the character buffer
     int bytes_received = 0;
     // Receive and write incoming data to buffer and return the number of
     // bytes received
     bytes_received =
-        recv(client_socket_fd, buf,
+        recv(client_socket_fd, buf.get(),
              MAX_BUFFER_SIZE - 2,  // Leave room for null-termination
              0);
-    buf[MAX_BUFFER_SIZE - 1] = 0;  // Null-terminate the character buffer
+    buf.get()[MAX_BUFFER_SIZE - 1] = 0;  // Null-terminate the character buffer
     if (bytes_received > 0) {
-      buffer_string += buf;
-      std::cout << "Bytes received: " << bytes_received << "\nData: " << buf
-                << std::endl;
+      std::cout << "Bytes received: " << bytes_received
+                << "\nData: " << buf.get() << std::endl;
       // Handle incoming message
-      cb(client_socket_fd, std::string(buf));
+      message_handler();
     } else {
       std::cout << "client disconnected" << std::endl;
       break;
     }
   }
   // Zero the buffer again before closing
-  memset(buf, 0, MAX_BUFFER_SIZE);
+  memset(buf.get(), 0, MAX_BUFFER_SIZE);
   // TODO: Determine if we should free memory, or handle as class member
   close(client_socket_fd);  // Destroy client socket and deallocate its fd
 }
 
-/**
- * Constructor
- * Initialize with ip_address and port
- */
-SocketListener::SocketListener(std::string ip_address, int port)
-    : m_ip_address(ip_address), m_port(port) {}
-
-/**
- * Destructor
- * TODO: Determine if we should make buffer a class member
- */
-SocketListener::~SocketListener() { cleanup(); }
-
-/**
- * sendMessage
- * @method
- * Send a null-terminated array of characters, supplied as a const char pointer,
- * to a client socket described by its file descriptor
- */
-void SocketListener::sendMessage(int client_socket_fd, std::string message) {
-  send(client_socket_fd, message.c_str(), message.size() + 1, 0);
-}
-
-/**
- * init
- * TODO: Initialize buffer memory, if buffer is to be a class member
- */
-bool SocketListener::init() {
-  std::cout << "Initializing socket listener" << std::endl;
-  return true;
-}
-
 /**
  * run
  * @method
@@ -121,10 +139,9 @@ bool SocketListener::init() {
  * TODO: Implement multithreading
  */
 void SocketListener::run() {
-  // Begin handling thread pool and task queue
-  handle_loop();
   // Begin listening loop
   while (true) {
+    std::cout << "Begin" << std::endl;
     // Call system to open a listening socket, and return its file descriptor
     int listening_socket_fd = createSocket();
 
@@ -132,6 +149,7 @@ void SocketListener::run() {
       std::cout << "Socket error: shutting down server" << std::endl;
       break;
     }
+    std::cout << "Attempting to wait for connection" << std::endl;
     // wait for a client connection and get its socket file descriptor
     int client_socket_fd = waitForConnection(listening_socket_fd);
 
@@ -139,8 +157,17 @@ void SocketListener::run() {
       // Destroy listening socket and deallocate its file descriptor. Only use
       // the client socket now.
       close(listening_socket_fd);
-      push_to_queue(
-          std::bind(handle_client_socket, client_socket_fd, onMessageReceived));
+      std::shared_ptr<char[]> s_ptr(new char[MAX_BUFFER_SIZE]);
+      std::function<void()> message_send_fn = [this, client_socket_fd,
+                                               s_ptr]() {
+        this->sendMessage(client_socket_fd, s_ptr);
+      };
+      MessageHandler message_handler = createMessageHandler(message_send_fn);
+      std::cout << "Pushing client to queue" << std::endl;
+      push_to_queue(std::bind(&SocketListener::handle_client_socket, this, client_socket_fd,
+                              message_handler, s_ptr));
+      loop_check();
+      std::cout << "At the end" << std::endl;
     }
   }
 }
@@ -150,7 +177,11 @@ void SocketListener::run() {
  * @method
  * TODO: Determine if we should be cleaning up buffer memory
  */
-void SocketListener::cleanup() { std::cout << "Cleaning up" << std::endl; }
+void SocketListener::cleanup() { std::cout << "Cleaning up" << std::endl;
+  if (m_loop_thread.joinable()) {
+    m_loop_thread.join();
+  }
+}
 /**
  * createSocket
  * Open a listening socket and return its file descriptor
@@ -161,6 +192,7 @@ int SocketListener::createSocket() {
   int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
 
   if (listening_socket_fd != SOCKET_ERROR) {
+    std::cout << "Created listening socket" << std::endl;
     // Create socket structure to hold address and type
     sockaddr_in socket_struct;
     socket_struct.sin_family = AF_INET;  // ipv4
@@ -168,6 +200,10 @@ int SocketListener::createSocket() {
         htons(m_port);  // convert byte order of port value from host to network
     inet_pton(AF_INET, m_ip_address.c_str(),  // convert address to binary
               &socket_struct.sin_addr);
+
+    int socket_option = 1;
+    setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &socket_option, sizeof(socket_option));
+
     // Bind local socket address to socket file descriptor
     int bind_result = bind(
         listening_socket_fd,         // TODO: Use C++ cast on next line?
@@ -196,14 +232,3 @@ int SocketListener::waitForConnection(int listening_socket) {
   int client_socket_fd = accept(listening_socket, NULL, NULL);
   return client_socket_fd;
 }
-/**
- * onMessageReceived
- * @method
- * @override
- * Handle messages successfully received from a client socket
- */
-static void SocketListener::onMessageReceived(int socket_id,
-                                              std::string message) {
-  sendMessage(socket_id, message);
-}
-