socket_listener.cpp 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. // Project headers
  2. #include "headers/socket_listener.h"
  3. #include "headers/constants.h"
  4. // System libraries
  5. #include <arpa/inet.h>
  6. #include <netdb.h>
  7. #include <string.h>
  8. #include <sys/socket.h>
  9. #include <sys/types.h>
  10. #include <unistd.h>
  11. // C++ Libraries
  12. #include <atomic>
  13. #include <chrono>
  14. #include <condition_variable>
  15. #include <functional>
  16. #include <iostream>
  17. #include <memory>
  18. #include <queue>
  19. #include <string>
  20. #include <thread>
  21. #include <vector>
  22. int num_threads = std::thread::hardware_concurrency();
  23. /**
  24. * Constructor
  25. * Initialize with ip_address, port and message_handler
  26. */
  27. SocketListener::SocketListener(int arg_num, char** args)
  28. : m_port(-1), accepting_tasks(true) {
  29. for (int i = 0; i < arg_num; i++) {
  30. std::string argument = std::string(args[i]);
  31. std::cout << args[i] << std::endl;
  32. if (argument.find("--ip") != -1) {
  33. m_ip_address = argument.substr(5);
  34. continue;
  35. }
  36. if (argument.find("--port") != -1) {
  37. m_port = std::stoi(argument.substr(7));
  38. continue;
  39. }
  40. if (m_ip_address.empty()) {
  41. m_ip_address = "0.0.0.0";
  42. }
  43. if (m_port == -1) {
  44. m_port = 9009;
  45. }
  46. }
  47. }
  48. /**
  49. * Destructor
  50. * TODO: Determine if we should make buffer a class member
  51. */
  52. SocketListener::~SocketListener() { cleanup(); }
  53. SocketListener::MessageHandler SocketListener::createMessageHandler(
  54. std::function<void()> cb) {
  55. return MessageHandler(cb);
  56. }
  57. /**
  58. * sendMessage
  59. * @method
  60. * Send a null-terminated array of characters, supplied as a const char
  61. * pointer, to a client socket described by its file descriptor
  62. */
  63. void SocketListener::sendMessage(int client_socket_fd,
  64. std::weak_ptr<char[]> w_buffer_ptr) {
  65. std::shared_ptr<char[]> s_buffer_ptr = w_buffer_ptr.lock();
  66. if (s_buffer_ptr) {
  67. send(client_socket_fd, s_buffer_ptr.get(),
  68. static_cast<size_t>(MAX_BUFFER_SIZE) + 1, 0);
  69. } else {
  70. std::cout << "Could not send message to client " << client_socket_fd
  71. << ". Buffer does not exist." << std::endl;
  72. }
  73. }
  74. /**
  75. * init
  76. * TODO: Initialize buffer memory, if buffer is to be a class member
  77. */
  78. bool SocketListener::init() {
  79. std::cout << "Initializing socket listener" << std::endl;
  80. return true;
  81. }
  82. void SocketListener::pushToQueue(std::function<void()> fn) {
  83. std::unique_lock<std::mutex> lock(m_mutex_lock);
  84. task_queue.push(fn);
  85. lock.unlock();
  86. pool_condition.notify_one();
  87. }
  88. void SocketListener::handleLoop() {
  89. std::string accepting_str = accepting_tasks == 0
  90. ? std::string("Not accepting tasks")
  91. : std::string("Accepting tasks");
  92. std::cout << accepting_str << std::endl;
  93. std::function<void()> fn;
  94. for (;;) {
  95. {
  96. std::unique_lock<std::mutex> lock(m_mutex_lock);
  97. pool_condition.wait(
  98. lock, [this]() { return !accepting_tasks || !task_queue.empty(); });
  99. std::cout << "Wait condition met" << std::endl;
  100. if (!accepting_tasks && task_queue.empty()) {
  101. return; // we are done here
  102. }
  103. std::cout << "Taking task" << std::endl;
  104. fn = task_queue.front();
  105. task_queue.pop();
  106. }
  107. fn();
  108. }
  109. }
  110. void SocketListener::loopCheck() {
  111. for (int i = 0; i < task_queue.size() && i < (num_threads - 1); i++) {
  112. thread_pool.push_back(std::thread([this]() { handleLoop(); }));
  113. }
  114. done();
  115. std::this_thread::sleep_for(std::chrono::milliseconds(400));
  116. detachThreads();
  117. size_t task_num = task_queue.size();
  118. std::cout << "Task num: " << task_num << std::endl;
  119. accepting_tasks = true;
  120. }
  121. void SocketListener::done() {
  122. std::unique_lock<std::mutex> lock(m_mutex_lock);
  123. accepting_tasks = false;
  124. lock.unlock();
  125. // when we send the notification immediately, the consumer will try to get the
  126. // lock , so unlock asap
  127. pool_condition.notify_all();
  128. }
  129. void SocketListener::handleClientSocket(
  130. int client_socket_fd, SocketListener::MessageHandler message_handler,
  131. const std::shared_ptr<char[]>& s_buffer_ptr) {
  132. for (;;) {
  133. memset(s_buffer_ptr.get(), 0,
  134. MAX_BUFFER_SIZE); // Zero the character buffer
  135. int bytes_received = 0;
  136. // Receive and write incoming data to buffer and return the number of
  137. // bytes received
  138. bytes_received =
  139. recv(client_socket_fd, s_buffer_ptr.get(),
  140. MAX_BUFFER_SIZE - 2, // Leave room for null-termination
  141. 0);
  142. s_buffer_ptr.get()[MAX_BUFFER_SIZE - 1] =
  143. 0; // Null-terminate the character buffer
  144. if (bytes_received > 0) {
  145. std::cout << "Client " << client_socket_fd
  146. << "\nBytes received: " << bytes_received
  147. << "\nData: " << s_buffer_ptr.get() << std::endl;
  148. // Handle incoming message
  149. message_handler();
  150. } else {
  151. std::cout << "Client " << client_socket_fd << " disconnected"
  152. << std::endl;
  153. // Zero the buffer again before closing
  154. memset(s_buffer_ptr.get(), 0, MAX_BUFFER_SIZE);
  155. break;
  156. }
  157. }
  158. // TODO: Determine if we should free memory, or handle as class member
  159. close(client_socket_fd); // Destroy client socket and deallocate its fd
  160. }
  161. /**
  162. * run
  163. * @method
  164. * Main message loop
  165. * TODO: Implement multithreading
  166. */
  167. void SocketListener::run() {
  168. // Begin listening loop
  169. while (true) {
  170. std::cout << "Begin" << std::endl;
  171. // Call system to open a listening socket, and return its file descriptor
  172. int listening_socket_fd = createSocket();
  173. if (listening_socket_fd == SOCKET_ERROR) {
  174. std::cout << "Socket error: shutting down server" << std::endl;
  175. break;
  176. }
  177. std::cout << "Attempting to wait for connection" << std::endl;
  178. // wait for a client connection and get its socket file descriptor
  179. int client_socket_fd = waitForConnection(listening_socket_fd);
  180. if (client_socket_fd != SOCKET_ERROR) {
  181. // Destroy listening socket and deallocate its file descriptor. Only use
  182. // the client socket now.
  183. close(listening_socket_fd);
  184. {
  185. std::shared_ptr<char[]> s_buffer_ptr(new char[MAX_BUFFER_SIZE]);
  186. std::weak_ptr<char[]> w_buffer_ptr(s_buffer_ptr);
  187. std::function<void()> message_send_fn = [this, client_socket_fd,
  188. w_buffer_ptr]() {
  189. this->sendMessage(client_socket_fd, w_buffer_ptr);
  190. };
  191. MessageHandler message_handler = createMessageHandler(message_send_fn);
  192. std::cout << "Pushing client to queue" << std::endl;
  193. pushToQueue(
  194. std::bind(&SocketListener::handleClientSocket, this,
  195. client_socket_fd, message_handler,
  196. std::forward<std::shared_ptr<char[]>>(s_buffer_ptr)));
  197. }
  198. m_loop_thread = std::thread([this]() { loopCheck(); });
  199. m_loop_thread.detach();
  200. accepting_tasks = false;
  201. std::cout << "At the end" << std::endl;
  202. }
  203. }
  204. }
  205. void SocketListener::detachThreads() {
  206. for (std::thread& t : thread_pool) {
  207. if (t.joinable()) {
  208. t.detach();
  209. }
  210. }
  211. }
  212. /**
  213. * cleanUp
  214. * @method
  215. * TODO: Determine if we should be cleaning up buffer memory
  216. */
  217. void SocketListener::cleanup() {
  218. std::cout << "Cleaning up" << std::endl;
  219. if (m_loop_thread.joinable()) {
  220. m_loop_thread.join();
  221. }
  222. }
  223. /**
  224. * createSocket
  225. * Open a listening socket and return its file descriptor
  226. */
  227. int SocketListener::createSocket() {
  228. /* Call the system to open a socket passing arguments for
  229. ipv4 family, tcp type and no additional protocol info */
  230. int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  231. if (listening_socket_fd != SOCKET_ERROR) {
  232. std::cout << "Created listening socket" << std::endl;
  233. // Create socket structure to hold address and type
  234. sockaddr_in socket_struct;
  235. socket_struct.sin_family = AF_INET; // ipv4
  236. socket_struct.sin_port =
  237. htons(m_port); // convert byte order of port value from host to network
  238. inet_pton(AF_INET, m_ip_address.c_str(), // convert address to binary
  239. &socket_struct.sin_addr);
  240. int socket_option = 1;
  241. // Free up the port to begin listening again
  242. setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR, &socket_option,
  243. sizeof(socket_option));
  244. // Bind local socket address to socket file descriptor
  245. int bind_result = bind(
  246. listening_socket_fd, // TODO: Use C++ cast on next line?
  247. (sockaddr*)&socket_struct, // cast socket_struct to more generic type
  248. sizeof(socket_struct));
  249. if (bind_result != SOCKET_ERROR) {
  250. // Listen for connections to socket and allow up to max number of
  251. // connections for queue
  252. int listen_result = listen(listening_socket_fd, SOMAXCONN);
  253. if (listen_result == SOCKET_ERROR) {
  254. return WAIT_SOCKET_FAILURE;
  255. }
  256. } else {
  257. return WAIT_SOCKET_FAILURE;
  258. }
  259. }
  260. return listening_socket_fd; // Return socket file descriptor
  261. }
  262. /**
  263. * waitForConnection
  264. * @method
  265. * Takes first connection on queue of pending connections, creates a new socket
  266. * and returns its file descriptor
  267. */
  268. int SocketListener::waitForConnection(int listening_socket) {
  269. int client_socket_fd = accept(listening_socket, NULL, NULL);
  270. return client_socket_fd;
  271. }