Real ping (#32)

* close method change and fix code

* missing mutex

* wip

* renaming and fixes

* renaming, fixes

* added enablePong/disablePong, add tests

* added new test cases

* add 1 test case

* fix gcd name to greatestCommonDivisor

* move ping and ping timeout checks into socket poll, local var in test cases and indent fixes

* indent issue
This commit is contained in:
Kumamon38
2019-04-18 18:24:16 +02:00
committed by Benjamin Sergeant
parent 65b11cb968
commit c3431f19bf
9 changed files with 850 additions and 78 deletions

View File

@ -51,10 +51,23 @@
#include <thread>
int greatestCommonDivisor (int a, int b) {
while (b != 0)
{
int t = b;
b = a % b;
a = t;
}
return a;
}
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
const std::string WebSocketTransport::kPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultPingIntervalSecs(-1);
const int WebSocketTransport::kDefaultPingTimeoutSecs(-1);
const bool WebSocketTransport::kDefaultEnablePong(true);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
@ -64,8 +77,12 @@ namespace ix
_closeWireSize(0),
_enablePerMessageDeflate(false),
_requestInitCancellation(false),
_heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now())
_enablePong(kDefaultEnablePong),
_pingIntervalSecs(kDefaultPingIntervalSecs),
_pingTimeoutSecs(kDefaultPingTimeoutSecs),
_pingIntervalOrTimeoutGCDSecs(-1),
_lastSendPingTimePoint(std::chrono::steady_clock::now()),
_lastReceivePongTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
}
@ -76,11 +93,21 @@ namespace ix
}
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int heartBeatPeriod)
bool enablePong,
int pingIntervalSecs, int pingTimeoutSecs)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = heartBeatPeriod;
_enablePong = enablePong;
_pingIntervalSecs = pingIntervalSecs;
_pingTimeoutSecs = pingTimeoutSecs;
if (pingIntervalSecs > 0 && pingTimeoutSecs > 0)
_pingIntervalOrTimeoutGCDSecs = greatestCommonDivisor(pingIntervalSecs, pingTimeoutSecs);
else if (_pingTimeoutSecs > 0)
_pingIntervalOrTimeoutGCDSecs = pingTimeoutSecs;
else
_pingIntervalOrTimeoutGCDSecs = pingIntervalSecs;
}
// Client
@ -176,13 +203,25 @@ namespace ix
_onCloseCallback = onCloseCallback;
}
// Only consider send time points for that computation.
// The receive time points is taken into account in Socket::poll (second parameter).
bool WebSocketTransport::heartBeatPeriodExceeded()
// Only consider send PING time points for that computation.
bool WebSocketTransport::pingIntervalExceeded()
{
std::lock_guard<std::mutex> lock(_lastSendTimePointMutex);
if (_pingIntervalSecs <= 0)
return false;
std::lock_guard<std::mutex> lock(_lastSendPingTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod);
return now - _lastSendPingTimePoint > std::chrono::seconds(_pingIntervalSecs);
}
bool WebSocketTransport::pingTimeoutExceeded()
{
if (_pingTimeoutSecs <= 0)
return false;
std::lock_guard<std::mutex> lock(_lastReceivePongTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs);
}
void WebSocketTransport::poll()
@ -190,19 +229,27 @@ namespace ix
_socket->poll(
[this](PollResultType pollResult)
{
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType::Timeout &&
heartBeatPeriodExceeded())
if (_readyState == OPEN)
{
std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str());
// if (1) ping timeout is enabled and (2) duration since last received ping response (PONG)
// exceeds the maximum delay, then close the connection
if (pingTimeoutExceeded())
{
close(1011, "Ping timeout");
}
// If (1) ping is enabled and no ping has been sent for a duration
// exceeding our ping interval, send a ping to the server.
else if (pingIntervalExceeded())
{
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
sendPing(ss.str());
}
}
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType::SendRequest)
if (pollResult == PollResultType::SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
@ -264,7 +311,7 @@ namespace ix
_socket->close();
}
},
_heartBeatPeriod);
_pingIntervalOrTimeoutGCDSecs);
}
bool WebSocketTransport::isSendBufferEmpty() const
@ -444,12 +491,16 @@ namespace ix
else if (ws.opcode == wsheader_type::PING)
{
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
// Reply back right away
bool compress = false;
sendData(wsheader_type::PONG, pingData, compress);
if (_enablePong)
{
// Reply back right away
bool compress = false;
sendData(wsheader_type::PONG, pingData, compress);
}
emitMessage(PING, pingData, ws, onMessageCallback);
}
@ -459,6 +510,9 @@ namespace ix
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(PONG, pongData, ws, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
@ -724,7 +778,16 @@ namespace ix
WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message)
{
bool compress = false;
return sendData(wsheader_type::PING, message, compress);
WebSocketSendInfo info = sendData(wsheader_type::PING, message, compress);
if(info.success)
{
std::lock_guard<std::mutex> lck(_lastSendPingTimePointMutex);
_lastSendPingTimePoint = std::chrono::steady_clock::now();
}
return info;
}
WebSocketSendInfo WebSocketTransport::sendBinary(
@ -770,9 +833,6 @@ namespace ix
_txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret);
}
}
std::lock_guard<std::mutex> lck(_lastSendTimePointMutex);
_lastSendTimePoint = std::chrono::steady_clock::now();
}
void WebSocketTransport::close(uint16_t code, const std::string& reason, size_t closeWireSize)