Simplify ping/pong based heartbeat implementation

This commit is contained in:
Benjamin Sergeant 2020-03-18 01:13:29 -07:00
parent d6f534de06
commit b287730c19
5 changed files with 42 additions and 144 deletions

View File

@ -270,10 +270,6 @@ namespace ix
// This should keep the connection open and prevent some load balancers such as
// the Amazon one from shutting it down
_webSocket->setPingInterval(kPingIntervalSecs);
// If we don't receive a pong back, declare loss after 3 * N seconds
// (will be 90s now), and close and restart the connection
_webSocket->setPingTimeout(3 * kPingIntervalSecs);
}
void CobraConnection::configure(const ix::CobraConfig& config)

View File

@ -19,7 +19,6 @@ namespace ix
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
const int WebSocket::kDefaultHandShakeTimeoutSecs(60);
const int WebSocket::kDefaultPingIntervalSecs(-1);
const int WebSocket::kDefaultPingTimeoutSecs(-1);
const bool WebSocket::kDefaultEnablePong(true);
const uint32_t WebSocket::kDefaultMaxWaitBetweenReconnectionRetries(10 * 1000); // 10s
@ -31,7 +30,6 @@ namespace ix
, _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
, _enablePong(kDefaultEnablePong)
, _pingIntervalSecs(kDefaultPingIntervalSecs)
, _pingTimeoutSecs(kDefaultPingTimeoutSecs)
{
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
@ -86,18 +84,6 @@ namespace ix
return _perMessageDeflateOptions;
}
void WebSocket::setHeartBeatPeriod(int heartBeatPeriodSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
_pingIntervalSecs = heartBeatPeriodSecs;
}
int WebSocket::getHeartBeatPeriod() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _pingIntervalSecs;
}
void WebSocket::setPingInterval(int pingIntervalSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
@ -110,18 +96,6 @@ namespace ix
return _pingIntervalSecs;
}
void WebSocket::setPingTimeout(int pingTimeoutSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
_pingTimeoutSecs = pingTimeoutSecs;
}
int WebSocket::getPingTimeout() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _pingTimeoutSecs;
}
void WebSocket::enablePong()
{
std::lock_guard<std::mutex> lock(_configMutex);
@ -189,8 +163,7 @@ namespace ix
_ws.configure(_perMessageDeflateOptions,
_socketTLSOptions,
_enablePong,
_pingIntervalSecs,
_pingTimeoutSecs);
_pingIntervalSecs);
}
WebSocketHttpHeaders headers(_extraHeaders);
@ -229,6 +202,9 @@ namespace ix
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers, status.protocol),
WebSocketCloseInfo()));
_ws.sendHeartBeat();
return status;
}
@ -239,8 +215,7 @@ namespace ix
_ws.configure(_perMessageDeflateOptions,
_socketTLSOptions,
_enablePong,
_pingIntervalSecs,
_pingTimeoutSecs);
_pingIntervalSecs);
}
WebSocketInitResult status = _ws.connectToSocket(socket, timeoutSecs);
@ -256,6 +231,9 @@ namespace ix
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
_ws.sendHeartBeat();
return status;
}

View File

