diff --git a/ixwebsocket/IXSocketServer.cpp b/ixwebsocket/IXSocketServer.cpp index 31da31c2..a941371c 100644 --- a/ixwebsocket/IXSocketServer.cpp +++ b/ixwebsocket/IXSocketServer.cpp @@ -30,7 +30,9 @@ namespace ix _host(host), _backlog(backlog), _maxConnections(maxConnections), + _serverFd(-1), _stop(false), + _stopGc(false), _connectionStateFactory(&ConnectionState::createConnectionState) { @@ -124,9 +126,15 @@ namespace ix void SocketServer::start() { - if (_thread.joinable()) return; // we've already been started + if (!_thread.joinable()) + { + _thread = std::thread(&SocketServer::run, this); + } - _thread = std::thread(&SocketServer::run, this); + if (!_gcThread.joinable()) + { + _gcThread = std::thread(&SocketServer::runGC, this); + } } void SocketServer::wait() @@ -142,21 +150,21 @@ namespace ix void SocketServer::stop() { - while (true) + // Stop accepting connections, and close the 'accept' thread + if (_thread.joinable()) { - if (closeTerminatedThreads()) break; - - // wait 10ms and try again later. - // we could have a timeout, but if we exit of here - // we leaked threads, it is quite bad. - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + _stop = true; + _thread.join(); + _stop = false; } - if (!_thread.joinable()) return; // nothing to do - - _stop = true; - _thread.join(); - _stop = false; + // Join all threads and make sure that all connections are terminated + if (_gcThread.joinable()) + { + _stopGc = true; + _gcThread.join(); + _stopGc = false; + } _conditionVariable.notify_one(); Socket::closeSocket(_serverFd); @@ -175,7 +183,7 @@ namespace ix // field becomes true, and we can use that to know that we can join that thread // and remove it from our _connectionsThreads data structure (a list). // - bool SocketServer::closeTerminatedThreads() + void SocketServer::closeTerminatedThreads() { std::lock_guard lock(_connectionsThreadsMutex); auto it = _connectionsThreads.begin(); @@ -195,8 +203,6 @@ namespace ix if (thread.joinable()) thread.join(); it = _connectionsThreads.erase(it); } - - return _connectionsThreads.empty(); } void SocketServer::run() @@ -208,12 +214,6 @@ namespace ix { if (_stop) return; - // Garbage collection to shutdown/join threads for closed connections. - // We could run this in its own thread, so that we dont need to accept - // a new connection to close a thread. - // We could also use a condition variable to be notify when we need to do this - closeTerminatedThreads(); - // Use select to check whether a new connection is in progress fd_set rfds; struct timeval timeout; @@ -290,5 +290,30 @@ namespace ix connectionState))); } } + + size_t SocketServer::getConnectionsThreadsCount() + { + std::lock_guard lock(_connectionsThreadsMutex); + return _connectionsThreads.size(); + } + + void SocketServer::runGC() + { + for (;;) + { + // Garbage collection to shutdown/join threads for closed connections. + closeTerminatedThreads(); + + // We quit this thread if all connections are closed and we received + // a stop request by setting _stopGc to true. + if (_stopGc && getConnectionsThreadsCount() == 0) + { + break; + } + + // Sleep a little bit then keep cleaning up + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } } diff --git a/ixwebsocket/IXSocketServer.h b/ixwebsocket/IXSocketServer.h index d5dfb492..0472e93c 100644 --- a/ixwebsocket/IXSocketServer.h +++ b/ixwebsocket/IXSocketServer.h @@ -74,6 +74,12 @@ namespace ix // background thread to wait for incoming connections std::atomic _stop; std::thread _thread; + void run(); + + // background thread to cleanup (join) terminated threads + std::atomic _stopGc; + std::thread _gcThread; + void runGC(); // the list of (connectionState, threads) for each connections ConnectionThreads _connectionsThreads; @@ -87,13 +93,12 @@ namespace ix // the factory to create ConnectionState objects ConnectionStateFactory _connectionStateFactory; - // Methods - void run(); virtual void handleConnection(int fd, std::shared_ptr connectionState) = 0; virtual size_t getConnectedClientsCount() = 0; // Returns true if all connection threads are joined - bool closeTerminatedThreads(); + void closeTerminatedThreads(); + size_t getConnectionsThreadsCount(); }; }