From a8b6573f96bf90d64c87ec5194038a3fff8aeb0e Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 24 Jan 2019 12:42:49 -0800 Subject: [PATCH] hearbeat --- ixwebsocket/IXSocket.cpp | 30 +++++++++++++++++++--------- ixwebsocket/IXSocket.h | 16 +++++++++++++-- ixwebsocket/IXWebSocket.cpp | 21 ++++++++++++++++--- ixwebsocket/IXWebSocket.h | 8 +++++++- ixwebsocket/IXWebSocketTransport.cpp | 21 +++++++++++++++---- ixwebsocket/IXWebSocketTransport.h | 8 +++++++- test/run.py | 2 +- 7 files changed, 85 insertions(+), 21 deletions(-) diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index cfedb3b0..d456d3d2 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -21,6 +21,9 @@ namespace ix { + const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default + const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; + Socket::Socket(int fd) : _sockfd(fd) { @@ -32,14 +35,8 @@ namespace ix close(); } - void Socket::poll(const OnPollCallback& onPollCallback) + void Socket::poll(const OnPollCallback& onPollCallback, int timeoutSecs) { - if (_sockfd == -1) - { - onPollCallback(); - return; - } - fd_set rfds; FD_ZERO(&rfds); FD_SET(_sockfd, &rfds); @@ -48,11 +45,26 @@ namespace ix FD_SET(_eventfd.getFd(), &rfds); #endif + struct timeval timeout; + timeout.tv_sec = timeoutSecs; + timeout.tv_usec = 0; + int sockfd = _sockfd; int nfds = (std::max)(sockfd, _eventfd.getFd()); - select(nfds + 1, &rfds, nullptr, nullptr, nullptr); + int ret = select(nfds + 1, &rfds, nullptr, nullptr, + (timeoutSecs == kDefaultPollNoTimeout) ? nullptr : &timeout); - onPollCallback(); + PollResultType pollResult = PollResultType_ReadyForRead; + if (ret < 0) + { + pollResult = PollResultType_Error; + } + else if (ret == 0) + { + pollResult = PollResultType_Timeout; + } + + onPollCallback(pollResult); } void Socket::wakeUpFromPoll() diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 5004d66b..fc72a291 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -21,16 +21,24 @@ typedef SSIZE_T ssize_t; namespace ix { + enum PollResultType + { + PollResultType_ReadyForRead = 0, + PollResultType_Timeout = 1, + PollResultType_Error = 2 + }; + class Socket { public: - using OnPollCallback = std::function; + using OnPollCallback = std::function; Socket(int fd = -1); virtual ~Socket(); void configure(); - virtual void poll(const OnPollCallback& onPollCallback); + virtual void poll(const OnPollCallback& onPollCallback, + int timeoutSecs = kDefaultPollTimeout); virtual void wakeUpFromPoll(); // Virtual methods @@ -62,5 +70,9 @@ namespace ix std::atomic _sockfd; std::mutex _socketMutex; EventFd _eventfd; + + private: + static const int kDefaultPollTimeout; + static const int kDefaultPollNoTimeout; }; } diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index b51ed980..d3463cf3 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -31,12 +31,14 @@ namespace ix { OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; const int WebSocket::kDefaultHandShakeTimeoutSecs(60); + const int WebSocket::kDefaultHeartBeatPeriod(-1); WebSocket::WebSocket() : _onMessageCallback(OnMessageCallback()), _stop(false), _automaticReconnection(true), - _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs) + _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs), + _heartBeatPeriod(kDefaultHeartBeatPeriod) { _ws.setOnCloseCallback( [this](uint16_t code, const std::string& reason, size_t wireSize) @@ -77,6 +79,18 @@ namespace ix return _perMessageDeflateOptions; } + void WebSocket::setHeartBeatPeriod(int hearBeatPeriod) + { + std::lock_guard lock(_configMutex); + _heartBeatPeriod = hearBeatPeriod; + } + + int WebSocket::getHeartBeatPeriod() const + { + std::lock_guard lock(_configMutex); + return _heartBeatPeriod; + } + void WebSocket::start() { if (_thread.joinable()) return; // we've already been started @@ -110,7 +124,8 @@ namespace ix { { std::lock_guard lock(_configMutex); - _ws.configure(_perMessageDeflateOptions); + _ws.configure(_perMessageDeflateOptions, + _heartBeatPeriod); } WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs); @@ -130,7 +145,7 @@ namespace ix { { std::lock_guard lock(_configMutex); - _ws.configure(_perMessageDeflateOptions); + _ws.configure(_perMessageDeflateOptions, _heartBeatPeriod); } WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index ef75d8df..081b0cc4 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -86,7 +86,8 @@ namespace ix void setUrl(const std::string& url); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); - void setHandshakeTimeout(int _handshakeTimeoutSecs); + void setHandshakeTimeout(int handshakeTimeoutSecs); + void setHeartBeatPeriod(int hearBeatPeriod); // Run asynchronously, by calling start and stop. void start(); @@ -107,6 +108,7 @@ namespace ix ReadyState getReadyState() const; const std::string& getUrl() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; + int getHeartBeatPeriod() const; void enableAutomaticReconnection(); void disableAutomaticReconnection(); @@ -142,6 +144,10 @@ namespace ix std::atomic _handshakeTimeoutSecs; static const int kDefaultHandShakeTimeoutSecs; + // Optional Heartbeat + int _heartBeatPeriod; + static const int kDefaultHeartBeatPeriod; + friend class WebSocketServer; }; } diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 4c92a901..e3c5b287 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -33,12 +33,16 @@ namespace ix { + const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat"); + const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); + WebSocketTransport::WebSocketTransport() : _readyState(CLOSED), _closeCode(0), _closeWireSize(0), _enablePerMessageDeflate(false), - _requestInitCancellation(false) + _requestInitCancellation(false), + _heartBeatPeriod(kDefaultHeartBeatPeriod) { } @@ -48,10 +52,12 @@ namespace ix ; } - void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) + void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, + int hearBeatPeriod) { _perMessageDeflateOptions = perMessageDeflateOptions; _enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); + _heartBeatPeriod = hearBeatPeriod; } // Client @@ -152,8 +158,14 @@ namespace ix void WebSocketTransport::poll() { _socket->poll( - [this]() + [this](PollResultType pollResult) { + if (pollResult == PollResultType_Timeout) + { + sendPing(kHeartBeatPingMessage); + return; + } + while (true) { int N = (int) _rxbuf.size(); @@ -185,7 +197,8 @@ namespace ix _socket->close(); setReadyState(CLOSED); } - }); + }, + _heartBeatPeriod); } bool WebSocketTransport::isSendBufferEmpty() const diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index a2289e83..a9d53987 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -57,7 +57,8 @@ namespace ix WebSocketTransport(); ~WebSocketTransport(); - void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); + void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, + int hearBeatPeriod); WebSocketInitResult connectToUrl(const std::string& url, // Client int timeoutSecs); @@ -116,6 +117,11 @@ namespace ix // Used to cancel dns lookup + socket connect + http upgrade std::atomic _requestInitCancellation; + + // Optional Heartbeat + int _heartBeatPeriod; + static const int kDefaultHeartBeatPeriod; + const static std::string kHeartBeatPingMessage; void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, diff --git a/test/run.py b/test/run.py index 2f78c61d..3cee95bd 100644 --- a/test/run.py +++ b/test/run.py @@ -19,7 +19,7 @@ if osName == 'Windows': testBinary ='ixwebsocket_unittest.exe' else: generator = '' - make = 'make' + make = 'make -j6' testBinary ='./ixwebsocket_unittest' sanitizersFlags = {