@ -52,9 +52,7 @@ namespace ix
void setPerMessageDeflateOptions(
const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setTLSOptions(const SocketTLSOptions& socketTLSOptions);
void setHeartBeatPeriod(int heartBeatPeriodSecs);
void setPingInterval(int pingIntervalSecs); // alias of setHeartBeatPeriod
void setPingTimeout(int pingTimeoutSecs);
void setPingInterval(int pingIntervalSecs);
void enablePong();
void disablePong();
void enablePerMessageDeflate();
@ -94,9 +92,7 @@ namespace ix
const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const;
int getPingInterval() const;
int getPingTimeout() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection();

View File

@ -51,26 +51,10 @@
#include <vector>
namespace
{
int greatestCommonDivisor(int a, int b)
{
while (b != 0)
{
int t = b;
b = a % b;
a = t;
}
return a;
}
} // namespace
namespace ix
{
const std::string WebSocketTransport::kPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultPingIntervalSecs(-1);
const int WebSocketTransport::kDefaultPingTimeoutSecs(-1);
const bool WebSocketTransport::kDefaultEnablePong(true);
const int WebSocketTransport::kClosingMaximumWaitingDelayInMs(300);
constexpr size_t WebSocketTransport::kChunkSize;
@ -89,11 +73,8 @@ namespace ix
, _closingTimePoint(std::chrono::steady_clock::now())
, _enablePong(kDefaultEnablePong)
, _pingIntervalSecs(kDefaultPingIntervalSecs)
, _pingTimeoutSecs(kDefaultPingTimeoutSecs)
, _pingIntervalOrTimeoutGCDSecs(-1)
, _nextGCDTimePoint(std::chrono::steady_clock::now())
, _pongReceived(false)
, _lastSendPingTimePoint(std::chrono::steady_clock::now())
, _lastReceivePongTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
}
@ -107,29 +88,13 @@ namespace ix
const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions,
bool enablePong,
int pingIntervalSecs,
int pingTimeoutSecs)
int pingIntervalSecs)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_socketTLSOptions = socketTLSOptions;
_enablePong = enablePong;
_pingIntervalSecs = pingIntervalSecs;
_pingTimeoutSecs = pingTimeoutSecs;
if (pingIntervalSecs > 0 && pingTimeoutSecs > 0)
{
_pingIntervalOrTimeoutGCDSecs =
greatestCommonDivisor(pingIntervalSecs, pingTimeoutSecs);
}
else if (_pingTimeoutSecs > 0)
{
_pingIntervalOrTimeoutGCDSecs = pingTimeoutSecs;
}
else
{
_pingIntervalOrTimeoutGCDSecs = pingIntervalSecs;
}
}
// Client
@ -220,7 +185,8 @@ namespace ix
}
else if (readyState == ReadyState::OPEN)
{
initTimePointsAndGCDAfterConnect();
initTimePointsAfterConnect();
_pongReceived = false;
}
_readyState = readyState;
@ -231,22 +197,12 @@ namespace ix
_onCloseCallback = onCloseCallback;
}
void WebSocketTransport::initTimePointsAndGCDAfterConnect()
void WebSocketTransport::initTimePointsAfterConnect()
{
{
std::lock_guard<std::mutex> lock(_lastSendPingTimePointMutex);
_lastSendPingTimePoint = std::chrono::steady_clock::now();
}
{
std::lock_guard<std::mutex> lock(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
}
if (_pingIntervalOrTimeoutGCDSecs > 0)
{
_nextGCDTimePoint = std::chrono::steady_clock::now() +
std::chrono::seconds(_pingIntervalOrTimeoutGCDSecs);
}
}
// Only consider send PING time points for that computation.
@ -259,13 +215,14 @@ namespace ix
return now - _lastSendPingTimePoint > std::chrono::seconds(_pingIntervalSecs);
}
bool WebSocketTransport::pingTimeoutExceeded()
void WebSocketTransport::sendHeartBeat()
{
if (_pingTimeoutSecs <= 0) return false;
if (_pingIntervalSecs <= 0) return;
std::lock_guard<std::mutex> lock(_lastReceivePongTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs);
_pongReceived = false;
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
sendPing(ss.str());
}
bool WebSocketTransport::closingDelayExceeded()
@ -279,46 +236,33 @@ namespace ix
{
if (_readyState == ReadyState::OPEN)
{
// if (1) ping timeout is enabled and (2) duration since last received
// ping response (PONG) exceeds the maximum delay, then close the connection
if (pingTimeoutExceeded())
if (pingIntervalExceeded())
{
close(WebSocketCloseConstants::kInternalErrorCode,
WebSocketCloseConstants::kPingTimeoutMessage);
}
// If ping is enabled and no ping has been sent for a duration
// exceeding our ping interval, send a ping to the server.
else if (pingIntervalExceeded())
{
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
sendPing(ss.str());
if (!_pongReceived)
{
// ping response (PONG) exceeds the maximum delay, close the connection
close(WebSocketCloseConstants::kInternalErrorCode,
WebSocketCloseConstants::kPingTimeoutMessage);
}
else
{
sendHeartBeat();
}
}
}
// No timeout if state is not OPEN, otherwise computed
// pingIntervalOrTimeoutGCD (equals to -1 if no ping and no ping timeout are set)
int lastingTimeoutDelayInMs =
(_readyState != ReadyState::OPEN) ? 0 : _pingIntervalOrTimeoutGCDSecs;
(_readyState != ReadyState::OPEN) ? 0 : _pingIntervalSecs;
if (_pingIntervalOrTimeoutGCDSecs > 0)
if (_pingIntervalSecs > 0)
{
// compute lasting delay to wait for next ping / timeout, if at least one set
auto now = std::chrono::steady_clock::now();
if (now >= _nextGCDTimePoint)
{
_nextGCDTimePoint = now + std::chrono::seconds(_pingIntervalOrTimeoutGCDSecs);
lastingTimeoutDelayInMs = _pingIntervalOrTimeoutGCDSecs * 1000;
}
else
{
lastingTimeoutDelayInMs =
(int) std::chrono::duration_cast<std::chrono::milliseconds>(_nextGCDTimePoint -
now)
.count();
}
lastingTimeoutDelayInMs =
(int) std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastSendPingTimePoint)
.count();
}
#ifdef _WIN32
@ -626,9 +570,7 @@ namespace ix
}
else if (ws.opcode == wsheader_type::PONG)
{
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
_pongReceived = true;
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)

