websocket server: closed connection threads are joined properly
This commit is contained in:
parent
bdfc55b951
commit
d486c72e02
@ -10,7 +10,7 @@ namespace ix
|
||||
{
|
||||
std::atomic<uint64_t> ConnectionState::_globalId(0);
|
||||
|
||||
ConnectionState::ConnectionState()
|
||||
ConnectionState::ConnectionState() : _terminated(false)
|
||||
{
|
||||
computeId();
|
||||
}
|
||||
@ -29,5 +29,15 @@ namespace ix
|
||||
{
|
||||
return std::make_shared<ConnectionState>();
|
||||
}
|
||||
|
||||
bool ConnectionState::isTerminated() const
|
||||
{
|
||||
return _terminated;
|
||||
}
|
||||
|
||||
bool ConnectionState::setTerminated()
|
||||
{
|
||||
_terminated = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,9 +21,13 @@ namespace ix
|
||||
virtual void computeId();
|
||||
virtual const std::string& getId() const;
|
||||
|
||||
bool setTerminated();
|
||||
bool isTerminated() const;
|
||||
|
||||
static std::shared_ptr<ConnectionState> createConnectionState();
|
||||
|
||||
protected:
|
||||
std::atomic<bool> _terminated;
|
||||
std::string _id;
|
||||
|
||||
static std::atomic<uint64_t> _globalId;
|
||||
|
@ -136,11 +136,8 @@ namespace ix
|
||||
|
||||
void SocketServer::stop()
|
||||
{
|
||||
for (auto&& thread : _connectionsThreads)
|
||||
{
|
||||
if (!thread.joinable()) continue;
|
||||
thread.join();
|
||||
}
|
||||
closeTerminatedThreads();
|
||||
assert(_connectionsThreads.empty());
|
||||
|
||||
if (!_thread.joinable()) return; // nothing to do
|
||||
|
||||
@ -158,6 +155,29 @@ namespace ix
|
||||
_connectionStateFactory = connectionStateFactory;
|
||||
}
|
||||
|
||||
// join the threads for connections that have been closed
|
||||
void SocketServer::closeTerminatedThreads()
|
||||
{
|
||||
auto it = _connectionsThreads.begin();
|
||||
auto itEnd = _connectionsThreads.end();
|
||||
|
||||
while (it != itEnd)
|
||||
{
|
||||
auto& connectionState = it->first;
|
||||
auto& thread = it->second;
|
||||
|
||||
if (!connectionState->isTerminated() ||
|
||||
!thread.joinable())
|
||||
{
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
thread.join();
|
||||
it = _connectionsThreads.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void SocketServer::run()
|
||||
{
|
||||
// Set the socket to non blocking mode, so that accept calls are not blocking
|
||||
@ -167,6 +187,12 @@ namespace ix
|
||||
{
|
||||
if (_stop) return;
|
||||
|
||||
// Garbage collection to shutdown/join threads for closed connections.
|
||||
// We could run this in its own thread, so that we dont need to accept
|
||||
// a new connection to close a thread.
|
||||
// We could also use a condition variable to be notify when we need to do this
|
||||
closeTerminatedThreads();
|
||||
|
||||
// Use select to check whether a new connection is in progress
|
||||
fd_set rfds;
|
||||
struct timeval timeout;
|
||||
@ -231,10 +257,12 @@ namespace ix
|
||||
}
|
||||
|
||||
// Launch the handleConnection work asynchronously in its own thread.
|
||||
_connectionsThreads.push_back(std::thread(&SocketServer::handleConnection,
|
||||
this,
|
||||
clientFd,
|
||||
connectionState));
|
||||
_connectionsThreads.push_back(std::make_pair(
|
||||
connectionState,
|
||||
std::thread(&SocketServer::handleConnection,
|
||||
this,
|
||||
clientFd,
|
||||
connectionState)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,7 @@
|
||||
#include <string>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <list>
|
||||
#include <mutex>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@ -25,6 +25,10 @@ namespace ix
|
||||
public:
|
||||
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
|
||||
|
||||
// We use a list as we only care about remove and append operations.
|
||||
using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>,
|
||||
std::thread>>;
|
||||
|
||||
SocketServer(int port = SocketServer::kDefaultPort,
|
||||
const std::string& host = SocketServer::kDefaultHost,
|
||||
int backlog = SocketServer::kDefaultTcpBacklog,
|
||||
@ -64,7 +68,7 @@ namespace ix
|
||||
std::atomic<bool> _stop;
|
||||
std::thread _thread;
|
||||
|
||||
std::vector<std::thread> _connectionsThreads;
|
||||
ConnectionThreads _connectionsThreads;
|
||||
|
||||
std::condition_variable _conditionVariable;
|
||||
std::mutex _conditionVariableMutex;
|
||||
@ -77,5 +81,7 @@ namespace ix
|
||||
virtual void handleConnection(int fd,
|
||||
std::shared_ptr<ConnectionState> connectionState) = 0;
|
||||
virtual size_t getConnectedClientsCount() = 0;
|
||||
|
||||
void closeTerminatedThreads();
|
||||
};
|
||||
}
|
||||
|
@ -91,6 +91,7 @@ namespace ix
|
||||
}
|
||||
|
||||
logInfo("WebSocketServer::handleConnection() done");
|
||||
connectionState->setTerminated();
|
||||
}
|
||||
|
||||
std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()
|
||||
|
Loading…
Reference in New Issue
Block a user