websocket server: closed connection threads are joined properly

This commit is contained in:
Benjamin Sergeant 2019-04-17 16:23:24 -07:00
parent 4e2e14fb22
commit f96babc6a6
5 changed files with 61 additions and 12 deletions

View File

@ -10,7 +10,7 @@ namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
ConnectionState::ConnectionState() : _terminated(false)
{
computeId();
}
@ -29,5 +29,15 @@ namespace ix
{
return std::make_shared<ConnectionState>();
}
bool ConnectionState::isTerminated() const
{
return _terminated;
}
bool ConnectionState::setTerminated()
{
_terminated = true;
}
}

View File

@ -21,9 +21,13 @@ namespace ix
virtual void computeId();
virtual const std::string& getId() const;
bool setTerminated();
bool isTerminated() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::atomic<bool> _terminated;
std::string _id;
static std::atomic<uint64_t> _globalId;

View File

@ -136,11 +136,8 @@ namespace ix
void SocketServer::stop()
{
for (auto&& thread : _connectionsThreads)
{
if (!thread.joinable()) continue;
thread.join();
}
closeTerminatedThreads();
assert(_connectionsThreads.empty());
if (!_thread.joinable()) return; // nothing to do
@ -158,6 +155,29 @@ namespace ix
_connectionStateFactory = connectionStateFactory;
}
// join the threads for connections that have been closed
void SocketServer::closeTerminatedThreads()
{
auto it = _connectionsThreads.begin();
auto itEnd = _connectionsThreads.end();
while (it != itEnd)
{
auto& connectionState = it->first;
auto& thread = it->second;
if (!connectionState->isTerminated() ||
!thread.joinable())
{
++it;
continue;
}
thread.join();
it = _connectionsThreads.erase(it);
}
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
@ -167,6 +187,12 @@ namespace ix
{
if (_stop) return;
// Garbage collection to shutdown/join threads for closed connections.
// We could run this in its own thread, so that we dont need to accept
// a new connection to close a thread.
// We could also use a condition variable to be notify when we need to do this
closeTerminatedThreads();
// Use select to check whether a new connection is in progress
fd_set rfds;
struct timeval timeout;
@ -231,10 +257,12 @@ namespace ix
}
// Launch the handleConnection work asynchronously in its own thread.
_connectionsThreads.push_back(std::thread(&SocketServer::handleConnection,
this,
clientFd,
connectionState));
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(&SocketServer::handleConnection,
this,
clientFd,
connectionState)));
}
}
}

View File

@ -12,7 +12,7 @@
#include <string>
#include <set>
#include <thread>
#include <vector>
#include <list>
#include <mutex>
#include <functional>
#include <memory>
@ -25,6 +25,10 @@ namespace ix
public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
// We use a list as we only care about remove and append operations.
using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>,
std::thread>>;
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
@ -64,7 +68,7 @@ namespace ix
std::atomic<bool> _stop;
std::thread _thread;
std::vector<std::thread> _connectionsThreads;
ConnectionThreads _connectionsThreads;
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
@ -77,5 +81,7 @@ namespace ix
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
void closeTerminatedThreads();
};
}

View File

@ -91,6 +91,7 @@ namespace ix
}
logInfo("WebSocketServer::handleConnection() done");
connectionState->setTerminated();
}
std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()