View File

@ -75,8 +75,7 @@ namespace ix
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions,
bool enablePong,
int pingIntervalSecs,
int pingTimeoutSecs);
int pingIntervalSecs);
// Client
WebSocketInitResult connectToUrl(const std::string& url,
@ -106,6 +105,8 @@ namespace ix
void dispatch(PollResult pollResult, const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
void sendHeartBeat();
private:
std::string _url;
@ -202,39 +203,24 @@ namespace ix
static const bool kDefaultEnablePong;
// Optional ping and pong timeout
// if both ping interval and timeout are set (> 0),
// then use GCD of these value to wait for the lowest time
int _pingIntervalSecs;
int _pingTimeoutSecs;
int _pingIntervalOrTimeoutGCDSecs;
std::atomic<bool> _pongReceived;
static const int kDefaultPingIntervalSecs;
static const int kDefaultPingTimeoutSecs;
static const std::string kPingMessage;
// Record time step for ping/ ping timeout to ensure we wait for the right left duration
std::chrono::time_point<std::chrono::steady_clock> _nextGCDTimePoint;
// We record when ping are being sent so that we can know when to send the next one
// We also record when pong are being sent as a reply to pings, to close the connections
// if no pong were received sufficiently fast.
mutable std::mutex _lastSendPingTimePointMutex;
mutable std::mutex _lastReceivePongTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendPingTimePoint;
std::chrono::time_point<std::chrono::steady_clock> _lastReceivePongTimePoint;
// If this function returns true, it is time to send a new ping
bool pingIntervalExceeded();
// No PONG data was received through the socket for longer than ping timeout delay
bool pingTimeoutExceeded();
void initTimePointsAfterConnect();
// after calling close(), if no CLOSE frame answer is received back from the remote, we
// should close the connexion
bool closingDelayExceeded();
void initTimePointsAndGCDAfterConnect();
void sendCloseFrame(uint16_t code, const std::string& reason);
void closeSocketAndSwitchToClosedState(uint16_t code,