Compare commits
	
		
			3 Commits
		
	
	
		
			v7.8.1
			...
			bug/30_ser
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | b1c1e6e28d | ||
|  | 66440e2330 | ||
|  | 792610d44f | 
| @@ -10,7 +10,7 @@ namespace ix | |||||||
| { | { | ||||||
|     std::atomic<uint64_t> ConnectionState::_globalId(0); |     std::atomic<uint64_t> ConnectionState::_globalId(0); | ||||||
|  |  | ||||||
|     ConnectionState::ConnectionState() |     ConnectionState::ConnectionState() : _terminated(false) | ||||||
|     { |     { | ||||||
|         computeId(); |         computeId(); | ||||||
|     } |     } | ||||||
| @@ -29,5 +29,15 @@ namespace ix | |||||||
|     { |     { | ||||||
|         return std::make_shared<ConnectionState>(); |         return std::make_shared<ConnectionState>(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     bool ConnectionState::isTerminated() const | ||||||
|  |     { | ||||||
|  |         return _terminated; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     bool ConnectionState::setTerminated() | ||||||
|  |     { | ||||||
|  |         _terminated = true; | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -21,9 +21,13 @@ namespace ix | |||||||
|         virtual void computeId(); |         virtual void computeId(); | ||||||
|         virtual const std::string& getId() const; |         virtual const std::string& getId() const; | ||||||
|  |  | ||||||
|  |         bool setTerminated(); | ||||||
|  |         bool isTerminated() const; | ||||||
|  |  | ||||||
|         static std::shared_ptr<ConnectionState> createConnectionState(); |         static std::shared_ptr<ConnectionState> createConnectionState(); | ||||||
|  |  | ||||||
|     protected: |     protected: | ||||||
|  |         std::atomic<bool> _terminated; | ||||||
|         std::string _id; |         std::string _id; | ||||||
|  |  | ||||||
|         static std::atomic<uint64_t> _globalId; |         static std::atomic<uint64_t> _globalId; | ||||||
|   | |||||||
| @@ -136,6 +136,9 @@ namespace ix | |||||||
|  |  | ||||||
|     void SocketServer::stop() |     void SocketServer::stop() | ||||||
|     { |     { | ||||||
|  |         closeTerminatedThreads(); | ||||||
|  |         assert(_connectionsThreads.empty()); | ||||||
|  |  | ||||||
|         if (!_thread.joinable()) return; // nothing to do |         if (!_thread.joinable()) return; // nothing to do | ||||||
|  |  | ||||||
|         _stop = true; |         _stop = true; | ||||||
| @@ -152,18 +155,44 @@ namespace ix | |||||||
|         _connectionStateFactory = connectionStateFactory; |         _connectionStateFactory = connectionStateFactory; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     // join the threads for connections that have been closed | ||||||
|  |     void SocketServer::closeTerminatedThreads() | ||||||
|  |     { | ||||||
|  |         auto it = _connectionsThreads.begin(); | ||||||
|  |         auto itEnd  = _connectionsThreads.end(); | ||||||
|  |  | ||||||
|  |         while (it != itEnd) | ||||||
|  |         { | ||||||
|  |             auto& connectionState = it->first; | ||||||
|  |             auto& thread = it->second; | ||||||
|  |  | ||||||
|  |             if (!connectionState->isTerminated() || | ||||||
|  |                 !thread.joinable()) | ||||||
|  |             { | ||||||
|  |                 ++it; | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             thread.join(); | ||||||
|  |             it = _connectionsThreads.erase(it); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     void SocketServer::run() |     void SocketServer::run() | ||||||
|     { |     { | ||||||
|         // Set the socket to non blocking mode, so that accept calls are not blocking |         // Set the socket to non blocking mode, so that accept calls are not blocking | ||||||
|         SocketConnect::configure(_serverFd); |         SocketConnect::configure(_serverFd); | ||||||
|  |  | ||||||
|         // Return value of std::async, ignored |  | ||||||
|         std::future<void> f; |  | ||||||
|  |  | ||||||
|         for (;;) |         for (;;) | ||||||
|         { |         { | ||||||
|             if (_stop) return; |             if (_stop) return; | ||||||
|  |  | ||||||
|  |             // Garbage collection to shutdown/join threads for closed connections. | ||||||
|  |             // We could run this in its own thread, so that we dont need to accept  | ||||||
|  |             // a new connection to close a thread. | ||||||
|  |             // We could also use a condition variable to be notify when we need to do this | ||||||
|  |             closeTerminatedThreads(); | ||||||
|  |  | ||||||
|             // Use select to check whether a new connection is in progress |             // Use select to check whether a new connection is in progress | ||||||
|             fd_set rfds; |             fd_set rfds; | ||||||
|             struct timeval timeout; |             struct timeval timeout; | ||||||
| @@ -228,14 +257,12 @@ namespace ix | |||||||
|             } |             } | ||||||
|  |  | ||||||
|             // Launch the handleConnection work asynchronously in its own thread. |             // Launch the handleConnection work asynchronously in its own thread. | ||||||
|             // |             _connectionsThreads.push_back(std::make_pair( | ||||||
|             // the destructor of a future returned by std::async blocks, |                     connectionState, | ||||||
|             // so we need to declare it outside of this loop |                     std::thread(&SocketServer::handleConnection, | ||||||
|             f = std::async(std::launch::async, |                                 this, | ||||||
|                            &SocketServer::handleConnection, |                                 clientFd, | ||||||
|                            this, |                                 connectionState))); | ||||||
|                            clientFd, |  | ||||||
|                            connectionState); |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ | |||||||
| #include <string> | #include <string> | ||||||
| #include <set> | #include <set> | ||||||
| #include <thread> | #include <thread> | ||||||
|  | #include <list> | ||||||
| #include <mutex> | #include <mutex> | ||||||
| #include <functional> | #include <functional> | ||||||
| #include <memory> | #include <memory> | ||||||
| @@ -24,6 +25,10 @@ namespace ix | |||||||
|     public: |     public: | ||||||
|         using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>; |         using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>; | ||||||
|  |  | ||||||
|  |         // We use a list as we only care about remove and append operations. | ||||||
|  |         using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>, | ||||||
|  |                                                       std::thread>>; | ||||||
|  |  | ||||||
|         SocketServer(int port = SocketServer::kDefaultPort, |         SocketServer(int port = SocketServer::kDefaultPort, | ||||||
|                      const std::string& host = SocketServer::kDefaultHost, |                      const std::string& host = SocketServer::kDefaultHost, | ||||||
|                      int backlog = SocketServer::kDefaultTcpBacklog, |                      int backlog = SocketServer::kDefaultTcpBacklog, | ||||||
| @@ -63,6 +68,8 @@ namespace ix | |||||||
|         std::atomic<bool> _stop; |         std::atomic<bool> _stop; | ||||||
|         std::thread _thread; |         std::thread _thread; | ||||||
|  |  | ||||||
|  |         ConnectionThreads _connectionsThreads; | ||||||
|  |  | ||||||
|         std::condition_variable _conditionVariable; |         std::condition_variable _conditionVariable; | ||||||
|         std::mutex _conditionVariableMutex; |         std::mutex _conditionVariableMutex; | ||||||
|  |  | ||||||
| @@ -74,5 +81,7 @@ namespace ix | |||||||
|         virtual void handleConnection(int fd, |         virtual void handleConnection(int fd, | ||||||
|                                       std::shared_ptr<ConnectionState> connectionState) = 0; |                                       std::shared_ptr<ConnectionState> connectionState) = 0; | ||||||
|         virtual size_t getConnectedClientsCount() = 0; |         virtual size_t getConnectedClientsCount() = 0; | ||||||
|  |  | ||||||
|  |         void closeTerminatedThreads(); | ||||||
|     }; |     }; | ||||||
| } | } | ||||||
|   | |||||||
| @@ -91,6 +91,7 @@ namespace ix | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         logInfo("WebSocketServer::handleConnection() done"); |         logInfo("WebSocketServer::handleConnection() done"); | ||||||
|  |         connectionState->setTerminated(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients() |     std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user