From 05033714bf5fd82a7c3bfca90984b31cc1622202 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 24 Jan 2019 12:42:49 -0800 Subject: [PATCH 1/5] hearbeat --- ixwebsocket/IXSocket.cpp | 30 +++++++++++++++++++--------- ixwebsocket/IXSocket.h | 16 +++++++++++++-- ixwebsocket/IXWebSocket.cpp | 21 ++++++++++++++++--- ixwebsocket/IXWebSocket.h | 8 +++++++- ixwebsocket/IXWebSocketTransport.cpp | 21 +++++++++++++++---- ixwebsocket/IXWebSocketTransport.h | 8 +++++++- test/run.py | 2 +- 7 files changed, 85 insertions(+), 21 deletions(-) diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index cfedb3b0..d456d3d2 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -21,6 +21,9 @@ namespace ix { + const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default + const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; + Socket::Socket(int fd) : _sockfd(fd) { @@ -32,14 +35,8 @@ namespace ix close(); } - void Socket::poll(const OnPollCallback& onPollCallback) + void Socket::poll(const OnPollCallback& onPollCallback, int timeoutSecs) { - if (_sockfd == -1) - { - onPollCallback(); - return; - } - fd_set rfds; FD_ZERO(&rfds); FD_SET(_sockfd, &rfds); @@ -48,11 +45,26 @@ namespace ix FD_SET(_eventfd.getFd(), &rfds); #endif + struct timeval timeout; + timeout.tv_sec = timeoutSecs; + timeout.tv_usec = 0; + int sockfd = _sockfd; int nfds = (std::max)(sockfd, _eventfd.getFd()); - select(nfds + 1, &rfds, nullptr, nullptr, nullptr); + int ret = select(nfds + 1, &rfds, nullptr, nullptr, + (timeoutSecs == kDefaultPollNoTimeout) ? nullptr : &timeout); - onPollCallback(); + PollResultType pollResult = PollResultType_ReadyForRead; + if (ret < 0) + { + pollResult = PollResultType_Error; + } + else if (ret == 0) + { + pollResult = PollResultType_Timeout; + } + + onPollCallback(pollResult); } void Socket::wakeUpFromPoll() diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 5004d66b..fc72a291 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -21,16 +21,24 @@ typedef SSIZE_T ssize_t; namespace ix { + enum PollResultType + { + PollResultType_ReadyForRead = 0, + PollResultType_Timeout = 1, + PollResultType_Error = 2 + }; + class Socket { public: - using OnPollCallback = std::function; + using OnPollCallback = std::function; Socket(int fd = -1); virtual ~Socket(); void configure(); - virtual void poll(const OnPollCallback& onPollCallback); + virtual void poll(const OnPollCallback& onPollCallback, + int timeoutSecs = kDefaultPollTimeout); virtual void wakeUpFromPoll(); // Virtual methods @@ -62,5 +70,9 @@ namespace ix std::atomic _sockfd; std::mutex _socketMutex; EventFd _eventfd; + + private: + static const int kDefaultPollTimeout; + static const int kDefaultPollNoTimeout; }; } diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index b51ed980..d3463cf3 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -31,12 +31,14 @@ namespace ix { OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; const int WebSocket::kDefaultHandShakeTimeoutSecs(60); + const int WebSocket::kDefaultHeartBeatPeriod(-1); WebSocket::WebSocket() : _onMessageCallback(OnMessageCallback()), _stop(false), _automaticReconnection(true), - _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs) + _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs), + _heartBeatPeriod(kDefaultHeartBeatPeriod) { _ws.setOnCloseCallback( [this](uint16_t code, const std::string& reason, size_t wireSize) @@ -77,6 +79,18 @@ namespace ix return _perMessageDeflateOptions; } + void WebSocket::setHeartBeatPeriod(int hearBeatPeriod) + { + std::lock_guard lock(_configMutex); + _heartBeatPeriod = hearBeatPeriod; + } + + int WebSocket::getHeartBeatPeriod() const + { + std::lock_guard lock(_configMutex); + return _heartBeatPeriod; + } + void WebSocket::start() { if (_thread.joinable()) return; // we've already been started @@ -110,7 +124,8 @@ namespace ix { { std::lock_guard lock(_configMutex); - _ws.configure(_perMessageDeflateOptions); + _ws.configure(_perMessageDeflateOptions, + _heartBeatPeriod); } WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs); @@ -130,7 +145,7 @@ namespace ix { { std::lock_guard lock(_configMutex); - _ws.configure(_perMessageDeflateOptions); + _ws.configure(_perMessageDeflateOptions, _heartBeatPeriod); } WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index ef75d8df..081b0cc4 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -86,7 +86,8 @@ namespace ix void setUrl(const std::string& url); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); - void setHandshakeTimeout(int _handshakeTimeoutSecs); + void setHandshakeTimeout(int handshakeTimeoutSecs); + void setHeartBeatPeriod(int hearBeatPeriod); // Run asynchronously, by calling start and stop. void start(); @@ -107,6 +108,7 @@ namespace ix ReadyState getReadyState() const; const std::string& getUrl() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; + int getHeartBeatPeriod() const; void enableAutomaticReconnection(); void disableAutomaticReconnection(); @@ -142,6 +144,10 @@ namespace ix std::atomic _handshakeTimeoutSecs; static const int kDefaultHandShakeTimeoutSecs; + // Optional Heartbeat + int _heartBeatPeriod; + static const int kDefaultHeartBeatPeriod; + friend class WebSocketServer; }; } diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 4c92a901..e3c5b287 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -33,12 +33,16 @@ namespace ix { + const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat"); + const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); + WebSocketTransport::WebSocketTransport() : _readyState(CLOSED), _closeCode(0), _closeWireSize(0), _enablePerMessageDeflate(false), - _requestInitCancellation(false) + _requestInitCancellation(false), + _heartBeatPeriod(kDefaultHeartBeatPeriod) { } @@ -48,10 +52,12 @@ namespace ix ; } - void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) + void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, + int hearBeatPeriod) { _perMessageDeflateOptions = perMessageDeflateOptions; _enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); + _heartBeatPeriod = hearBeatPeriod; } // Client @@ -152,8 +158,14 @@ namespace ix void WebSocketTransport::poll() { _socket->poll( - [this]() + [this](PollResultType pollResult) { + if (pollResult == PollResultType_Timeout) + { + sendPing(kHeartBeatPingMessage); + return; + } + while (true) { int N = (int) _rxbuf.size(); @@ -185,7 +197,8 @@ namespace ix _socket->close(); setReadyState(CLOSED); } - }); + }, + _heartBeatPeriod); } bool WebSocketTransport::isSendBufferEmpty() const diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index a2289e83..a9d53987 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -57,7 +57,8 @@ namespace ix WebSocketTransport(); ~WebSocketTransport(); - void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); + void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, + int hearBeatPeriod); WebSocketInitResult connectToUrl(const std::string& url, // Client int timeoutSecs); @@ -116,6 +117,11 @@ namespace ix // Used to cancel dns lookup + socket connect + http upgrade std::atomic _requestInitCancellation; + + // Optional Heartbeat + int _heartBeatPeriod; + static const int kDefaultHeartBeatPeriod; + const static std::string kHeartBeatPingMessage; void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, diff --git a/test/run.py b/test/run.py index 2f78c61d..3cee95bd 100644 --- a/test/run.py +++ b/test/run.py @@ -19,7 +19,7 @@ if osName == 'Windows': testBinary ='ixwebsocket_unittest.exe' else: generator = '' - make = 'make' + make = 'make -j6' testBinary ='./ixwebsocket_unittest' sanitizersFlags = { From 5d4e897cc4ee753a6c30ab3bc16017a90f7aba87 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 24 Jan 2019 18:50:07 -0800 Subject: [PATCH 2/5] add an heartbeat test --- test/CMakeLists.txt | 1 + test/IXWebSocketHeartBeatTest.cpp | 215 ++++++++++++++++++++++++++++++ 2 files changed, 216 insertions(+) create mode 100644 test/IXWebSocketHeartBeatTest.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c763530e..3a133b4c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -34,6 +34,7 @@ set (SOURCES if (NOT WIN32) list(APPEND SOURCES IXWebSocketServerTest.cpp + IXWebSocketHeartBeatTest.cpp cmd_websocket_chat.cpp IXWebSocketTestConnectionDisconnection.cpp ) diff --git a/test/IXWebSocketHeartBeatTest.cpp b/test/IXWebSocketHeartBeatTest.cpp new file mode 100644 index 00000000..5e03d930 --- /dev/null +++ b/test/IXWebSocketHeartBeatTest.cpp @@ -0,0 +1,215 @@ +/* + * IXWebSocketHeartBeatTest.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone. All rights reserved. + */ + +#include +#include +#include +#include +#include + +#include "IXTest.h" + +#include "catch.hpp" + +using namespace ix; + +namespace +{ + class WebSocketClient + { + public: + WebSocketClient(int port); + + void subscribe(const std::string& channel); + void start(); + void stop(); + bool isReady() const; + + private: + ix::WebSocket _webSocket; + int _port; + }; + + WebSocketClient::WebSocketClient(int port) + : _port(port) + { + ; + } + + bool WebSocketClient::isReady() const + { + return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open; + } + + void WebSocketClient::stop() + { + _webSocket.stop(); + } + + void WebSocketClient::start() + { + std::string url; + { + std::stringstream ss; + ss << "ws://localhost:" + << _port + << "/"; + + url = ss.str(); + } + + _webSocket.setUrl(url); + + // The important bit for this test. + // Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second + // a ping message will be sent by the client. + _webSocket.setHeartBeatPeriod(1); + + std::stringstream ss; + log(std::string("Connecting to url: ") + url); + + _webSocket.setOnMessageCallback( + [this](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, + const ix::WebSocketCloseInfo& closeInfo) + { + std::stringstream ss; + if (messageType == ix::WebSocket_MessageType_Open) + { + log("client connected"); + } + else if (messageType == ix::WebSocket_MessageType_Close) + { + log("client disconnected"); + } + else if (messageType == ix::WebSocket_MessageType_Error) + { + ss << "Error ! " << error.reason; + log(ss.str()); + } + else if (messageType == ix::WebSocket_MessageType_Pong) + { + ss << "Received pong message " << str; + log(ss.str()); + } + else if (messageType == ix::WebSocket_MessageType_Ping) + { + ss << "Received ping message " << str; + log(ss.str()); + } + else + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + bool startServer(ix::WebSocketServer& server, std::atomic& receivedPingMessages) + { + server.setOnConnectionCallback( + [&server, &receivedPingMessages](std::shared_ptr webSocket) + { + webSocket->setOnMessageCallback( + [webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, + const ix::WebSocketCloseInfo& closeInfo) + { + if (messageType == ix::WebSocket_MessageType_Open) + { + Logger() << "New connection"; + Logger() << "Uri: " << openInfo.uri; + Logger() << "Headers:"; + for (auto it : openInfo.headers) + { + Logger() << it.first << ": " << it.second; + } + } + else if (messageType == ix::WebSocket_MessageType_Close) + { + log("Closed connection"); + } + else if (messageType == ix::WebSocket_MessageType_Ping) + { + log("Received a ping"); + receivedPingMessages++; + } + else if (messageType == ix::WebSocket_MessageType_Message) + { + for (auto&& client : server.getClients()) + { + if (client != webSocket) + { + client->send(str); + } + } + } + } + ); + } + ); + + auto res = server.listen(); + if (!res.first) + { + log(res.second); + return false; + } + + server.start(); + return true; + } +} + +TEST_CASE("Websocket_heartbeat", "[heartbeat]") +{ + SECTION("Make sure that ping messages are sent during heartbeat.") + { + ix::setupWebSocketTrafficTrackerCallback(); + + int port = 8091; + ix::WebSocketServer server(port); + std::atomic serverReceivedPingMessages(0); + REQUIRE(startServer(server, serverReceivedPingMessages)); + + std::string session = ix::generateSessionId(); + WebSocketClient webSocketClientA(port); + WebSocketClient webSocketClientB(port); + + webSocketClientA.start(); + webSocketClientB.start(); + + // Wait for all chat instance to be ready + while (true) + { + if (webSocketClientA.isReady() && webSocketClientB.isReady()) break; + ix::msleep(10); + } + + REQUIRE(server.getClients().size() == 2); + + ix::msleep(3000); + + webSocketClientA.stop(); + webSocketClientB.stop(); + + REQUIRE(serverReceivedPingMessages >= 4); + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + REQUIRE(server.getClients().size() == 0); + + ix::reportWebSocketTraffic(); + } +} From c665d65cba1e0e6bf3f4d6820e38741453440ce5 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 24 Jan 2019 19:54:10 -0800 Subject: [PATCH 3/5] unittest fix --- README.md | 6 +++++- test/IXWebSocketHeartBeatTest.cpp | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ae24d970..811f61ca 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,11 @@ Here is what the client API looks like. ix::WebSocket webSocket; std::string url("ws://localhost:8080/"); -webSocket.configure(url); +webSocket.setUrl(url); + +// Optional heart beat, sent every 45 seconds when there isn't any traffic +// to make sure that load balancers do not kill an idle connection. +webSocket.setHeartBeatPeriod(45); // Setup a callback to be fired when a message or an event (open, close, error) is received webSocket.setOnMessageCallback( diff --git a/test/IXWebSocketHeartBeatTest.cpp b/test/IXWebSocketHeartBeatTest.cpp index 5e03d930..52f37175 100644 --- a/test/IXWebSocketHeartBeatTest.cpp +++ b/test/IXWebSocketHeartBeatTest.cpp @@ -178,7 +178,7 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]") { ix::setupWebSocketTrafficTrackerCallback(); - int port = 8091; + int port = 8092; ix::WebSocketServer server(port); std::atomic serverReceivedPingMessages(0); REQUIRE(startServer(server, serverReceivedPingMessages)); From 3c9ec0aed0477b4532077d64cef32f36f3bb37a6 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 24 Jan 2019 21:16:32 -0800 Subject: [PATCH 4/5] close server socket on exit --- ixwebsocket/IXSocketServer.cpp | 27 ++++++++++++------- ixwebsocket/IXWebSocketTransport.cpp | 4 ++- test/IXWebSocketHeartBeatTest.cpp | 10 +++---- ...IXWebSocketTestConnectionDisconnection.cpp | 12 ++++----- test/cmd_websocket_chat.cpp | 5 ---- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/ixwebsocket/IXSocketServer.cpp b/ixwebsocket/IXSocketServer.cpp index a6344e27..56012e88 100644 --- a/ixwebsocket/IXSocketServer.cpp +++ b/ixwebsocket/IXSocketServer.cpp @@ -71,9 +71,11 @@ namespace ix (char*) &enable, sizeof(enable)) < 0) { std::stringstream ss; - ss << "SocketServer::listen() error calling setsockopt(SO_REUSEADDR): " - << strerror(errno); + ss << "SocketServer::listen() error calling setsockopt(SO_REUSEADDR) " + << "at address " << _host << ":" << _port + << " : " << strerror(Socket::getErrno()); + ::close(_serverFd); return std::make_pair(false, ss.str()); } @@ -93,21 +95,25 @@ namespace ix if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0) { std::stringstream ss; - ss << "SocketServer::listen() error calling bind: " - << strerror(Socket::getErrno()); + ss << "SocketServer::listen() error calling bind " + << "at address " << _host << ":" << _port + << " : " << strerror(Socket::getErrno()); + ::close(_serverFd); return std::make_pair(false, ss.str()); } - /* - * Listen for connections. Specify the tcp backlog. - */ - if (::listen(_serverFd, _backlog) != 0) + // + // Listen for connections. Specify the tcp backlog. + // + if (::listen(_serverFd, _backlog) < 0) { std::stringstream ss; - ss << "SocketServer::listen() error calling listen: " - << strerror(Socket::getErrno()); + ss << "SocketServer::listen() error calling listen " + << "at address " << _host << ":" << _port + << " : " << strerror(Socket::getErrno()); + ::close(_serverFd); return std::make_pair(false, ss.str()); } @@ -136,6 +142,7 @@ namespace ix _stop = false; _conditionVariable.notify_one(); + ::close(_serverFd); } void SocketServer::run() diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index e3c5b287..7ae28b8c 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -162,7 +162,9 @@ namespace ix { if (pollResult == PollResultType_Timeout) { - sendPing(kHeartBeatPingMessage); + std::stringstream ss; + ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; + sendPing(ss.str()); return; } diff --git a/test/IXWebSocketHeartBeatTest.cpp b/test/IXWebSocketHeartBeatTest.cpp index 52f37175..87bf0012 100644 --- a/test/IXWebSocketHeartBeatTest.cpp +++ b/test/IXWebSocketHeartBeatTest.cpp @@ -72,11 +72,11 @@ namespace log(std::string("Connecting to url: ") + url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, + [](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, const ix::WebSocketCloseInfo& closeInfo) { std::stringstream ss; diff --git a/test/IXWebSocketTestConnectionDisconnection.cpp b/test/IXWebSocketTestConnectionDisconnection.cpp index 945a2940..d9bf9bc8 100644 --- a/test/IXWebSocketTestConnectionDisconnection.cpp +++ b/test/IXWebSocketTestConnectionDisconnection.cpp @@ -52,12 +52,12 @@ namespace log(std::string("Connecting to url: ") + url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, + const ix::WebSocketCloseInfo& closeInfo) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) diff --git a/test/cmd_websocket_chat.cpp b/test/cmd_websocket_chat.cpp index da131df1..b3a49f45 100644 --- a/test/cmd_websocket_chat.cpp +++ b/test/cmd_websocket_chat.cpp @@ -4,11 +4,6 @@ * Copyright (c) 2017 Machine Zone. All rights reserved. */ -// -// Simple chat program that talks to the node.js server at -// websocket_chat_server/broacast-server.js -// - #include #include #include From fa7ef06f4d860c059243a291be1699f392c4121d Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Fri, 25 Jan 2019 16:11:39 -0800 Subject: [PATCH 5/5] heartbeat correct --- README.md | 9 +++++++ ixwebsocket/IXWebSocketTransport.cpp | 19 ++++++++++++-- ixwebsocket/IXWebSocketTransport.h | 5 ++++ makefile | 2 ++ test/IXWebSocketHeartBeatTest.cpp | 39 ++++++++++++++++------------ 5 files changed, 56 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 811f61ca..6d3405f4 100644 --- a/README.md +++ b/README.md @@ -309,4 +309,13 @@ A ping message can be sent to the server, with an optional data string. ``` websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long"); + +### Heartbeat. + +You can configure an optional heart beat / keep-alive, sent every 45 seconds +when there isn't any traffic to make sure that load balancers do not kill an +idle connection. + +``` +webSocket.setHeartBeatPeriod(45); ``` diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 7ae28b8c..15b155f8 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -42,7 +42,8 @@ namespace ix _closeWireSize(0), _enablePerMessageDeflate(false), _requestInitCancellation(false), - _heartBeatPeriod(kDefaultHeartBeatPeriod) + _heartBeatPeriod(kDefaultHeartBeatPeriod), + _lastSendTimePoint(std::chrono::steady_clock::now()) { } @@ -155,12 +156,23 @@ namespace ix _onCloseCallback = onCloseCallback; } + bool WebSocketTransport::exceedSendHeartBeatTimeOut() + { + std::lock_guard lock(_lastSendTimePointMutex); + auto now = std::chrono::steady_clock::now(); + return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod); + } + void WebSocketTransport::poll() { _socket->poll( [this](PollResultType pollResult) { - if (pollResult == PollResultType_Timeout) + // If (1) heartbeat is enabled, and (2) no data was received or + // send for a duration exceeding our heart-beat period, send a + // ping to the server. + if (pollResult == PollResultType_Timeout && + exceedSendHeartBeatTimeOut()) { std::stringstream ss; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; @@ -572,6 +584,9 @@ namespace ix _txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret); } } + + std::lock_guard lck(_lastSendTimePointMutex); + _lastSendTimePoint = std::chrono::steady_clock::now(); } void WebSocketTransport::close() diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index a9d53987..90585c78 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -122,6 +122,11 @@ namespace ix int _heartBeatPeriod; static const int kDefaultHeartBeatPeriod; const static std::string kHeartBeatPingMessage; + mutable std::mutex _lastSendTimePointMutex; + std::chrono::time_point _lastSendTimePoint; + + // No data was send through the socket for longer that the hearbeat period + bool exceedSendHeartBeatTimeOut(); void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, diff --git a/makefile b/makefile index 5fe7df6c..04e384f8 100644 --- a/makefile +++ b/makefile @@ -24,6 +24,8 @@ test_server: (cd test && npm i ws && node broadcast-server.js) # env TEST=Websocket_server make test +# env TEST=websocket_server make test +# env TEST=heartbeat make test test: python test/run.py diff --git a/test/IXWebSocketHeartBeatTest.cpp b/test/IXWebSocketHeartBeatTest.cpp index 87bf0012..cc1d5510 100644 --- a/test/IXWebSocketHeartBeatTest.cpp +++ b/test/IXWebSocketHeartBeatTest.cpp @@ -27,6 +27,7 @@ namespace void start(); void stop(); bool isReady() const; + void sendMessage(const std::string& text); private: ix::WebSocket _webSocket; @@ -103,6 +104,11 @@ namespace ss << "Received ping message " << str; log(ss.str()); } + else if (messageType == ix::WebSocket_MessageType_Message) + { + ss << "Received message " << str; + log(ss.str()); + } else { ss << "Invalid ix::WebSocketMessageType"; @@ -113,8 +119,14 @@ namespace _webSocket.start(); } + void WebSocketClient::sendMessage(const std::string& text) + { + _webSocket.send(text); + } + bool startServer(ix::WebSocketServer& server, std::atomic& receivedPingMessages) { + // A dev/null server server.setOnConnectionCallback( [&server, &receivedPingMessages](std::shared_ptr webSocket) { @@ -128,7 +140,7 @@ namespace { if (messageType == ix::WebSocket_MessageType_Open) { - Logger() << "New connection"; + Logger() << "New server connection"; Logger() << "Uri: " << openInfo.uri; Logger() << "Headers:"; for (auto it : openInfo.headers) @@ -138,23 +150,13 @@ namespace } else if (messageType == ix::WebSocket_MessageType_Close) { - log("Closed connection"); + log("Server closed connection"); } else if (messageType == ix::WebSocket_MessageType_Ping) { - log("Received a ping"); + log("Server received a ping"); receivedPingMessages++; } - else if (messageType == ix::WebSocket_MessageType_Message) - { - for (auto&& client : server.getClients()) - { - if (client != webSocket) - { - client->send(str); - } - } - } } ); } @@ -178,7 +180,7 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]") { ix::setupWebSocketTrafficTrackerCallback(); - int port = 8092; + int port = 8093; ix::WebSocketServer server(port); std::atomic serverReceivedPingMessages(0); REQUIRE(startServer(server, serverReceivedPingMessages)); @@ -199,12 +201,17 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]") REQUIRE(server.getClients().size() == 2); - ix::msleep(3000); + ix::msleep(900); + webSocketClientB.sendMessage("hello world"); + ix::msleep(900); + webSocketClientB.sendMessage("hello world"); + ix::msleep(900); webSocketClientA.stop(); webSocketClientB.stop(); - REQUIRE(serverReceivedPingMessages >= 4); + REQUIRE(serverReceivedPingMessages >= 2); + REQUIRE(serverReceivedPingMessages <= 4); // Give us 500ms for the server to notice that clients went away ix::msleep(500);