server code / add dedicated thread to close/join terminated connection threads
This commit is contained in:
		@@ -30,7 +30,9 @@ namespace ix
 | 
				
			|||||||
        _host(host),
 | 
					        _host(host),
 | 
				
			||||||
        _backlog(backlog),
 | 
					        _backlog(backlog),
 | 
				
			||||||
        _maxConnections(maxConnections),
 | 
					        _maxConnections(maxConnections),
 | 
				
			||||||
 | 
					        _serverFd(-1),
 | 
				
			||||||
        _stop(false),
 | 
					        _stop(false),
 | 
				
			||||||
 | 
					        _stopGc(false),
 | 
				
			||||||
        _connectionStateFactory(&ConnectionState::createConnectionState)
 | 
					        _connectionStateFactory(&ConnectionState::createConnectionState)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -124,9 +126,15 @@ namespace ix
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    void SocketServer::start()
 | 
					    void SocketServer::start()
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        if (_thread.joinable()) return; // we've already been started
 | 
					        if (!_thread.joinable())
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            _thread = std::thread(&SocketServer::run, this);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        _thread = std::thread(&SocketServer::run, this);
 | 
					        if (!_gcThread.joinable())
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            _gcThread = std::thread(&SocketServer::runGC, this);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void SocketServer::wait()
 | 
					    void SocketServer::wait()
 | 
				
			||||||
@@ -142,21 +150,21 @@ namespace ix
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    void SocketServer::stop()
 | 
					    void SocketServer::stop()
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        while (true)
 | 
					        // Stop accepting connections, and close the 'accept' thread
 | 
				
			||||||
 | 
					        if (_thread.joinable())
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            if (closeTerminatedThreads()) break;
 | 
					            _stop = true;
 | 
				
			||||||
 | 
					            _thread.join();
 | 
				
			||||||
            // wait 10ms and try again later.
 | 
					            _stop = false;
 | 
				
			||||||
            // we could have a timeout, but if we exit of here
 | 
					 | 
				
			||||||
            // we leaked threads, it is quite bad.
 | 
					 | 
				
			||||||
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (!_thread.joinable()) return; // nothing to do
 | 
					        // Join all threads and make sure that all connections are terminated
 | 
				
			||||||
 | 
					        if (_gcThread.joinable())
 | 
				
			||||||
        _stop = true;
 | 
					        {
 | 
				
			||||||
        _thread.join();
 | 
					            _stopGc = true;
 | 
				
			||||||
        _stop = false;
 | 
					            _gcThread.join();
 | 
				
			||||||
 | 
					            _stopGc = false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        _conditionVariable.notify_one();
 | 
					        _conditionVariable.notify_one();
 | 
				
			||||||
        Socket::closeSocket(_serverFd);
 | 
					        Socket::closeSocket(_serverFd);
 | 
				
			||||||
@@ -175,7 +183,7 @@ namespace ix
 | 
				
			|||||||
    // field becomes true, and we can use that to know that we can join that thread
 | 
					    // field becomes true, and we can use that to know that we can join that thread
 | 
				
			||||||
    // and remove it from our _connectionsThreads data structure (a list).
 | 
					    // and remove it from our _connectionsThreads data structure (a list).
 | 
				
			||||||
    //
 | 
					    //
 | 
				
			||||||
    bool SocketServer::closeTerminatedThreads()
 | 
					    void SocketServer::closeTerminatedThreads()
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
 | 
					        std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
 | 
				
			||||||
        auto it = _connectionsThreads.begin();
 | 
					        auto it = _connectionsThreads.begin();
 | 
				
			||||||
@@ -195,8 +203,6 @@ namespace ix
 | 
				
			|||||||
            if (thread.joinable()) thread.join();
 | 
					            if (thread.joinable()) thread.join();
 | 
				
			||||||
            it = _connectionsThreads.erase(it);
 | 
					            it = _connectionsThreads.erase(it);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					 | 
				
			||||||
        return _connectionsThreads.empty();
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void SocketServer::run()
 | 
					    void SocketServer::run()
 | 
				
			||||||
@@ -208,12 +214,6 @@ namespace ix
 | 
				
			|||||||
        {
 | 
					        {
 | 
				
			||||||
            if (_stop) return;
 | 
					            if (_stop) return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // Garbage collection to shutdown/join threads for closed connections.
 | 
					 | 
				
			||||||
            // We could run this in its own thread, so that we dont need to accept
 | 
					 | 
				
			||||||
            // a new connection to close a thread.
 | 
					 | 
				
			||||||
            // We could also use a condition variable to be notify when we need to do this
 | 
					 | 
				
			||||||
            closeTerminatedThreads();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            // Use select to check whether a new connection is in progress
 | 
					            // Use select to check whether a new connection is in progress
 | 
				
			||||||
            fd_set rfds;
 | 
					            fd_set rfds;
 | 
				
			||||||
            struct timeval timeout;
 | 
					            struct timeval timeout;
 | 
				
			||||||
@@ -290,5 +290,30 @@ namespace ix
 | 
				
			|||||||
                                connectionState)));
 | 
					                                connectionState)));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    size_t SocketServer::getConnectionsThreadsCount()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
 | 
				
			||||||
 | 
					        return _connectionsThreads.size();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    void SocketServer::runGC()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        for (;;)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            // Garbage collection to shutdown/join threads for closed connections.
 | 
				
			||||||
 | 
					            closeTerminatedThreads();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            // We quit this thread if all connections are closed and we received
 | 
				
			||||||
 | 
					            // a stop request by setting _stopGc to true.
 | 
				
			||||||
 | 
					            if (_stopGc && getConnectionsThreadsCount() == 0)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                break;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            // Sleep a little bit then keep cleaning up
 | 
				
			||||||
 | 
					            std::this_thread::sleep_for(std::chrono::milliseconds(10));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -74,6 +74,12 @@ namespace ix
 | 
				
			|||||||
        // background thread to wait for incoming connections
 | 
					        // background thread to wait for incoming connections
 | 
				
			||||||
        std::atomic<bool> _stop;
 | 
					        std::atomic<bool> _stop;
 | 
				
			||||||
        std::thread _thread;
 | 
					        std::thread _thread;
 | 
				
			||||||
 | 
					        void run();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // background thread to cleanup (join) terminated threads
 | 
				
			||||||
 | 
					        std::atomic<bool> _stopGc;
 | 
				
			||||||
 | 
					        std::thread _gcThread;
 | 
				
			||||||
 | 
					        void runGC();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // the list of (connectionState, threads) for each connections
 | 
					        // the list of (connectionState, threads) for each connections
 | 
				
			||||||
        ConnectionThreads _connectionsThreads;
 | 
					        ConnectionThreads _connectionsThreads;
 | 
				
			||||||
@@ -87,13 +93,12 @@ namespace ix
 | 
				
			|||||||
        // the factory to create ConnectionState objects
 | 
					        // the factory to create ConnectionState objects
 | 
				
			||||||
        ConnectionStateFactory _connectionStateFactory;
 | 
					        ConnectionStateFactory _connectionStateFactory;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Methods
 | 
					 | 
				
			||||||
        void run();
 | 
					 | 
				
			||||||
        virtual void handleConnection(int fd,
 | 
					        virtual void handleConnection(int fd,
 | 
				
			||||||
                                      std::shared_ptr<ConnectionState> connectionState) = 0;
 | 
					                                      std::shared_ptr<ConnectionState> connectionState) = 0;
 | 
				
			||||||
        virtual size_t getConnectedClientsCount() = 0;
 | 
					        virtual size_t getConnectedClientsCount() = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Returns true if all connection threads are joined
 | 
					        // Returns true if all connection threads are joined
 | 
				
			||||||
        bool closeTerminatedThreads();
 | 
					        void closeTerminatedThreads();
 | 
				
			||||||
 | 
					        size_t getConnectionsThreadsCount();
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user