From b1c1e6e28db28e94797be93535bf40f1fc049ac2 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant <“bsergean@gmail.com”> Date: Wed, 17 Apr 2019 16:23:24 -0700 Subject: [PATCH] websocket server: closed connection threads are joined properly --- ixwebsocket/IXConnectionState.cpp | 12 +++++++- ixwebsocket/IXConnectionState.h | 4 +++ ixwebsocket/IXSocketServer.cpp | 46 +++++++++++++++++++++++++------ ixwebsocket/IXSocketServer.h | 10 +++++-- ixwebsocket/IXWebSocketServer.cpp | 1 + 5 files changed, 61 insertions(+), 12 deletions(-) diff --git a/ixwebsocket/IXConnectionState.cpp b/ixwebsocket/IXConnectionState.cpp index 73449020..717d7485 100644 --- a/ixwebsocket/IXConnectionState.cpp +++ b/ixwebsocket/IXConnectionState.cpp @@ -10,7 +10,7 @@ namespace ix { std::atomic ConnectionState::_globalId(0); - ConnectionState::ConnectionState() + ConnectionState::ConnectionState() : _terminated(false) { computeId(); } @@ -29,5 +29,15 @@ namespace ix { return std::make_shared(); } + + bool ConnectionState::isTerminated() const + { + return _terminated; + } + + bool ConnectionState::setTerminated() + { + _terminated = true; + } } diff --git a/ixwebsocket/IXConnectionState.h b/ixwebsocket/IXConnectionState.h index 0c5d9920..d76df5d9 100644 --- a/ixwebsocket/IXConnectionState.h +++ b/ixwebsocket/IXConnectionState.h @@ -21,9 +21,13 @@ namespace ix virtual void computeId(); virtual const std::string& getId() const; + bool setTerminated(); + bool isTerminated() const; + static std::shared_ptr createConnectionState(); protected: + std::atomic _terminated; std::string _id; static std::atomic _globalId; diff --git a/ixwebsocket/IXSocketServer.cpp b/ixwebsocket/IXSocketServer.cpp index f03416f9..413be77e 100644 --- a/ixwebsocket/IXSocketServer.cpp +++ b/ixwebsocket/IXSocketServer.cpp @@ -136,11 +136,8 @@ namespace ix void SocketServer::stop() { - for (auto&& thread : _connectionsThreads) - { - if (!thread.joinable()) continue; - thread.join(); - } + closeTerminatedThreads(); + assert(_connectionsThreads.empty()); if (!_thread.joinable()) return; // nothing to do @@ -158,6 +155,29 @@ namespace ix _connectionStateFactory = connectionStateFactory; } + // join the threads for connections that have been closed + void SocketServer::closeTerminatedThreads() + { + auto it = _connectionsThreads.begin(); + auto itEnd = _connectionsThreads.end(); + + while (it != itEnd) + { + auto& connectionState = it->first; + auto& thread = it->second; + + if (!connectionState->isTerminated() || + !thread.joinable()) + { + ++it; + continue; + } + + thread.join(); + it = _connectionsThreads.erase(it); + } + } + void SocketServer::run() { // Set the socket to non blocking mode, so that accept calls are not blocking @@ -167,6 +187,12 @@ namespace ix { if (_stop) return; + // Garbage collection to shutdown/join threads for closed connections. + // We could run this in its own thread, so that we dont need to accept + // a new connection to close a thread. + // We could also use a condition variable to be notify when we need to do this + closeTerminatedThreads(); + // Use select to check whether a new connection is in progress fd_set rfds; struct timeval timeout; @@ -231,10 +257,12 @@ namespace ix } // Launch the handleConnection work asynchronously in its own thread. - _connectionsThreads.push_back(std::thread(&SocketServer::handleConnection, - this, - clientFd, - connectionState)); + _connectionsThreads.push_back(std::make_pair( + connectionState, + std::thread(&SocketServer::handleConnection, + this, + clientFd, + connectionState))); } } } diff --git a/ixwebsocket/IXSocketServer.h b/ixwebsocket/IXSocketServer.h index 39597bf7..fc8f3a6a 100644 --- a/ixwebsocket/IXSocketServer.h +++ b/ixwebsocket/IXSocketServer.h @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include #include @@ -25,6 +25,10 @@ namespace ix public: using ConnectionStateFactory = std::function()>; + // We use a list as we only care about remove and append operations. + using ConnectionThreads = std::list, + std::thread>>; + SocketServer(int port = SocketServer::kDefaultPort, const std::string& host = SocketServer::kDefaultHost, int backlog = SocketServer::kDefaultTcpBacklog, @@ -64,7 +68,7 @@ namespace ix std::atomic _stop; std::thread _thread; - std::vector _connectionsThreads; + ConnectionThreads _connectionsThreads; std::condition_variable _conditionVariable; std::mutex _conditionVariableMutex; @@ -77,5 +81,7 @@ namespace ix virtual void handleConnection(int fd, std::shared_ptr connectionState) = 0; virtual size_t getConnectedClientsCount() = 0; + + void closeTerminatedThreads(); }; } diff --git a/ixwebsocket/IXWebSocketServer.cpp b/ixwebsocket/IXWebSocketServer.cpp index ffffd09d..c453134d 100644 --- a/ixwebsocket/IXWebSocketServer.cpp +++ b/ixwebsocket/IXWebSocketServer.cpp @@ -91,6 +91,7 @@ namespace ix } logInfo("WebSocketServer::handleConnection() done"); + connectionState->setTerminated(); } std::set> WebSocketServer::getClients()