server code / add dedicated thread to close/join terminated connection threads
This commit is contained in:
		| @@ -30,7 +30,9 @@ namespace ix | |||||||
|         _host(host), |         _host(host), | ||||||
|         _backlog(backlog), |         _backlog(backlog), | ||||||
|         _maxConnections(maxConnections), |         _maxConnections(maxConnections), | ||||||
|  |         _serverFd(-1), | ||||||
|         _stop(false), |         _stop(false), | ||||||
|  |         _stopGc(false), | ||||||
|         _connectionStateFactory(&ConnectionState::createConnectionState) |         _connectionStateFactory(&ConnectionState::createConnectionState) | ||||||
|     { |     { | ||||||
|  |  | ||||||
| @@ -124,9 +126,15 @@ namespace ix | |||||||
|  |  | ||||||
|     void SocketServer::start() |     void SocketServer::start() | ||||||
|     { |     { | ||||||
|         if (_thread.joinable()) return; // we've already been started |         if (!_thread.joinable()) | ||||||
|  |         { | ||||||
|  |             _thread = std::thread(&SocketServer::run, this); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         _thread = std::thread(&SocketServer::run, this); |         if (!_gcThread.joinable()) | ||||||
|  |         { | ||||||
|  |             _gcThread = std::thread(&SocketServer::runGC, this); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     void SocketServer::wait() |     void SocketServer::wait() | ||||||
| @@ -142,21 +150,21 @@ namespace ix | |||||||
|  |  | ||||||
|     void SocketServer::stop() |     void SocketServer::stop() | ||||||
|     { |     { | ||||||
|         while (true) |         // Stop accepting connections, and close the 'accept' thread | ||||||
|  |         if (_thread.joinable()) | ||||||
|         { |         { | ||||||
|             if (closeTerminatedThreads()) break; |             _stop = true; | ||||||
|  |             _thread.join(); | ||||||
|             // wait 10ms and try again later. |             _stop = false; | ||||||
|             // we could have a timeout, but if we exit of here |  | ||||||
|             // we leaked threads, it is quite bad. |  | ||||||
|             std::this_thread::sleep_for(std::chrono::milliseconds(10)); |  | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         if (!_thread.joinable()) return; // nothing to do |         // Join all threads and make sure that all connections are terminated | ||||||
|  |         if (_gcThread.joinable()) | ||||||
|         _stop = true; |         { | ||||||
|         _thread.join(); |             _stopGc = true; | ||||||
|         _stop = false; |             _gcThread.join(); | ||||||
|  |             _stopGc = false; | ||||||
|  |         } | ||||||
|  |  | ||||||
|         _conditionVariable.notify_one(); |         _conditionVariable.notify_one(); | ||||||
|         Socket::closeSocket(_serverFd); |         Socket::closeSocket(_serverFd); | ||||||
| @@ -175,7 +183,7 @@ namespace ix | |||||||
|     // field becomes true, and we can use that to know that we can join that thread |     // field becomes true, and we can use that to know that we can join that thread | ||||||
|     // and remove it from our _connectionsThreads data structure (a list). |     // and remove it from our _connectionsThreads data structure (a list). | ||||||
|     // |     // | ||||||
|     bool SocketServer::closeTerminatedThreads() |     void SocketServer::closeTerminatedThreads() | ||||||
|     { |     { | ||||||
|         std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); |         std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); | ||||||
|         auto it = _connectionsThreads.begin(); |         auto it = _connectionsThreads.begin(); | ||||||
| @@ -195,8 +203,6 @@ namespace ix | |||||||
|             if (thread.joinable()) thread.join(); |             if (thread.joinable()) thread.join(); | ||||||
|             it = _connectionsThreads.erase(it); |             it = _connectionsThreads.erase(it); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         return _connectionsThreads.empty(); |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     void SocketServer::run() |     void SocketServer::run() | ||||||
| @@ -208,12 +214,6 @@ namespace ix | |||||||
|         { |         { | ||||||
|             if (_stop) return; |             if (_stop) return; | ||||||
|  |  | ||||||
|             // Garbage collection to shutdown/join threads for closed connections. |  | ||||||
|             // We could run this in its own thread, so that we dont need to accept |  | ||||||
|             // a new connection to close a thread. |  | ||||||
|             // We could also use a condition variable to be notify when we need to do this |  | ||||||
|             closeTerminatedThreads(); |  | ||||||
|  |  | ||||||
|             // Use select to check whether a new connection is in progress |             // Use select to check whether a new connection is in progress | ||||||
|             fd_set rfds; |             fd_set rfds; | ||||||
|             struct timeval timeout; |             struct timeval timeout; | ||||||
| @@ -290,5 +290,30 @@ namespace ix | |||||||
|                                 connectionState))); |                                 connectionState))); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     size_t SocketServer::getConnectionsThreadsCount() | ||||||
|  |     { | ||||||
|  |         std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); | ||||||
|  |         return _connectionsThreads.size(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     void SocketServer::runGC() | ||||||
|  |     { | ||||||
|  |         for (;;) | ||||||
|  |         { | ||||||
|  |             // Garbage collection to shutdown/join threads for closed connections. | ||||||
|  |             closeTerminatedThreads(); | ||||||
|  |  | ||||||
|  |             // We quit this thread if all connections are closed and we received | ||||||
|  |             // a stop request by setting _stopGc to true. | ||||||
|  |             if (_stopGc && getConnectionsThreadsCount() == 0) | ||||||
|  |             { | ||||||
|  |                 break; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             // Sleep a little bit then keep cleaning up | ||||||
|  |             std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -74,6 +74,12 @@ namespace ix | |||||||
|         // background thread to wait for incoming connections |         // background thread to wait for incoming connections | ||||||
|         std::atomic<bool> _stop; |         std::atomic<bool> _stop; | ||||||
|         std::thread _thread; |         std::thread _thread; | ||||||
|  |         void run(); | ||||||
|  |  | ||||||
|  |         // background thread to cleanup (join) terminated threads | ||||||
|  |         std::atomic<bool> _stopGc; | ||||||
|  |         std::thread _gcThread; | ||||||
|  |         void runGC(); | ||||||
|  |  | ||||||
|         // the list of (connectionState, threads) for each connections |         // the list of (connectionState, threads) for each connections | ||||||
|         ConnectionThreads _connectionsThreads; |         ConnectionThreads _connectionsThreads; | ||||||
| @@ -87,13 +93,12 @@ namespace ix | |||||||
|         // the factory to create ConnectionState objects |         // the factory to create ConnectionState objects | ||||||
|         ConnectionStateFactory _connectionStateFactory; |         ConnectionStateFactory _connectionStateFactory; | ||||||
|  |  | ||||||
|         // Methods |  | ||||||
|         void run(); |  | ||||||
|         virtual void handleConnection(int fd, |         virtual void handleConnection(int fd, | ||||||
|                                       std::shared_ptr<ConnectionState> connectionState) = 0; |                                       std::shared_ptr<ConnectionState> connectionState) = 0; | ||||||
|         virtual size_t getConnectedClientsCount() = 0; |         virtual size_t getConnectedClientsCount() = 0; | ||||||
|  |  | ||||||
|         // Returns true if all connection threads are joined |         // Returns true if all connection threads are joined | ||||||
|         bool closeTerminatedThreads(); |         void closeTerminatedThreads(); | ||||||
|  |         size_t getConnectionsThreadsCount(); | ||||||
|     }; |     }; | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user