From d7595b0dd0c1469cf747ca139cd600361d643566 Mon Sep 17 00:00:00 2001 From: Kumamon38 Date: Thu, 18 Apr 2019 18:24:16 +0200 Subject: [PATCH] Real ping (#32) * close method change and fix code * missing mutex * wip * renaming and fixes * renaming, fixes * added enablePong/disablePong, add tests * added new test cases * add 1 test case * fix gcd name to greatestCommonDivisor * move ping and ping timeout checks into socket poll, local var in test cases and indent fixes * indent issue --- ixwebsocket/IXWebSocket.cpp | 59 ++- ixwebsocket/IXWebSocket.h | 22 +- ixwebsocket/IXWebSocketServer.cpp | 19 +- ixwebsocket/IXWebSocketServer.h | 5 + ixwebsocket/IXWebSocketTransport.cpp | 116 ++++- ixwebsocket/IXWebSocketTransport.h | 29 +- test/CMakeLists.txt | 3 +- ...rtBeatTest.cpp => IXWebSocketPingTest.cpp} | 187 ++++++- test/IXWebSocketPingTimeoutTest.cpp | 488 ++++++++++++++++++ 9 files changed, 850 insertions(+), 78 deletions(-) rename test/{IXWebSocketHeartBeatTest.cpp => IXWebSocketPingTest.cpp} (51%) create mode 100644 test/IXWebSocketPingTimeoutTest.cpp diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index b0c846e6..08563d66 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -31,14 +31,18 @@ namespace ix { OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; const int WebSocket::kDefaultHandShakeTimeoutSecs(60); - const int WebSocket::kDefaultHeartBeatPeriod(-1); + const int WebSocket::kDefaultPingIntervalSecs(-1); + const int WebSocket::kDefaultPingTimeoutSecs(-1); + const bool WebSocket::kDefaultEnablePong(true); WebSocket::WebSocket() : _onMessageCallback(OnMessageCallback()), _stop(false), _automaticReconnection(true), _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs), - _heartBeatPeriod(kDefaultHeartBeatPeriod) + _enablePong(kDefaultEnablePong), + _pingIntervalSecs(kDefaultPingIntervalSecs), + _pingTimeoutSecs(kDefaultPingTimeoutSecs) { _ws.setOnCloseCallback( [this](uint16_t code, const std::string& reason, size_t wireSize) @@ -79,18 +83,54 @@ namespace ix return _perMessageDeflateOptions; } - void WebSocket::setHeartBeatPeriod(int heartBeatPeriod) + void WebSocket::setHeartBeatPeriod(int heartBeatPeriodSecs) { std::lock_guard lock(_configMutex); - _heartBeatPeriod = heartBeatPeriod; + _pingIntervalSecs = heartBeatPeriodSecs; } int WebSocket::getHeartBeatPeriod() const { std::lock_guard lock(_configMutex); - return _heartBeatPeriod; + return _pingIntervalSecs; } + void WebSocket::setPingInterval(int pingIntervalSecs) + { + std::lock_guard lock(_configMutex); + _pingIntervalSecs = pingIntervalSecs; + } + + int WebSocket::getPingInterval() const + { + std::lock_guard lock(_configMutex); + return _pingIntervalSecs; + } + + void WebSocket::setPingTimeout(int pingTimeoutSecs) + { + std::lock_guard lock(_configMutex); + _pingTimeoutSecs = pingTimeoutSecs; + } + + int WebSocket::getPingTimeout() const + { + std::lock_guard lock(_configMutex); + return _pingTimeoutSecs; + } + + void WebSocket::enablePong() + { + std::lock_guard lock(_configMutex); + _enablePong = true; + } + + void WebSocket::disablePong() + { + std::lock_guard lock(_configMutex); + _enablePong = false; + } + void WebSocket::start() { if (_thread.joinable()) return; // we've already been started @@ -125,7 +165,9 @@ namespace ix { std::lock_guard lock(_configMutex); _ws.configure(_perMessageDeflateOptions, - _heartBeatPeriod); + _enablePong, + _pingIntervalSecs, + _pingTimeoutSecs); } WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs); @@ -145,7 +187,10 @@ namespace ix { { std::lock_guard lock(_configMutex); - _ws.configure(_perMessageDeflateOptions, _heartBeatPeriod); + _ws.configure(_perMessageDeflateOptions, + _enablePong, + _pingIntervalSecs, + _pingTimeoutSecs); } WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 3e31e339..41c15c5d 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -89,8 +89,12 @@ namespace ix void setUrl(const std::string& url); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setHandshakeTimeout(int handshakeTimeoutSecs); - void setHeartBeatPeriod(int heartBeatPeriod); - + void setHeartBeatPeriod(int heartBeatPeriodSecs); + void setPingInterval(int pingIntervalSecs); // alias of setHeartBeatPeriod + void setPingTimeout(int pingTimeoutSecs); + void enablePong(); + void disablePong(); + // Run asynchronously, by calling start and stop. void start(); void stop(); @@ -114,6 +118,8 @@ namespace ix const std::string& getUrl() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; int getHeartBeatPeriod() const; + int getPingInterval() const; + int getPingTimeout() const; size_t bufferedAmount() const; void enableAutomaticReconnection(); @@ -152,9 +158,15 @@ namespace ix std::atomic _handshakeTimeoutSecs; static const int kDefaultHandShakeTimeoutSecs; - // Optional Heartbeat - int _heartBeatPeriod; - static const int kDefaultHeartBeatPeriod; + // enable or disable PONG frame response to received PING frame + bool _enablePong; + static const bool kDefaultEnablePong; + + // Optional ping and ping timeout + int _pingIntervalSecs; + int _pingTimeoutSecs; + static const int kDefaultPingIntervalSecs; + static const int kDefaultPingTimeoutSecs; friend class WebSocketServer; }; diff --git a/ixwebsocket/IXWebSocketServer.cpp b/ixwebsocket/IXWebSocketServer.cpp index c453134d..7582c9ba 100644 --- a/ixwebsocket/IXWebSocketServer.cpp +++ b/ixwebsocket/IXWebSocketServer.cpp @@ -17,13 +17,15 @@ namespace ix { const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds + const bool WebSocketServer::kDefaultEnablePong(true); WebSocketServer::WebSocketServer(int port, const std::string& host, int backlog, size_t maxConnections, int handshakeTimeoutSecs) : SocketServer(port, host, backlog, maxConnections), - _handshakeTimeoutSecs(handshakeTimeoutSecs) + _handshakeTimeoutSecs(handshakeTimeoutSecs), + _enablePong(kDefaultEnablePong) { } @@ -44,6 +46,16 @@ namespace ix SocketServer::stop(); } + void WebSocketServer::enablePong() + { + _enablePong = true; + } + + void WebSocketServer::disablePong() + { + _enablePong = false; + } + void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback) { _onConnectionCallback = callback; @@ -58,6 +70,11 @@ namespace ix webSocket->disableAutomaticReconnection(); + if (_enablePong) + webSocket->enablePong(); + else + webSocket->disablePong(); + // Add this client to our client set { std::lock_guard lock(_clientsMutex); diff --git a/ixwebsocket/IXWebSocketServer.h b/ixwebsocket/IXWebSocketServer.h index 4964b15f..744d0709 100644 --- a/ixwebsocket/IXWebSocketServer.h +++ b/ixwebsocket/IXWebSocketServer.h @@ -33,6 +33,9 @@ namespace ix virtual ~WebSocketServer(); virtual void stop() final; + void enablePong(); + void disablePong(); + void setOnConnectionCallback(const OnConnectionCallback& callback); // Get all the connected clients @@ -41,6 +44,7 @@ namespace ix private: // Member variables int _handshakeTimeoutSecs; + bool _enablePong; OnConnectionCallback _onConnectionCallback; @@ -48,6 +52,7 @@ namespace ix std::set> _clients; const static int kDefaultHandShakeTimeoutSecs; + const static bool kDefaultEnablePong; // Methods virtual void handleConnection(int fd, diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 852e9e14..67a6d6d8 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -51,10 +51,23 @@ #include +int greatestCommonDivisor (int a, int b) { + while (b != 0) + { + int t = b; + b = a % b; + a = t; + } + + return a; +} + namespace ix { - const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat"); - const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); + const std::string WebSocketTransport::kPingMessage("ixwebsocket::heartbeat"); + const int WebSocketTransport::kDefaultPingIntervalSecs(-1); + const int WebSocketTransport::kDefaultPingTimeoutSecs(-1); + const bool WebSocketTransport::kDefaultEnablePong(true); constexpr size_t WebSocketTransport::kChunkSize; WebSocketTransport::WebSocketTransport() : @@ -64,8 +77,12 @@ namespace ix _closeWireSize(0), _enablePerMessageDeflate(false), _requestInitCancellation(false), - _heartBeatPeriod(kDefaultHeartBeatPeriod), - _lastSendTimePoint(std::chrono::steady_clock::now()) + _enablePong(kDefaultEnablePong), + _pingIntervalSecs(kDefaultPingIntervalSecs), + _pingTimeoutSecs(kDefaultPingTimeoutSecs), + _pingIntervalOrTimeoutGCDSecs(-1), + _lastSendPingTimePoint(std::chrono::steady_clock::now()), + _lastReceivePongTimePoint(std::chrono::steady_clock::now()) { _readbuf.resize(kChunkSize); } @@ -76,11 +93,21 @@ namespace ix } void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, - int heartBeatPeriod) + bool enablePong, + int pingIntervalSecs, int pingTimeoutSecs) { _perMessageDeflateOptions = perMessageDeflateOptions; _enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); - _heartBeatPeriod = heartBeatPeriod; + _enablePong = enablePong; + _pingIntervalSecs = pingIntervalSecs; + _pingTimeoutSecs = pingTimeoutSecs; + + if (pingIntervalSecs > 0 && pingTimeoutSecs > 0) + _pingIntervalOrTimeoutGCDSecs = greatestCommonDivisor(pingIntervalSecs, pingTimeoutSecs); + else if (_pingTimeoutSecs > 0) + _pingIntervalOrTimeoutGCDSecs = pingTimeoutSecs; + else + _pingIntervalOrTimeoutGCDSecs = pingIntervalSecs; } // Client @@ -176,13 +203,25 @@ namespace ix _onCloseCallback = onCloseCallback; } - // Only consider send time points for that computation. - // The receive time points is taken into account in Socket::poll (second parameter). - bool WebSocketTransport::heartBeatPeriodExceeded() + // Only consider send PING time points for that computation. + bool WebSocketTransport::pingIntervalExceeded() { - std::lock_guard lock(_lastSendTimePointMutex); + if (_pingIntervalSecs <= 0) + return false; + + std::lock_guard lock(_lastSendPingTimePointMutex); auto now = std::chrono::steady_clock::now(); - return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod); + return now - _lastSendPingTimePoint > std::chrono::seconds(_pingIntervalSecs); + } + + bool WebSocketTransport::pingTimeoutExceeded() + { + if (_pingTimeoutSecs <= 0) + return false; + + std::lock_guard lock(_lastReceivePongTimePointMutex); + auto now = std::chrono::steady_clock::now(); + return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs); } void WebSocketTransport::poll() @@ -190,19 +229,27 @@ namespace ix _socket->poll( [this](PollResultType pollResult) { - // If (1) heartbeat is enabled, and (2) no data was received or - // send for a duration exceeding our heart-beat period, send a - // ping to the server. - if (pollResult == PollResultType::Timeout && - heartBeatPeriodExceeded()) + if (_readyState == OPEN) { - std::stringstream ss; - ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; - sendPing(ss.str()); + // if (1) ping timeout is enabled and (2) duration since last received ping response (PONG) + // exceeds the maximum delay, then close the connection + if (pingTimeoutExceeded()) + { + close(1011, "Ping timeout"); + } + // If (1) ping is enabled and no ping has been sent for a duration + // exceeding our ping interval, send a ping to the server. + else if (pingIntervalExceeded()) + { + std::stringstream ss; + ss << kPingMessage << "::" << _pingIntervalSecs << "s"; + sendPing(ss.str()); + } } + // Make sure we send all the buffered data // there can be a lot of it for large messages. - else if (pollResult == PollResultType::SendRequest) + if (pollResult == PollResultType::SendRequest) { while (!isSendBufferEmpty() && !_requestInitCancellation) { @@ -264,7 +311,7 @@ namespace ix _socket->close(); } }, - _heartBeatPeriod); + _pingIntervalOrTimeoutGCDSecs); } bool WebSocketTransport::isSendBufferEmpty() const @@ -444,12 +491,16 @@ namespace ix else if (ws.opcode == wsheader_type::PING) { unmaskReceiveBuffer(ws); + std::string pingData(_rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size + (size_t) ws.N); - // Reply back right away - bool compress = false; - sendData(wsheader_type::PONG, pingData, compress); + if (_enablePong) + { + // Reply back right away + bool compress = false; + sendData(wsheader_type::PONG, pingData, compress); + } emitMessage(PING, pingData, ws, onMessageCallback); } @@ -459,6 +510,9 @@ namespace ix std::string pongData(_rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size + (size_t) ws.N); + std::lock_guard lck(_lastReceivePongTimePointMutex); + _lastReceivePongTimePoint = std::chrono::steady_clock::now(); + emitMessage(PONG, pongData, ws, onMessageCallback); } else if (ws.opcode == wsheader_type::CLOSE) @@ -724,7 +778,16 @@ namespace ix WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message) { bool compress = false; - return sendData(wsheader_type::PING, message, compress); + + WebSocketSendInfo info = sendData(wsheader_type::PING, message, compress); + + if(info.success) + { + std::lock_guard lck(_lastSendPingTimePointMutex); + _lastSendPingTimePoint = std::chrono::steady_clock::now(); + } + + return info; } WebSocketSendInfo WebSocketTransport::sendBinary( @@ -770,9 +833,6 @@ namespace ix _txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret); } } - - std::lock_guard lck(_lastSendTimePointMutex); - _lastSendTimePoint = std::chrono::steady_clock::now(); } void WebSocketTransport::close(uint16_t code, const std::string& reason, size_t closeWireSize) diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index d8211798..8c2773e6 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -68,7 +68,8 @@ namespace ix ~WebSocketTransport(); void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, - int heartBeatPeriod); + bool enablePong, + int pingIntervalSecs, int pingTimeoutSecs); WebSocketInitResult connectToUrl(const std::string& url, // Client int timeoutSecs); @@ -158,15 +159,25 @@ namespace ix // Used to cancel dns lookup + socket connect + http upgrade std::atomic _requestInitCancellation; - // Optional Heartbeat - int _heartBeatPeriod; - static const int kDefaultHeartBeatPeriod; - const static std::string kHeartBeatPingMessage; - mutable std::mutex _lastSendTimePointMutex; - std::chrono::time_point _lastSendTimePoint; + // enable auto response to ping + bool _enablePong; + static const bool kDefaultEnablePong; - // No data was send through the socket for longer than the heartbeat period - bool heartBeatPeriodExceeded(); + // Optional ping and ping timeout + int _pingIntervalSecs; + int _pingTimeoutSecs; + int _pingIntervalOrTimeoutGCDSecs; // if both ping interval and timeout are set (> 0), then use GCD of these value to wait for the lowest time + static const int kDefaultPingIntervalSecs; + static const int kDefaultPingTimeoutSecs; + const static std::string kPingMessage; + mutable std::mutex _lastSendPingTimePointMutex; + mutable std::mutex _lastReceivePongTimePointMutex; + std::chrono::time_point _lastSendPingTimePoint; + std::chrono::time_point _lastReceivePongTimePoint; + + bool pingIntervalExceeded(); + // No PONG data was received through the socket for longer than ping timeout delay + bool pingTimeoutExceeded(); void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 490ed24e..3a861a2d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -36,7 +36,8 @@ set (SOURCES if (NOT WIN32) list(APPEND SOURCES IXWebSocketServerTest.cpp - IXWebSocketHeartBeatTest.cpp + IXWebSocketPingTest.cpp + IXWebSocketPingTimeoutTest.cpp cmd_websocket_chat.cpp IXWebSocketTestConnectionDisconnection.cpp ) diff --git a/test/IXWebSocketHeartBeatTest.cpp b/test/IXWebSocketPingTest.cpp similarity index 51% rename from test/IXWebSocketHeartBeatTest.cpp rename to test/IXWebSocketPingTest.cpp index f69af5af..bb1a8285 100644 --- a/test/IXWebSocketHeartBeatTest.cpp +++ b/test/IXWebSocketPingTest.cpp @@ -1,5 +1,5 @@ /* - * IXWebSocketHeartBeatTest.cpp + * IXWebSocketPingTest.cpp * Author: Benjamin Sergeant * Copyright (c) 2019 Machine Zone. All rights reserved. */ @@ -21,7 +21,7 @@ namespace class WebSocketClient { public: - WebSocketClient(int port); + WebSocketClient(int port, bool useHeartBeatMethod); void subscribe(const std::string& channel); void start(); @@ -32,10 +32,12 @@ namespace private: ix::WebSocket _webSocket; int _port; + bool _useHeartBeatMethod; }; - WebSocketClient::WebSocketClient(int port) - : _port(port) + WebSocketClient::WebSocketClient(int port, bool useHeartBeatMethod) + : _port(port), + _useHeartBeatMethod(useHeartBeatMethod) { ; } @@ -65,9 +67,11 @@ namespace _webSocket.setUrl(url); // The important bit for this test. - // Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second - // a ping message will be sent by the client. - _webSocket.setHeartBeatPeriod(1); + // Set a 1 second heartbeat with the setter method to test + if (_useHeartBeatMethod) + _webSocket.setHeartBeatPeriod(1); + else + _webSocket.setPingInterval(1); std::stringstream ss; log(std::string("Connecting to url: ") + url); @@ -176,9 +180,9 @@ namespace } } -TEST_CASE("Websocket_heartbeat", "[heartbeat]") +TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]") { - SECTION("Make sure that ping messages are sent during heartbeat.") + SECTION("Make sure that ping messages are sent when no other data are sent.") { ix::setupWebSocketTrafficTrackerCallback(); @@ -188,36 +192,165 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]") REQUIRE(startServer(server, serverReceivedPingMessages)); std::string session = ix::generateSessionId(); - WebSocketClient webSocketClientA(port); - WebSocketClient webSocketClientB(port); + bool useSetHeartBeatPeriodMethod = true; + WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod); - webSocketClientA.start(); - webSocketClientB.start(); + webSocketClient.start(); // Wait for all chat instance to be ready while (true) { - if (webSocketClientA.isReady() && webSocketClientB.isReady()) break; + if (webSocketClient.isReady()) break; ix::msleep(10); } - REQUIRE(server.getClients().size() == 2); + REQUIRE(server.getClients().size() == 1); - ix::msleep(900); - webSocketClientB.sendMessage("hello world"); - ix::msleep(900); - webSocketClientB.sendMessage("hello world"); - ix::msleep(900); + ix::msleep(1900); - webSocketClientA.stop(); - webSocketClientB.stop(); + webSocketClient.stop(); - // Here we test heart beat period exceeded for clientA - // but it should not be exceeded for clientB which has sent data. - // -> expected ping messages == 2, but add a small buffer to make this more reliable. - REQUIRE(serverReceivedPingMessages >= 2); - REQUIRE(serverReceivedPingMessages <= 4); + // Here we test ping interval + // -> expected ping messages == 1 as 1900 seconds, 1 ping sent every second + REQUIRE(serverReceivedPingMessages == 1); + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]") +{ + SECTION("Make sure that ping messages are sent, even if other messages are sent") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + REQUIRE(startServer(server, serverReceivedPingMessages)); + + std::string session = ix::generateSessionId(); + bool useSetHeartBeatPeriodMethod = true; + WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(900); + webSocketClient.sendMessage("hello world"); + ix::msleep(900); + webSocketClient.sendMessage("hello world"); + ix::msleep(1100); + + webSocketClient.stop(); + + // Here we test ping interval + // client has sent data, but ping should have been sent no matter what + // -> expected ping messages == 2 as 900+900+1100 = 2900 seconds, 1 ping sent every second + REQUIRE(serverReceivedPingMessages == 2); + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_ping_no_data_sent_setPingInterval", "[setPingInterval]") +{ + SECTION("Make sure that ping messages are sent when no other data are sent.") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + REQUIRE(startServer(server, serverReceivedPingMessages)); + + std::string session = ix::generateSessionId(); + bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval + WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(2100); + + webSocketClient.stop(); + + + // Here we test ping interval + // -> expected ping messages == 2 as 2100 seconds, 1 ping sent every second + REQUIRE(serverReceivedPingMessages == 2); + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]") +{ + SECTION("Make sure that ping messages are sent, even if other messages are sent") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + REQUIRE(startServer(server, serverReceivedPingMessages)); + + std::string session = ix::generateSessionId(); + bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval + WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(900); + webSocketClient.sendMessage("hello world"); + ix::msleep(900); + webSocketClient.sendMessage("hello world"); + ix::msleep(1300); + + webSocketClient.stop(); + + // Here we test ping interval + // client has sent data, but ping should have been sent no matter what + // -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second + REQUIRE(serverReceivedPingMessages == 3); // Give us 500ms for the server to notice that clients went away ix::msleep(500); diff --git a/test/IXWebSocketPingTimeoutTest.cpp b/test/IXWebSocketPingTimeoutTest.cpp new file mode 100644 index 00000000..74b523ff --- /dev/null +++ b/test/IXWebSocketPingTimeoutTest.cpp @@ -0,0 +1,488 @@ +/* + * IXWebSocketHeartBeatNoResponseAutoDisconnectTest.cpp + * Author: Alexandre Konieczny + * Copyright (c) 2019 Machine Zone. All rights reserved. + */ + +#include +#include +#include +#include +#include + +#include "IXTest.h" + +#include "catch.hpp" + +using namespace ix; + +namespace +{ + class WebSocketClient + { + public: + WebSocketClient(int port, int pingInterval, int pingTimeout); + + void subscribe(const std::string& channel); + void start(); + void stop(); + bool isReady() const; + bool isClosed() const; + void sendMessage(const std::string& text); + int getReceivedPongMessages(); + bool closedDueToPingTimeout(); + + private: + ix::WebSocket _webSocket; + int _port; + int _pingInterval; + int _pingTimeout; + std::atomic _receivedPongMessages; + std::atomic _closedDueToPingTimeout; + }; + + WebSocketClient::WebSocketClient(int port, int pingInterval, int pingTimeout) + : _port(port), + _receivedPongMessages(0), + _closedDueToPingTimeout(false), + _pingInterval(pingInterval), + _pingTimeout(pingTimeout) + { + ; + } + + bool WebSocketClient::isReady() const + { + return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open; + } + + bool WebSocketClient::isClosed() const + { + return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Closed; + } + + void WebSocketClient::stop() + { + _webSocket.stop(); + } + + void WebSocketClient::start() + { + std::string url; + { + std::stringstream ss; + ss << "ws://localhost:" + << _port + << "/"; + + url = ss.str(); + } + + _webSocket.setUrl(url); + + // The important bit for this test. + // Set a ping interval, and a ping timeout + _webSocket.setPingInterval(_pingInterval); + _webSocket.setPingTimeout(_pingTimeout); + + std::stringstream ss; + log(std::string("Connecting to url: ") + url); + + _webSocket.setOnMessageCallback( + [this](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, + const ix::WebSocketCloseInfo& closeInfo) + { + std::stringstream ss; + if (messageType == ix::WebSocket_MessageType_Open) + { + log("client connected"); + + _webSocket.disableAutomaticReconnection(); + } + else if (messageType == ix::WebSocket_MessageType_Close) + { + log("client disconnected"); + + if (closeInfo.code == 1011) + { + _closedDueToPingTimeout = true; + } + + _webSocket.disableAutomaticReconnection(); + + } + else if (messageType == ix::WebSocket_MessageType_Error) + { + ss << "Error ! " << error.reason; + log(ss.str()); + + _webSocket.disableAutomaticReconnection(); + } + else if (messageType == ix::WebSocket_MessageType_Pong) + { + _receivedPongMessages++; + + ss << "Received pong message " << str; + log(ss.str()); + } + else if (messageType == ix::WebSocket_MessageType_Ping) + { + ss << "Received ping message " << str; + log(ss.str()); + } + else if (messageType == ix::WebSocket_MessageType_Message) + { + ss << "Received message " << str; + log(ss.str()); + } + else + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + void WebSocketClient::sendMessage(const std::string& text) + { + _webSocket.send(text); + } + + int WebSocketClient::getReceivedPongMessages() + { + return _receivedPongMessages; + } + + bool WebSocketClient::closedDueToPingTimeout() + { + return _closedDueToPingTimeout; + } + + bool startServer(ix::WebSocketServer& server, std::atomic& receivedPingMessages, bool enablePong) + { + // A dev/null server + server.setOnConnectionCallback( + [&server, &receivedPingMessages](std::shared_ptr webSocket, + std::shared_ptr connectionState) + { + webSocket->setOnMessageCallback( + [webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, + const ix::WebSocketCloseInfo& closeInfo) + { + if (messageType == ix::WebSocket_MessageType_Open) + { + Logger() << "New server connection"; + Logger() << "id: " << connectionState->getId(); + Logger() << "Uri: " << openInfo.uri; + Logger() << "Headers:"; + for (auto it : openInfo.headers) + { + Logger() << it.first << ": " << it.second; + } + } + else if (messageType == ix::WebSocket_MessageType_Close) + { + log("Server closed connection"); + } + else if (messageType == ix::WebSocket_MessageType_Ping) + { + log("Server received a ping"); + receivedPingMessages++; + } + } + ); + } + ); + + if (!enablePong) + { + // USE this to prevent a pong answer, so the ping timeout is raised on client + server.disablePong(); + } + + auto res = server.listen(); + if (!res.first) + { + log(res.second); + return false; + } + + server.start(); + return true; + } +} + +TEST_CASE("Websocket_ping_timeout_not_checked", "[setPingTimeout]") +{ + SECTION("Make sure that ping messages have a response (PONG).") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + bool enablePong = false; // Pong is disabled on Server + REQUIRE(startServer(server, serverReceivedPingMessages, enablePong)); + + std::string session = ix::generateSessionId(); + int pingIntervalSecs = 1; + int pingTimeoutSecs = -1; // ping timeout not checked + WebSocketClient webSocketClient(port, pingIntervalSecs, pingTimeoutSecs); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(1100); + + // Here we test ping timeout, no timeout + REQUIRE(serverReceivedPingMessages == 1); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + + ix::msleep(1000); + + // Here we test ping timeout, no timeout + REQUIRE(serverReceivedPingMessages == 2); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + + webSocketClient.stop(); + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + + // Ensure client close was not by ping timeout + REQUIRE(webSocketClient.closedDueToPingTimeout() == false); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_ping_no_timeout", "[setPingTimeout]") +{ + SECTION("Make sure that ping messages have a response (PONG).") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + bool enablePong = true; // Pong is enabled on Server + REQUIRE(startServer(server, serverReceivedPingMessages, enablePong)); + + std::string session = ix::generateSessionId(); + int pingIntervalSecs = 1; + int pingTimeoutSecs = 2; + WebSocketClient webSocketClient(port, pingIntervalSecs, pingTimeoutSecs); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(1100); + + // Here we test ping timeout, no timeout + REQUIRE(serverReceivedPingMessages == 1); + REQUIRE(webSocketClient.getReceivedPongMessages() == 1); + + ix::msleep(1000); + + // Here we test ping timeout, no timeout + REQUIRE(serverReceivedPingMessages == 2); + REQUIRE(webSocketClient.getReceivedPongMessages() == 2); + + webSocketClient.stop(); + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + + // Ensure client close was not by ping timeout + REQUIRE(webSocketClient.closedDueToPingTimeout() == false); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_no_ping_but_timeout", "[setPingTimeout]") +{ + SECTION("Make sure that ping messages don't have responses (no PONG).") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + bool enablePong = false; // Pong is disabled on Server + REQUIRE(startServer(server, serverReceivedPingMessages, enablePong)); + + std::string session = ix::generateSessionId(); + int pingIntervalSecs = -1; // no ping set + int pingTimeoutSecs = 3; + WebSocketClient webSocketClient(port, pingIntervalSecs, pingTimeoutSecs); + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(2900); + + // Here we test ping timeout, no timeout yet + REQUIRE(serverReceivedPingMessages == 0); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + + REQUIRE(webSocketClient.isClosed() == false); + REQUIRE(webSocketClient.closedDueToPingTimeout() == false); + + ix::msleep(200); + + // Here we test ping timeout, timeout + REQUIRE(serverReceivedPingMessages == 0); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + // Ensure client close was not by ping timeout + REQUIRE(webSocketClient.isClosed() == true); + REQUIRE(webSocketClient.closedDueToPingTimeout() == true); + + webSocketClient.stop(); + + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_ping_timeout", "[setPingTimeout]") +{ + SECTION("Make sure that ping messages don't have responses (no PONG).") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + bool enablePong = false; // Pong is disabled on Server + REQUIRE(startServer(server, serverReceivedPingMessages, enablePong)); + + std::string session = ix::generateSessionId(); + int pingIntervalSecs = 1; + int pingTimeoutSecs = 2; + WebSocketClient webSocketClient(port, pingIntervalSecs, pingTimeoutSecs); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(1100); + + // Here we test ping timeout, no timeout yet + REQUIRE(serverReceivedPingMessages == 1); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + + ix::msleep(1000); + + // Here we test ping timeout, timeout + REQUIRE(serverReceivedPingMessages == 1); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + // Ensure client close was not by ping timeout + REQUIRE(webSocketClient.isClosed() == true); + REQUIRE(webSocketClient.closedDueToPingTimeout() == true); + + webSocketClient.stop(); + + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +} + +TEST_CASE("Websocket_ping_long_timeout", "[setPingTimeout]") +{ + SECTION("Make sure that ping messages don't have responses (no PONG).") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = getFreePort(); + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + bool enablePong = false; // Pong is disabled on Server + REQUIRE(startServer(server, serverReceivedPingMessages, enablePong)); + + std::string session = ix::generateSessionId(); + int pingIntervalSecs = 2; + int pingTimeoutSecs = 6; + WebSocketClient webSocketClient(port, pingIntervalSecs, pingTimeoutSecs); + + webSocketClient.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClient.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 1); + + ix::msleep(5900); + + // Here we test ping timeout, no timeout yet (2 ping sent at 2s and 4s) + REQUIRE(serverReceivedPingMessages == 2); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + + // Ensure client not closed + REQUIRE(webSocketClient.isClosed() == false); + REQUIRE(webSocketClient.closedDueToPingTimeout() == false); + + ix::msleep(200); + + // Here we test ping timeout, timeout (at 6 seconds) + REQUIRE(serverReceivedPingMessages == 2); + REQUIRE(webSocketClient.getReceivedPongMessages() == 0); + // Ensure client close was not by ping timeout + REQUIRE(webSocketClient.isClosed() == true); + REQUIRE(webSocketClient.closedDueToPingTimeout() == true); + + webSocketClient.stop(); + + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +}