diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 00c322cd..12d9e98c 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -84,13 +84,16 @@ namespace ix void WebSocket::stop() { + bool automaticReconnection = _automaticReconnection; + + // This value needs to be forced when shutting down, it is restored later _automaticReconnection = false; close(); if (!_thread.joinable()) { - _automaticReconnection = true; + _automaticReconnection = automaticReconnection; return; } @@ -98,7 +101,7 @@ namespace ix _thread.join(); _stop = false; - _automaticReconnection = true; + _automaticReconnection = automaticReconnection; } WebSocketInitResult WebSocket::connect() @@ -241,6 +244,11 @@ namespace ix WebSocket::invokeTrafficTrackerCallback(msg.size(), true); }); + + // 4. In blocking mode, getting out of this function is triggered by + // an explicit disconnection from the callback, or by the remote end + // closing the connection, ie isConnected() == false. + if (!_thread.joinable() && !isConnected() && !_automaticReconnection) return; } } @@ -332,4 +340,14 @@ namespace ix case WebSocket_ReadyState_Closed: return "CLOSED"; } } + + void WebSocket::enableAutomaticReconnection() + { + _automaticReconnection = true; + } + + void WebSocket::disableAutomaticReconnection() + { + _automaticReconnection = false; + } } diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 4b1cbd84..12b616f5 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -72,8 +72,14 @@ namespace ix void setUrl(const std::string& url); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); + // Run asynchronously, by calling start and stop. void start(); void stop(); + + // Run in blocking mode, by connecting first manually, and then calling run. + WebSocketInitResult connect(); + void run(); + WebSocketSendInfo send(const std::string& text); WebSocketSendInfo ping(const std::string& text); void close(); @@ -86,12 +92,13 @@ namespace ix const std::string& getUrl() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; + void enableAutomaticReconnection(); + void disableAutomaticReconnection(); + private: - void run(); WebSocketSendInfo sendMessage(const std::string& text, bool ping); - WebSocketInitResult connect(); bool isConnected() const; bool isClosing() const; void reconnectPerpetuallyIfDisconnected(); diff --git a/ixwebsocket/IXWebSocketServer.cpp b/ixwebsocket/IXWebSocketServer.cpp index a0e35679..e822a581 100644 --- a/ixwebsocket/IXWebSocketServer.cpp +++ b/ixwebsocket/IXWebSocketServer.cpp @@ -130,11 +130,16 @@ namespace ix _conditionVariable.wait(lock); } - // FIXME: we should cancel all the async per connections tasks void WebSocketServer::stop() { if (!_thread.joinable()) return; // nothing to do + auto clients = getClients(); + for (auto client : clients) + { + client->close(); + } + _stop = true; _thread.join(); _stop = false; @@ -187,20 +192,19 @@ namespace ix } } - // - // FIXME: make sure we never run into reconnectPerpetuallyIfDisconnected - // void WebSocketServer::handleConnection(int fd) { std::shared_ptr webSocket(new WebSocket); _onConnectionCallback(webSocket); + webSocket->disableAutomaticReconnection(); + + // Add this client to our client set { std::lock_guard lock(_clientsMutex); _clients.insert(webSocket); } - webSocket->start(); auto status = webSocket->connectToSocket(fd); if (!status.success) { @@ -211,16 +215,17 @@ namespace ix return; } - // We can do better than this busy loop, with a condition variable. - while (webSocket->isConnected()) - { - std::chrono::duration wait(10); - std::this_thread::sleep_for(wait); - } + // Process incoming messages and execute callbacks + // until the connection is closed + webSocket->run(); + // Remove this client from our client set { std::lock_guard lock(_clientsMutex); - _clients.erase(webSocket); + if (_clients.erase(webSocket) != 1) + { + logError("Cannot delete client"); + } } logInfo("WebSocketServer::handleConnection() done"); diff --git a/test/cmd_websocket_chat.cpp b/test/cmd_websocket_chat.cpp index da2db288..2e90c4a7 100644 --- a/test/cmd_websocket_chat.cpp +++ b/test/cmd_websocket_chat.cpp @@ -250,6 +250,8 @@ TEST_CASE("Websocket chat", "[websocket_chat]") ix::msleep(10); } + REQUIRE(server.getClients().size() == 2); + // Add a bit of extra time, for the subscription to be active ix::msleep(200); @@ -269,6 +271,10 @@ TEST_CASE("Websocket chat", "[websocket_chat]") REQUIRE(chatA.getReceivedMessagesCount() == 2); REQUIRE(chatB.getReceivedMessagesCount() == 3); + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + ix::reportWebSocketTraffic(); } } diff --git a/test/run.sh b/test/run.sh index ef727cc4..f9ee444c 100644 --- a/test/run.sh +++ b/test/run.sh @@ -2,6 +2,6 @@ mkdir build cd build -cmake .. -make +cmake .. || exit 1 +make || exit 1 ./ixwebsocket_unittest