socket_listener.cpp 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Project headers
  2. #include "headers/socket_listener.h"
  3. #include "headers/constants.h"
  4. // System libraries
  5. #include <arpa/inet.h>
  6. #include <netdb.h>
  7. #include <string.h>
  8. #include <sys/socket.h>
  9. #include <sys/types.h>
  10. #include <unistd.h>
  11. // C++ Libraries
  12. #include <atomic>
  13. #include <condition_variable>
  14. #include <functional>
  15. #include <iostream>
  16. #include <queue>
  17. #include <string>
  18. #include <thread>
  19. #include <vector>
  20. std::queue<std::function<void()>> task_queue{};
  21. std::mutex g_mutex_lock;
  22. std::condition_variable pool_condition;
  23. std::atomic<bool> accepting_tasks{true};
  24. std::vector<std::thread> thread_pool{};
  25. void push_to_queue(std::function<void()> fn) {
  26. std::unique_lock<std::mutex> lock(g_mutex_lock);
  27. task_queue.push(fn);
  28. lock.unlock();
  29. pool_condition.notify_on();
  30. }
  31. void handle_loop() {
  32. std::function<void()> fn;
  33. for (;;) {
  34. {
  35. std::unique_lock<std::mutex> lock(g_mutex_lock);
  36. pool_condition.wait(lock, [this]() {
  37. return !accepting_tasks || !task_queue.empty();
  38. }
  39. if (!accepting_tasks && task_queue.empty() {
  40. return; // we are done here
  41. }
  42. fn = task_queue.front();
  43. task_queue.pop();
  44. }
  45. fn();
  46. }
  47. }
  48. void handle_client_socket(int client_socket_fd,
  49. std::function<void(int, std::string)> cb) {
  50. char buf[MAX_BUFFER_SIZE] =
  51. {}; // Declare, define and initialize a character buffer
  52. std::string buffer_string{}; // Initialize a string buffer
  53. while (true) {
  54. memset(buf, 0, MAX_BUFFER_SIZE); // Zero the character buffer
  55. int bytes_received = 0;
  56. // Receive and write incoming data to buffer and return the number of
  57. // bytes received
  58. bytes_received =
  59. recv(client_socket_fd, buf,
  60. MAX_BUFFER_SIZE - 2, // Leave room for null-termination
  61. 0);
  62. buf[MAX_BUFFER_SIZE - 1] = 0; // Null-terminate the character buffer
  63. if (bytes_received > 0) {
  64. buffer_string += buf;
  65. std::cout << "Bytes received: " << bytes_received << "\nData: " << buf
  66. << std::endl;
  67. // Handle incoming message
  68. cb(client_socket_fd, std::string(buf));
  69. } else {
  70. std::cout << "client disconnected" << std::endl;
  71. break;
  72. }
  73. }
  74. // Zero the buffer again before closing
  75. memset(buf, 0, MAX_BUFFER_SIZE);
  76. // TODO: Determine if we should free memory, or handle as class member
  77. close(client_socket_fd); // Destroy client socket and deallocate its fd
  78. }
  79. /**
  80. * Constructor
  81. * Initialize with ip_address and port
  82. */
  83. SocketListener::SocketListener(std::string ip_address, int port)
  84. : m_ip_address(ip_address), m_port(port) {}
  85. /**
  86. * Destructor
  87. * TODO: Determine if we should make buffer a class member
  88. */
  89. SocketListener::~SocketListener() { cleanup(); }
  90. /**
  91. * sendMessage
  92. * @method
  93. * Send a null-terminated array of characters, supplied as a const char pointer,
  94. * to a client socket described by its file descriptor
  95. */
  96. void SocketListener::sendMessage(int client_socket_fd, std::string message) {
  97. send(client_socket_fd, message.c_str(), message.size() + 1, 0);
  98. }
  99. /**
  100. * init
  101. * TODO: Initialize buffer memory, if buffer is to be a class member
  102. */
  103. bool SocketListener::init() {
  104. std::cout << "Initializing socket listener" << std::endl;
  105. return true;
  106. }
  107. /**
  108. * run
  109. * @method
  110. * Main message loop
  111. * TODO: Implement multithreading
  112. */
  113. void SocketListener::run() {
  114. // Begin handling thread pool and task queue
  115. handle_loop();
  116. // Begin listening loop
  117. while (true) {
  118. // Call system to open a listening socket, and return its file descriptor
  119. int listening_socket_fd = createSocket();
  120. if (listening_socket_fd == SOCKET_ERROR) {
  121. std::cout << "Socket error: shutting down server" << std::endl;
  122. break;
  123. }
  124. // wait for a client connection and get its socket file descriptor
  125. int client_socket_fd = waitForConnection(listening_socket_fd);
  126. if (client_socket_fd != SOCKET_ERROR) {
  127. // Destroy listening socket and deallocate its file descriptor. Only use
  128. // the client socket now.
  129. close(listening_socket_fd);
  130. push_to_queue(
  131. std::bind(handle_client_socket, client_socket_fd, onMessageReceived));
  132. }
  133. }
  134. }
  135. /**
  136. * cleanUp
  137. * @method
  138. * TODO: Determine if we should be cleaning up buffer memory
  139. */
  140. void SocketListener::cleanup() { std::cout << "Cleaning up" << std::endl; }
  141. /**
  142. * createSocket
  143. * Open a listening socket and return its file descriptor
  144. */
  145. int SocketListener::createSocket() {
  146. /* Call the system to open a socket passing arguments for
  147. ipv4 family, tcp type and no additional protocol info */
  148. int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  149. if (listening_socket_fd != SOCKET_ERROR) {
  150. // Create socket structure to hold address and type
  151. sockaddr_in socket_struct;
  152. socket_struct.sin_family = AF_INET; // ipv4
  153. socket_struct.sin_port =
  154. htons(m_port); // convert byte order of port value from host to network
  155. inet_pton(AF_INET, m_ip_address.c_str(), // convert address to binary
  156. &socket_struct.sin_addr);
  157. // Bind local socket address to socket file descriptor
  158. int bind_result = bind(
  159. listening_socket_fd, // TODO: Use C++ cast on next line?
  160. (sockaddr *)&socket_struct, // cast socket_struct to more generic type
  161. sizeof(socket_struct));
  162. if (bind_result != SOCKET_ERROR) {
  163. // Listen for connections to socket and allow up to max number of
  164. // connections for queue
  165. int listen_result = listen(listening_socket_fd, SOMAXCONN);
  166. if (listen_result == SOCKET_ERROR) {
  167. return WAIT_SOCKET_FAILURE;
  168. }
  169. } else {
  170. return WAIT_SOCKET_FAILURE;
  171. }
  172. }
  173. return listening_socket_fd; // Return socket file descriptor
  174. }
  175. /**
  176. * waitForConnection
  177. * @method
  178. * Takes first connection on queue of pending connections, creates a new socket
  179. * and returns its file descriptor
  180. */
  181. int SocketListener::waitForConnection(int listening_socket) {
  182. int client_socket_fd = accept(listening_socket, NULL, NULL);
  183. return client_socket_fd;
  184. }
  185. /**
  186. * onMessageReceived
  187. * @method
  188. * @override
  189. * Handle messages successfully received from a client socket
  190. */
  191. static void SocketListener::onMessageReceived(int socket_id,
  192. std::string message) {
  193. sendMessage(socket_id, message);
  194. }