server code / add dedicated thread to close/join terminated connection threads

This commit is contained in:
Benjamin Sergeant 2019-05-13 12:20:03 -07:00
parent 1023e925f6
commit fc4623381a
2 changed files with 56 additions and 26 deletions

View File

@ -30,7 +30,9 @@ namespace ix
_host(host), _host(host),
_backlog(backlog), _backlog(backlog),
_maxConnections(maxConnections), _maxConnections(maxConnections),
_serverFd(-1),
_stop(false), _stop(false),
_stopGc(false),
_connectionStateFactory(&ConnectionState::createConnectionState) _connectionStateFactory(&ConnectionState::createConnectionState)
{ {
@ -124,11 +126,17 @@ namespace ix
void SocketServer::start() void SocketServer::start()
{ {
if (_thread.joinable()) return; // we've already been started if (!_thread.joinable())
{
_thread = std::thread(&SocketServer::run, this); _thread = std::thread(&SocketServer::run, this);
} }
if (!_gcThread.joinable())
{
_gcThread = std::thread(&SocketServer::runGC, this);
}
}
void SocketServer::wait() void SocketServer::wait()
{ {
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
@ -142,21 +150,21 @@ namespace ix
void SocketServer::stop() void SocketServer::stop()
{ {
while (true) // Stop accepting connections, and close the 'accept' thread
if (_thread.joinable())
{ {
if (closeTerminatedThreads()) break;
// wait 10ms and try again later.
// we could have a timeout, but if we exit of here
// we leaked threads, it is quite bad.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
if (!_thread.joinable()) return; // nothing to do
_stop = true; _stop = true;
_thread.join(); _thread.join();
_stop = false; _stop = false;
}
// Join all threads and make sure that all connections are terminated
if (_gcThread.joinable())
{
_stopGc = true;
_gcThread.join();
_stopGc = false;
}
_conditionVariable.notify_one(); _conditionVariable.notify_one();
Socket::closeSocket(_serverFd); Socket::closeSocket(_serverFd);
@ -175,7 +183,7 @@ namespace ix
// field becomes true, and we can use that to know that we can join that thread // field becomes true, and we can use that to know that we can join that thread
// and remove it from our _connectionsThreads data structure (a list). // and remove it from our _connectionsThreads data structure (a list).
// //
bool SocketServer::closeTerminatedThreads() void SocketServer::closeTerminatedThreads()
{ {
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
auto it = _connectionsThreads.begin(); auto it = _connectionsThreads.begin();
@ -195,8 +203,6 @@ namespace ix
if (thread.joinable()) thread.join(); if (thread.joinable()) thread.join();
it = _connectionsThreads.erase(it); it = _connectionsThreads.erase(it);
} }
return _connectionsThreads.empty();
} }
void SocketServer::run() void SocketServer::run()
@ -208,12 +214,6 @@ namespace ix
{ {
if (_stop) return; if (_stop) return;
// Garbage collection to shutdown/join threads for closed connections.
// We could run this in its own thread, so that we dont need to accept
// a new connection to close a thread.
// We could also use a condition variable to be notify when we need to do this
closeTerminatedThreads();
// Use select to check whether a new connection is in progress // Use select to check whether a new connection is in progress
fd_set rfds; fd_set rfds;
struct timeval timeout; struct timeval timeout;
@ -290,5 +290,30 @@ namespace ix
connectionState))); connectionState)));
} }
} }
size_t SocketServer::getConnectionsThreadsCount()
{
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
return _connectionsThreads.size();
}
void SocketServer::runGC()
{
for (;;)
{
// Garbage collection to shutdown/join threads for closed connections.
closeTerminatedThreads();
// We quit this thread if all connections are closed and we received
// a stop request by setting _stopGc to true.
if (_stopGc && getConnectionsThreadsCount() == 0)
{
break;
}
// Sleep a little bit then keep cleaning up
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
} }

View File

@ -74,6 +74,12 @@ namespace ix
// background thread to wait for incoming connections // background thread to wait for incoming connections
std::atomic<bool> _stop; std::atomic<bool> _stop;
std::thread _thread; std::thread _thread;
void run();
// background thread to cleanup (join) terminated threads
std::atomic<bool> _stopGc;
std::thread _gcThread;
void runGC();
// the list of (connectionState, threads) for each connections // the list of (connectionState, threads) for each connections
ConnectionThreads _connectionsThreads; ConnectionThreads _connectionsThreads;
@ -87,13 +93,12 @@ namespace ix
// the factory to create ConnectionState objects // the factory to create ConnectionState objects
ConnectionStateFactory _connectionStateFactory; ConnectionStateFactory _connectionStateFactory;
// Methods
void run();
virtual void handleConnection(int fd, virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0; std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
// Returns true if all connection threads are joined // Returns true if all connection threads are joined
bool closeTerminatedThreads(); void closeTerminatedThreads();
size_t getConnectionsThreadsCount();
}; };
} }