From 885d245afb665896d4da02c80b92afdbb067a27e Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Fri, 25 Jan 2019 16:11:39 -0800 Subject: [PATCH] heartbeat correct --- README.md | 9 +++++++ ixwebsocket/IXWebSocketTransport.cpp | 19 ++++++++++++-- ixwebsocket/IXWebSocketTransport.h | 5 ++++ makefile | 2 ++ test/IXWebSocketHeartBeatTest.cpp | 39 ++++++++++++++++------------ 5 files changed, 56 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 811f61ca..6d3405f4 100644 --- a/README.md +++ b/README.md @@ -309,4 +309,13 @@ A ping message can be sent to the server, with an optional data string. ``` websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long"); + +### Heartbeat. + +You can configure an optional heart beat / keep-alive, sent every 45 seconds +when there isn't any traffic to make sure that load balancers do not kill an +idle connection. + +``` +webSocket.setHeartBeatPeriod(45); ``` diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 7ae28b8c..15b155f8 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -42,7 +42,8 @@ namespace ix _closeWireSize(0), _enablePerMessageDeflate(false), _requestInitCancellation(false), - _heartBeatPeriod(kDefaultHeartBeatPeriod) + _heartBeatPeriod(kDefaultHeartBeatPeriod), + _lastSendTimePoint(std::chrono::steady_clock::now()) { } @@ -155,12 +156,23 @@ namespace ix _onCloseCallback = onCloseCallback; } + bool WebSocketTransport::exceedSendHeartBeatTimeOut() + { + std::lock_guard lock(_lastSendTimePointMutex); + auto now = std::chrono::steady_clock::now(); + return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod); + } + void WebSocketTransport::poll() { _socket->poll( [this](PollResultType pollResult) { - if (pollResult == PollResultType_Timeout) + // If (1) heartbeat is enabled, and (2) no data was received or + // send for a duration exceeding our heart-beat period, send a + // ping to the server. + if (pollResult == PollResultType_Timeout && + exceedSendHeartBeatTimeOut()) { std::stringstream ss; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; @@ -572,6 +584,9 @@ namespace ix _txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret); } } + + std::lock_guard lck(_lastSendTimePointMutex); + _lastSendTimePoint = std::chrono::steady_clock::now(); } void WebSocketTransport::close() diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index a9d53987..90585c78 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -122,6 +122,11 @@ namespace ix int _heartBeatPeriod; static const int kDefaultHeartBeatPeriod; const static std::string kHeartBeatPingMessage; + mutable std::mutex _lastSendTimePointMutex; + std::chrono::time_point _lastSendTimePoint; + + // No data was send through the socket for longer that the hearbeat period + bool exceedSendHeartBeatTimeOut(); void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, diff --git a/makefile b/makefile index 5fe7df6c..04e384f8 100644 --- a/makefile +++ b/makefile @@ -24,6 +24,8 @@ test_server: (cd test && npm i ws && node broadcast-server.js) # env TEST=Websocket_server make test +# env TEST=websocket_server make test +# env TEST=heartbeat make test test: python test/run.py diff --git a/test/IXWebSocketHeartBeatTest.cpp b/test/IXWebSocketHeartBeatTest.cpp index 87bf0012..cc1d5510 100644 --- a/test/IXWebSocketHeartBeatTest.cpp +++ b/test/IXWebSocketHeartBeatTest.cpp @@ -27,6 +27,7 @@ namespace void start(); void stop(); bool isReady() const; + void sendMessage(const std::string& text); private: ix::WebSocket _webSocket; @@ -103,6 +104,11 @@ namespace ss << "Received ping message " << str; log(ss.str()); } + else if (messageType == ix::WebSocket_MessageType_Message) + { + ss << "Received message " << str; + log(ss.str()); + } else { ss << "Invalid ix::WebSocketMessageType"; @@ -113,8 +119,14 @@ namespace _webSocket.start(); } + void WebSocketClient::sendMessage(const std::string& text) + { + _webSocket.send(text); + } + bool startServer(ix::WebSocketServer& server, std::atomic& receivedPingMessages) { + // A dev/null server server.setOnConnectionCallback( [&server, &receivedPingMessages](std::shared_ptr webSocket) { @@ -128,7 +140,7 @@ namespace { if (messageType == ix::WebSocket_MessageType_Open) { - Logger() << "New connection"; + Logger() << "New server connection"; Logger() << "Uri: " << openInfo.uri; Logger() << "Headers:"; for (auto it : openInfo.headers) @@ -138,23 +150,13 @@ namespace } else if (messageType == ix::WebSocket_MessageType_Close) { - log("Closed connection"); + log("Server closed connection"); } else if (messageType == ix::WebSocket_MessageType_Ping) { - log("Received a ping"); + log("Server received a ping"); receivedPingMessages++; } - else if (messageType == ix::WebSocket_MessageType_Message) - { - for (auto&& client : server.getClients()) - { - if (client != webSocket) - { - client->send(str); - } - } - } } ); } @@ -178,7 +180,7 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]") { ix::setupWebSocketTrafficTrackerCallback(); - int port = 8092; + int port = 8093; ix::WebSocketServer server(port); std::atomic serverReceivedPingMessages(0); REQUIRE(startServer(server, serverReceivedPingMessages)); @@ -199,12 +201,17 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]") REQUIRE(server.getClients().size() == 2); - ix::msleep(3000); + ix::msleep(900); + webSocketClientB.sendMessage("hello world"); + ix::msleep(900); + webSocketClientB.sendMessage("hello world"); + ix::msleep(900); webSocketClientA.stop(); webSocketClientB.stop(); - REQUIRE(serverReceivedPingMessages >= 4); + REQUIRE(serverReceivedPingMessages >= 2); + REQUIRE(serverReceivedPingMessages <= 4); // Give us 500ms for the server to notice that clients went away ix::msleep(500);