Compare commits
	
		
			3 Commits
		
	
	
		
			v5.1.5
			...
			bug/30_ser
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					b1c1e6e28d | ||
| 
						 | 
					66440e2330 | ||
| 
						 | 
					792610d44f | 
@@ -10,7 +10,7 @@ namespace ix
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
    std::atomic<uint64_t> ConnectionState::_globalId(0);
 | 
					    std::atomic<uint64_t> ConnectionState::_globalId(0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ConnectionState::ConnectionState()
 | 
					    ConnectionState::ConnectionState() : _terminated(false)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        computeId();
 | 
					        computeId();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -29,5 +29,15 @@ namespace ix
 | 
				
			|||||||
    {
 | 
					    {
 | 
				
			||||||
        return std::make_shared<ConnectionState>();
 | 
					        return std::make_shared<ConnectionState>();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bool ConnectionState::isTerminated() const
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return _terminated;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bool ConnectionState::setTerminated()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        _terminated = true;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,9 +21,13 @@ namespace ix
 | 
				
			|||||||
        virtual void computeId();
 | 
					        virtual void computeId();
 | 
				
			||||||
        virtual const std::string& getId() const;
 | 
					        virtual const std::string& getId() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        bool setTerminated();
 | 
				
			||||||
 | 
					        bool isTerminated() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        static std::shared_ptr<ConnectionState> createConnectionState();
 | 
					        static std::shared_ptr<ConnectionState> createConnectionState();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    protected:
 | 
					    protected:
 | 
				
			||||||
 | 
					        std::atomic<bool> _terminated;
 | 
				
			||||||
        std::string _id;
 | 
					        std::string _id;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        static std::atomic<uint64_t> _globalId;
 | 
					        static std::atomic<uint64_t> _globalId;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -136,6 +136,9 @@ namespace ix
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    void SocketServer::stop()
 | 
					    void SocketServer::stop()
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
 | 
					        closeTerminatedThreads();
 | 
				
			||||||
 | 
					        assert(_connectionsThreads.empty());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (!_thread.joinable()) return; // nothing to do
 | 
					        if (!_thread.joinable()) return; // nothing to do
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        _stop = true;
 | 
					        _stop = true;
 | 
				
			||||||
@@ -152,18 +155,44 @@ namespace ix
 | 
				
			|||||||
        _connectionStateFactory = connectionStateFactory;
 | 
					        _connectionStateFactory = connectionStateFactory;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // join the threads for connections that have been closed
 | 
				
			||||||
 | 
					    void SocketServer::closeTerminatedThreads()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        auto it = _connectionsThreads.begin();
 | 
				
			||||||
 | 
					        auto itEnd  = _connectionsThreads.end();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        while (it != itEnd)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            auto& connectionState = it->first;
 | 
				
			||||||
 | 
					            auto& thread = it->second;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if (!connectionState->isTerminated() ||
 | 
				
			||||||
 | 
					                !thread.joinable())
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                ++it;
 | 
				
			||||||
 | 
					                continue;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            thread.join();
 | 
				
			||||||
 | 
					            it = _connectionsThreads.erase(it);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void SocketServer::run()
 | 
					    void SocketServer::run()
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        // Set the socket to non blocking mode, so that accept calls are not blocking
 | 
					        // Set the socket to non blocking mode, so that accept calls are not blocking
 | 
				
			||||||
        SocketConnect::configure(_serverFd);
 | 
					        SocketConnect::configure(_serverFd);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Return value of std::async, ignored
 | 
					 | 
				
			||||||
        std::future<void> f;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for (;;)
 | 
					        for (;;)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            if (_stop) return;
 | 
					            if (_stop) return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            // Garbage collection to shutdown/join threads for closed connections.
 | 
				
			||||||
 | 
					            // We could run this in its own thread, so that we dont need to accept 
 | 
				
			||||||
 | 
					            // a new connection to close a thread.
 | 
				
			||||||
 | 
					            // We could also use a condition variable to be notify when we need to do this
 | 
				
			||||||
 | 
					            closeTerminatedThreads();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // Use select to check whether a new connection is in progress
 | 
					            // Use select to check whether a new connection is in progress
 | 
				
			||||||
            fd_set rfds;
 | 
					            fd_set rfds;
 | 
				
			||||||
            struct timeval timeout;
 | 
					            struct timeval timeout;
 | 
				
			||||||
@@ -228,14 +257,12 @@ namespace ix
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // Launch the handleConnection work asynchronously in its own thread.
 | 
					            // Launch the handleConnection work asynchronously in its own thread.
 | 
				
			||||||
            //
 | 
					            _connectionsThreads.push_back(std::make_pair(
 | 
				
			||||||
            // the destructor of a future returned by std::async blocks,
 | 
					                    connectionState,
 | 
				
			||||||
            // so we need to declare it outside of this loop
 | 
					                    std::thread(&SocketServer::handleConnection,
 | 
				
			||||||
            f = std::async(std::launch::async,
 | 
					 | 
				
			||||||
                           &SocketServer::handleConnection,
 | 
					 | 
				
			||||||
                                this,
 | 
					                                this,
 | 
				
			||||||
                                clientFd,
 | 
					                                clientFd,
 | 
				
			||||||
                           connectionState);
 | 
					                                connectionState)));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -12,6 +12,7 @@
 | 
				
			|||||||
#include <string>
 | 
					#include <string>
 | 
				
			||||||
#include <set>
 | 
					#include <set>
 | 
				
			||||||
#include <thread>
 | 
					#include <thread>
 | 
				
			||||||
 | 
					#include <list>
 | 
				
			||||||
#include <mutex>
 | 
					#include <mutex>
 | 
				
			||||||
#include <functional>
 | 
					#include <functional>
 | 
				
			||||||
#include <memory>
 | 
					#include <memory>
 | 
				
			||||||
@@ -24,6 +25,10 @@ namespace ix
 | 
				
			|||||||
    public:
 | 
					    public:
 | 
				
			||||||
        using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
 | 
					        using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // We use a list as we only care about remove and append operations.
 | 
				
			||||||
 | 
					        using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>,
 | 
				
			||||||
 | 
					                                                      std::thread>>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        SocketServer(int port = SocketServer::kDefaultPort,
 | 
					        SocketServer(int port = SocketServer::kDefaultPort,
 | 
				
			||||||
                     const std::string& host = SocketServer::kDefaultHost,
 | 
					                     const std::string& host = SocketServer::kDefaultHost,
 | 
				
			||||||
                     int backlog = SocketServer::kDefaultTcpBacklog,
 | 
					                     int backlog = SocketServer::kDefaultTcpBacklog,
 | 
				
			||||||
@@ -63,6 +68,8 @@ namespace ix
 | 
				
			|||||||
        std::atomic<bool> _stop;
 | 
					        std::atomic<bool> _stop;
 | 
				
			||||||
        std::thread _thread;
 | 
					        std::thread _thread;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        ConnectionThreads _connectionsThreads;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        std::condition_variable _conditionVariable;
 | 
					        std::condition_variable _conditionVariable;
 | 
				
			||||||
        std::mutex _conditionVariableMutex;
 | 
					        std::mutex _conditionVariableMutex;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -74,5 +81,7 @@ namespace ix
 | 
				
			|||||||
        virtual void handleConnection(int fd,
 | 
					        virtual void handleConnection(int fd,
 | 
				
			||||||
                                      std::shared_ptr<ConnectionState> connectionState) = 0;
 | 
					                                      std::shared_ptr<ConnectionState> connectionState) = 0;
 | 
				
			||||||
        virtual size_t getConnectedClientsCount() = 0;
 | 
					        virtual size_t getConnectedClientsCount() = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        void closeTerminatedThreads();
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -91,6 +91,7 @@ namespace ix
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        logInfo("WebSocketServer::handleConnection() done");
 | 
					        logInfo("WebSocketServer::handleConnection() done");
 | 
				
			||||||
 | 
					        connectionState->setTerminated();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()
 | 
					    std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user