socket_listener.cpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Project headers
  2. #include "headers/socket_listener.h"
  3. #include "headers/constants.h"
  4. // System libraries
  5. #include <arpa/inet.h>
  6. #include <netdb.h>
  7. #include <string.h>
  8. #include <sys/socket.h>
  9. #include <sys/types.h>
  10. #include <unistd.h>
  11. // C++ Libraries
  12. #include <atomic>
  13. #include <chrono>
  14. #include <condition_variable>
  15. #include <functional>
  16. #include <iostream>
  17. #include <memory>
  18. #include <queue>
  19. #include <string>
  20. #include <thread>
  21. #include <vector>
  22. int num_threads = std::thread::hardware_concurrency();
  23. /**
  24. * Constructor
  25. * Initialize with ip_address, port and message_handler
  26. */
  27. SocketListener::SocketListener(std::string ip_address, int port)
  28. : m_ip_address(ip_address),
  29. m_port(port),
  30. accepting_tasks(true),
  31. shutdown_loop(false) {}
  32. /**
  33. * Destructor
  34. * TODO: Determine if we should make buffer a class member
  35. */
  36. SocketListener::~SocketListener() { cleanup(); }
  37. SocketListener::MessageHandler SocketListener::createMessageHandler(
  38. std::function<void()> cb) {
  39. return MessageHandler(cb);
  40. }
  41. /**
  42. * sendMessage
  43. * @method
  44. * Send a null-terminated array of characters, supplied as a const char
  45. * pointer, to a client socket described by its file descriptor
  46. */
  47. void SocketListener::sendMessage(int client_socket_fd,
  48. std::shared_ptr<char[]> s_ptr) {
  49. send(client_socket_fd, s_ptr.get(), static_cast<size_t>(MAX_BUFFER_SIZE) + 1,
  50. 0);
  51. }
  52. /**
  53. * init
  54. * TODO: Initialize buffer memory, if buffer is to be a class member
  55. */
  56. bool SocketListener::init() {
  57. std::cout << "Initializing socket listener" << std::endl;
  58. return true;
  59. }
  60. void SocketListener::push_to_queue(std::function<void()> fn) {
  61. std::unique_lock<std::mutex> lock(m_mutex_lock);
  62. task_queue.push(fn);
  63. lock.unlock();
  64. pool_condition.notify_one();
  65. }
  66. void SocketListener::handle_loop() {
  67. std::string accepting_str = accepting_tasks == 0
  68. ? std::string("Not accepting tasks")
  69. : std::string("Accepting tasks");
  70. std::cout << accepting_str << std::endl;
  71. std::function<void()> fn;
  72. for (;;) {
  73. {
  74. std::unique_lock<std::mutex> lock(m_mutex_lock);
  75. pool_condition.wait(
  76. lock, [this]() { return !accepting_tasks || !task_queue.empty(); });
  77. std::cout << "Wait condition met" << std::endl;
  78. if (!accepting_tasks && task_queue.empty()) {
  79. return; // we are done here
  80. }
  81. std::cout << "Taking task" << std::endl;
  82. fn = task_queue.front();
  83. task_queue.pop();
  84. }
  85. fn();
  86. }
  87. }
  88. void SocketListener::loop_check() {
  89. for (int i = 0; i < task_queue.size() && i < (num_threads - 1); i++) {
  90. thread_pool.push_back(std::thread([this]() { handle_loop(); }));
  91. }
  92. done();
  93. std::this_thread::sleep_for(std::chrono::milliseconds(400));
  94. detachThreads();
  95. size_t task_num = task_queue.size();
  96. std::cout << "Task num: " << task_num << std::endl;
  97. accepting_tasks = true;
  98. }
  99. void SocketListener::done() {
  100. std::unique_lock<std::mutex> lock(m_mutex_lock);
  101. accepting_tasks = false;
  102. lock.unlock();
  103. // when we send the notification immediately, the consumer will try to get the
  104. // lock , so unlock asap
  105. pool_condition.notify_all();
  106. }
  107. void SocketListener::handle_client_socket(
  108. int client_socket_fd, SocketListener::MessageHandler message_handler,
  109. std::shared_ptr<char[]> buf) {
  110. while (true) {
  111. memset(buf.get(), 0, MAX_BUFFER_SIZE); // Zero the character buffer
  112. int bytes_received = 0;
  113. // Receive and write incoming data to buffer and return the number of
  114. // bytes received
  115. bytes_received =
  116. recv(client_socket_fd, buf.get(),
  117. MAX_BUFFER_SIZE - 2, // Leave room for null-termination
  118. 0);
  119. buf.get()[MAX_BUFFER_SIZE - 1] = 0; // Null-terminate the character buffer
  120. if (bytes_received > 0) {
  121. std::cout << "Client " << client_socket_fd
  122. << "\nBytes received: " << bytes_received
  123. << "\nData: " << buf.get() << std::endl;
  124. // Handle incoming message
  125. message_handler();
  126. } else {
  127. std::cout << "Client " << client_socket_fd << " disconnected" << std::endl;
  128. break;
  129. }
  130. }
  131. // Zero the buffer again before closing
  132. memset(buf.get(), 0, MAX_BUFFER_SIZE);
  133. // TODO: Determine if we should free memory, or handle as class member
  134. close(client_socket_fd); // Destroy client socket and deallocate its fd
  135. }
  136. /**
  137. * run
  138. * @method
  139. * Main message loop
  140. * TODO: Implement multithreading
  141. */
  142. void SocketListener::run() {
  143. // Begin listening loop
  144. while (true) {
  145. std::cout << "Begin" << std::endl;
  146. // Call system to open a listening socket, and return its file descriptor
  147. int listening_socket_fd = createSocket();
  148. if (listening_socket_fd == SOCKET_ERROR) {
  149. std::cout << "Socket error: shutting down server" << std::endl;
  150. break;
  151. }
  152. std::cout << "Attempting to wait for connection" << std::endl;
  153. // wait for a client connection and get its socket file descriptor
  154. int client_socket_fd = waitForConnection(listening_socket_fd);
  155. if (client_socket_fd != SOCKET_ERROR) {
  156. // Destroy listening socket and deallocate its file descriptor. Only use
  157. // the client socket now.
  158. close(listening_socket_fd);
  159. std::shared_ptr<char[]> s_ptr(new char[MAX_BUFFER_SIZE]);
  160. std::function<void()> message_send_fn = [this, client_socket_fd,
  161. s_ptr]() {
  162. this->sendMessage(client_socket_fd, s_ptr);
  163. };
  164. MessageHandler message_handler = createMessageHandler(message_send_fn);
  165. std::cout << "Pushing client to queue" << std::endl;
  166. push_to_queue(std::bind(&SocketListener::handle_client_socket, this,
  167. client_socket_fd, message_handler, s_ptr));
  168. m_loop_thread = std::thread([this]() { loop_check(); });
  169. m_loop_thread.detach();
  170. accepting_tasks = false;
  171. std::cout << "At the end" << std::endl;
  172. }
  173. }
  174. }
  175. void SocketListener::detachThreads() {
  176. for (std::thread& t : thread_pool) {
  177. if (t.joinable()) {
  178. t.detach();
  179. }
  180. }
  181. }
  182. /**
  183. * cleanUp
  184. * @method
  185. * TODO: Determine if we should be cleaning up buffer memory
  186. */
  187. void SocketListener::cleanup() {
  188. std::cout << "Cleaning up" << std::endl;
  189. if (m_loop_thread.joinable()) {
  190. m_loop_thread.join();
  191. }
  192. }
  193. /**
  194. * createSocket
  195. * Open a listening socket and return its file descriptor
  196. */
  197. int SocketListener::createSocket() {
  198. /* Call the system to open a socket passing arguments for
  199. ipv4 family, tcp type and no additional protocol info */
  200. int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  201. if (listening_socket_fd != SOCKET_ERROR) {
  202. std::cout << "Created listening socket" << std::endl;
  203. // Create socket structure to hold address and type
  204. sockaddr_in socket_struct;
  205. socket_struct.sin_family = AF_INET; // ipv4
  206. socket_struct.sin_port =
  207. htons(m_port); // convert byte order of port value from host to network
  208. inet_pton(AF_INET, m_ip_address.c_str(), // convert address to binary
  209. &socket_struct.sin_addr);
  210. int socket_option = 1;
  211. // Free up the port to begin listening again
  212. setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &socket_option,
  213. sizeof(socket_option));
  214. // Bind local socket address to socket file descriptor
  215. int bind_result = bind(
  216. listening_socket_fd, // TODO: Use C++ cast on next line?
  217. (sockaddr*)&socket_struct, // cast socket_struct to more generic type
  218. sizeof(socket_struct));
  219. if (bind_result != SOCKET_ERROR) {
  220. // Listen for connections to socket and allow up to max number of
  221. // connections for queue
  222. int listen_result = listen(listening_socket_fd, SOMAXCONN);
  223. if (listen_result == SOCKET_ERROR) {
  224. return WAIT_SOCKET_FAILURE;
  225. }
  226. } else {
  227. return WAIT_SOCKET_FAILURE;
  228. }
  229. }
  230. return listening_socket_fd; // Return socket file descriptor
  231. }
  232. /**
  233. * waitForConnection
  234. * @method
  235. * Takes first connection on queue of pending connections, creates a new socket
  236. * and returns its file descriptor
  237. */
  238. int SocketListener::waitForConnection(int listening_socket) {
  239. int client_socket_fd = accept(listening_socket, NULL, NULL);
  240. return client_socket_fd;
  241. }