socket_listener.cpp 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  // Project headers
  #include "headers/socket_listener.h"
  #include "headers/constants.h"
  // System libraries
  #include <arpa/inet.h>
  #include <netdb.h>
  #include <string.h>
  #include <sys/socket.h>
  #include <sys/types.h>
  #include <unistd.h>
  // C++ Libraries
  #include <atomic>
  #include <chrono>
  #include <condition_variable>
  #include <functional>
  #include <iostream>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <string>
  #include <thread>
  #include <utility>
  #include <vector>
  // Upper bound used when sizing the worker pool: the number of concurrent
  // threads the implementation reports for this machine. The standard allows
  // hardware_concurrency() to return 0 when the value cannot be determined.
  int num_threads = static_cast<int>(std::thread::hardware_concurrency());
  23. /**
  24. * Constructor
  25. * Initialize with ip_address, port and message_handler
  26. */
  27. SocketListener::SocketListener(std::string ip_address, int port)
  28. : m_ip_address(ip_address), m_port(port), accepting_tasks(true) {}
  29. /**
  30. * Destructor
  31. * TODO: Determine if we should make buffer a class member
  32. */
  33. SocketListener::~SocketListener() { cleanup(); }
  34. SocketListener::MessageHandler SocketListener::createMessageHandler(
  35. std::function<void()> cb) {
  36. return MessageHandler(cb);
  37. }
  38. /**
  39. * sendMessage
  40. * @method
  41. * Send a null-terminated array of characters, supplied as a const char
  42. * pointer, to a client socket described by its file descriptor
  43. */
  44. void SocketListener::sendMessage(int client_socket_fd,
  45. std::weak_ptr<char[]> w_buffer_ptr) {
  46. std::shared_ptr<char[]> s_buffer_ptr = w_buffer_ptr.lock();
  47. if (s_buffer_ptr) {
  48. send(client_socket_fd, s_buffer_ptr.get(),
  49. static_cast<size_t>(MAX_BUFFER_SIZE) + 1, 0);
  50. } else {
  51. std::cout << "Could not send message to client " << client_socket_fd
  52. << ". Buffer does not exist." << std::endl;
  53. }
  54. }
  55. /**
  56. * init
  57. * TODO: Initialize buffer memory, if buffer is to be a class member
  58. */
  59. bool SocketListener::init() {
  60. std::cout << "Initializing socket listener" << std::endl;
  61. return true;
  62. }
  63. void SocketListener::pushToQueue(std::function<void()> fn) {
  64. std::unique_lock<std::mutex> lock(m_mutex_lock);
  65. task_queue.push(fn);
  66. lock.unlock();
  67. pool_condition.notify_one();
  68. }
  69. void SocketListener::handleLoop() {
  70. std::string accepting_str = accepting_tasks == 0
  71. ? std::string("Not accepting tasks")
  72. : std::string("Accepting tasks");
  73. std::cout << accepting_str << std::endl;
  74. std::function<void()> fn;
  75. for (;;) {
  76. {
  77. std::unique_lock<std::mutex> lock(m_mutex_lock);
  78. pool_condition.wait(
  79. lock, [this]() { return !accepting_tasks || !task_queue.empty(); });
  80. std::cout << "Wait condition met" << std::endl;
  81. if (!accepting_tasks && task_queue.empty()) {
  82. return; // we are done here
  83. }
  84. std::cout << "Taking task" << std::endl;
  85. fn = task_queue.front();
  86. task_queue.pop();
  87. }
  88. fn();
  89. }
  90. }
  91. void SocketListener::loopCheck() {
  92. for (int i = 0; i < task_queue.size() && i < (num_threads - 1); i++) {
  93. thread_pool.push_back(std::thread([this]() { handleLoop(); }));
  94. }
  95. done();
  96. std::this_thread::sleep_for(std::chrono::milliseconds(400));
  97. detachThreads();
  98. size_t task_num = task_queue.size();
  99. std::cout << "Task num: " << task_num << std::endl;
  100. accepting_tasks = true;
  101. }
  102. void SocketListener::done() {
  103. std::unique_lock<std::mutex> lock(m_mutex_lock);
  104. accepting_tasks = false;
  105. lock.unlock();
  106. // when we send the notification immediately, the consumer will try to get the
  107. // lock , so unlock asap
  108. pool_condition.notify_all();
  109. }
  110. void SocketListener::handleClientSocket(
  111. int client_socket_fd, SocketListener::MessageHandler message_handler,
  112. const std::shared_ptr<char[]>& s_buffer_ptr) {
  113. for (;;) {
  114. memset(s_buffer_ptr.get(), 0,
  115. MAX_BUFFER_SIZE); // Zero the character buffer
  116. int bytes_received = 0;
  117. // Receive and write incoming data to buffer and return the number of
  118. // bytes received
  119. bytes_received =
  120. recv(client_socket_fd, s_buffer_ptr.get(),
  121. MAX_BUFFER_SIZE - 2, // Leave room for null-termination
  122. 0);
  123. s_buffer_ptr.get()[MAX_BUFFER_SIZE - 1] =
  124. 0; // Null-terminate the character buffer
  125. if (bytes_received > 0) {
  126. std::cout << "Client " << client_socket_fd
  127. << "\nBytes received: " << bytes_received
  128. << "\nData: " << s_buffer_ptr.get() << std::endl;
  129. // Handle incoming message
  130. message_handler();
  131. } else {
  132. std::cout << "Client " << client_socket_fd << " disconnected"
  133. << std::endl;
  134. // Zero the buffer again before closing
  135. memset(s_buffer_ptr.get(), 0, MAX_BUFFER_SIZE);
  136. break;
  137. }
  138. }
  139. // TODO: Determine if we should free memory, or handle as class member
  140. close(client_socket_fd); // Destroy client socket and deallocate its fd
  141. }
  142. /**
  143. * run
  144. * @method
  145. * Main message loop
  146. * TODO: Implement multithreading
  147. */
  148. void SocketListener::run() {
  149. // Begin listening loop
  150. while (true) {
  151. std::cout << "Begin" << std::endl;
  152. // Call system to open a listening socket, and return its file descriptor
  153. int listening_socket_fd = createSocket();
  154. if (listening_socket_fd == SOCKET_ERROR) {
  155. std::cout << "Socket error: shutting down server" << std::endl;
  156. break;
  157. }
  158. std::cout << "Attempting to wait for connection" << std::endl;
  159. // wait for a client connection and get its socket file descriptor
  160. int client_socket_fd = waitForConnection(listening_socket_fd);
  161. if (client_socket_fd != SOCKET_ERROR) {
  162. // Destroy listening socket and deallocate its file descriptor. Only use
  163. // the client socket now.
  164. close(listening_socket_fd);
  165. {
  166. std::shared_ptr<char[]> s_buffer_ptr(new char[MAX_BUFFER_SIZE]);
  167. std::weak_ptr<char[]> w_buffer_ptr(s_buffer_ptr);
  168. std::function<void()> message_send_fn = [this, client_socket_fd,
  169. w_buffer_ptr]() {
  170. this->sendMessage(client_socket_fd, w_buffer_ptr);
  171. };
  172. MessageHandler message_handler = createMessageHandler(message_send_fn);
  173. std::cout << "Pushing client to queue" << std::endl;
  174. pushToQueue(
  175. std::bind(&SocketListener::handleClientSocket, this,
  176. client_socket_fd, message_handler,
  177. std::forward<std::shared_ptr<char[]>>(s_buffer_ptr)));
  178. }
  179. m_loop_thread = std::thread([this]() { loopCheck(); });
  180. m_loop_thread.detach();
  181. accepting_tasks = false;
  182. std::cout << "At the end" << std::endl;
  183. }
  184. }
  185. }
  186. void SocketListener::detachThreads() {
  187. for (std::thread& t : thread_pool) {
  188. if (t.joinable()) {
  189. t.detach();
  190. }
  191. }
  192. }
  193. /**
  194. * cleanUp
  195. * @method
  196. * TODO: Determine if we should be cleaning up buffer memory
  197. */
  198. void SocketListener::cleanup() {
  199. std::cout << "Cleaning up" << std::endl;
  200. if (m_loop_thread.joinable()) {
  201. m_loop_thread.join();
  202. }
  203. }
  204. /**
  205. * createSocket
  206. * Open a listening socket and return its file descriptor
  207. */
  208. int SocketListener::createSocket() {
  209. /* Call the system to open a socket passing arguments for
  210. ipv4 family, tcp type and no additional protocol info */
  211. int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  212. if (listening_socket_fd != SOCKET_ERROR) {
  213. std::cout << "Created listening socket" << std::endl;
  214. // Create socket structure to hold address and type
  215. sockaddr_in socket_struct;
  216. socket_struct.sin_family = AF_INET; // ipv4
  217. socket_struct.sin_port =
  218. htons(m_port); // convert byte order of port value from host to network
  219. inet_pton(AF_INET, m_ip_address.c_str(), // convert address to binary
  220. &socket_struct.sin_addr);
  221. int socket_option = 1;
  222. // Free up the port to begin listening again
  223. setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &socket_option,
  224. sizeof(socket_option));
  225. // Bind local socket address to socket file descriptor
  226. int bind_result = bind(
  227. listening_socket_fd, // TODO: Use C++ cast on next line?
  228. (sockaddr*)&socket_struct, // cast socket_struct to more generic type
  229. sizeof(socket_struct));
  230. if (bind_result != SOCKET_ERROR) {
  231. // Listen for connections to socket and allow up to max number of
  232. // connections for queue
  233. int listen_result = listen(listening_socket_fd, SOMAXCONN);
  234. if (listen_result == SOCKET_ERROR) {
  235. return WAIT_SOCKET_FAILURE;
  236. }
  237. } else {
  238. return WAIT_SOCKET_FAILURE;
  239. }
  240. }
  241. return listening_socket_fd; // Return socket file descriptor
  242. }
  243. /**
  244. * waitForConnection
  245. * @method
  246. * Takes first connection on queue of pending connections, creates a new socket
  247. * and returns its file descriptor
  248. */
  249. int SocketListener::waitForConnection(int listening_socket) {
  250. int client_socket_fd = accept(listening_socket, NULL, NULL);
  251. return client_socket_fd;
  252. }