websocket server: closed connection threads are joined properly

This commit is contained in:
Benjamin Sergeant 2019-04-17 16:23:24 -07:00
parent 66440e2330
commit b1c1e6e28d
5 changed files with 61 additions and 12 deletions

View File

@ -10,7 +10,7 @@ namespace ix
{ {
std::atomic<uint64_t> ConnectionState::_globalId(0); std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState() ConnectionState::ConnectionState() : _terminated(false)
{ {
computeId(); computeId();
} }
@ -29,5 +29,15 @@ namespace ix
{ {
return std::make_shared<ConnectionState>(); return std::make_shared<ConnectionState>();
} }
bool ConnectionState::isTerminated() const
{
return _terminated;
}
bool ConnectionState::setTerminated()
{
_terminated = true;
}
} }

View File

@ -21,9 +21,13 @@ namespace ix
virtual void computeId(); virtual void computeId();
virtual const std::string& getId() const; virtual const std::string& getId() const;
bool setTerminated();
bool isTerminated() const;
static std::shared_ptr<ConnectionState> createConnectionState(); static std::shared_ptr<ConnectionState> createConnectionState();
protected: protected:
std::atomic<bool> _terminated;
std::string _id; std::string _id;
static std::atomic<uint64_t> _globalId; static std::atomic<uint64_t> _globalId;

View File

@ -136,11 +136,8 @@ namespace ix
void SocketServer::stop() void SocketServer::stop()
{ {
for (auto&& thread : _connectionsThreads) closeTerminatedThreads();
{ assert(_connectionsThreads.empty());
if (!thread.joinable()) continue;
thread.join();
}
if (!_thread.joinable()) return; // nothing to do if (!_thread.joinable()) return; // nothing to do
@ -158,6 +155,29 @@ namespace ix
_connectionStateFactory = connectionStateFactory; _connectionStateFactory = connectionStateFactory;
} }
// join the threads for connections that have been closed
void SocketServer::closeTerminatedThreads()
{
auto it = _connectionsThreads.begin();
auto itEnd = _connectionsThreads.end();
while (it != itEnd)
{
auto& connectionState = it->first;
auto& thread = it->second;
if (!connectionState->isTerminated() ||
!thread.joinable())
{
++it;
continue;
}
thread.join();
it = _connectionsThreads.erase(it);
}
}
void SocketServer::run() void SocketServer::run()
{ {
// Set the socket to non blocking mode, so that accept calls are not blocking // Set the socket to non blocking mode, so that accept calls are not blocking
@ -167,6 +187,12 @@ namespace ix
{ {
if (_stop) return; if (_stop) return;
// Garbage collection to shutdown/join threads for closed connections.
// We could run this in its own thread, so that we dont need to accept
// a new connection to close a thread.
// We could also use a condition variable to be notify when we need to do this
closeTerminatedThreads();
// Use select to check whether a new connection is in progress // Use select to check whether a new connection is in progress
fd_set rfds; fd_set rfds;
struct timeval timeout; struct timeval timeout;
@ -231,10 +257,12 @@ namespace ix
} }
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
_connectionsThreads.push_back(std::thread(&SocketServer::handleConnection, _connectionsThreads.push_back(std::make_pair(
this, connectionState,
clientFd, std::thread(&SocketServer::handleConnection,
connectionState)); this,
clientFd,
connectionState)));
} }
} }
} }

View File

@ -12,7 +12,7 @@
#include <string> #include <string>
#include <set> #include <set>
#include <thread> #include <thread>
#include <vector> #include <list>
#include <mutex> #include <mutex>
#include <functional> #include <functional>
#include <memory> #include <memory>
@ -25,6 +25,10 @@ namespace ix
public: public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>; using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
// We use a list as we only care about remove and append operations.
using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>,
std::thread>>;
SocketServer(int port = SocketServer::kDefaultPort, SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog, int backlog = SocketServer::kDefaultTcpBacklog,
@ -64,7 +68,7 @@ namespace ix
std::atomic<bool> _stop; std::atomic<bool> _stop;
std::thread _thread; std::thread _thread;
std::vector<std::thread> _connectionsThreads; ConnectionThreads _connectionsThreads;
std::condition_variable _conditionVariable; std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
@ -77,5 +81,7 @@ namespace ix
virtual void handleConnection(int fd, virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0; std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
void closeTerminatedThreads();
}; };
} }

View File

@ -91,6 +91,7 @@ namespace ix
} }
logInfo("WebSocketServer::handleConnection() done"); logInfo("WebSocketServer::handleConnection() done");
connectionState->setTerminated();
} }
std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients() std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()