fix ping, fix send frame close (#49)

* fix ping, fix send frame close

* fixes for data race on _closeCode etc. and fix test

* fixing one TC

* fix waiting forever if no time to change of readyState, and poll never end

* add 1005 code if no status code received

* fixes for 1005 code

* fix test issue

* fix macOS issue

* revert to master tests and renaming
This commit is contained in:
Kumamon38 2019-05-09 18:21:05 +02:00 committed by Benjamin Sergeant
parent 28c3f2ea26
commit cb1d1bfd85
9 changed files with 433 additions and 162 deletions

View File

@ -44,14 +44,14 @@ namespace ix
close(); close();
} }
PollResultType Socket::poll(int timeoutSecs) PollResultType Socket::poll(int timeoutMs)
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
return PollResultType::Error; return PollResultType::Error;
} }
return isReadyToRead(1000 * timeoutSecs); return isReadyToRead(timeoutMs);
} }
PollResultType Socket::select(bool readyToRead, int timeoutMs) PollResultType Socket::select(bool readyToRead, int timeoutMs)

View File

@ -56,7 +56,7 @@ namespace ix
bool init(std::string& errorMsg); bool init(std::string& errorMsg);
// Functions to check whether there is activity on the socket // Functions to check whether there is activity on the socket
PollResultType poll(int timeoutSecs = kDefaultPollTimeout); PollResultType poll(int timeoutMs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType isReadyToWrite(int timeoutMs); PollResultType isReadyToWrite(int timeoutMs);

View File

@ -215,6 +215,11 @@ namespace ix
return getReadyState() == WebSocket_ReadyState_Closing; return getReadyState() == WebSocket_ReadyState_Closing;
} }
bool WebSocket::isConnectedOrClosing() const
{
return isConnected() || isClosing();
}
void WebSocket::close() void WebSocket::close()
{ {
_ws.close(); _ws.close();
@ -229,7 +234,7 @@ namespace ix
millis duration; millis duration;
// Try to connect only once when we don't have automaticReconnection setup // Try to connect only once when we don't have automaticReconnection setup
if (!isConnected() && !isClosing() && !_stop && !_automaticReconnection) if (!isConnectedOrClosing() && !_stop && !_automaticReconnection)
{ {
status = connect(_handshakeTimeoutSecs); status = connect(_handshakeTimeoutSecs);
@ -251,7 +256,7 @@ namespace ix
// Otherwise try to reconnect perpertually // Otherwise try to reconnect perpertually
while (true) while (true)
{ {
if (isConnected() || isClosing() || _stop || !_automaticReconnection) if (isConnectedOrClosing() || _stop || !_automaticReconnection)
{ {
break; break;
} }
@ -286,20 +291,17 @@ namespace ix
while (true) while (true)
{ {
if (_stop) return; if (_stop && !isClosing()) return;
// 1. Make sure we are always connected // 1. Make sure we are always connected
reconnectPerpetuallyIfDisconnected(); reconnectPerpetuallyIfDisconnected();
if (_stop) return;
// 2. Poll to see if there's any new data available // 2. Poll to see if there's any new data available
_ws.poll(); WebSocketTransport::PollPostTreatment pollPostTreatment = _ws.poll();
if (_stop) return;
// 3. Dispatch the incoming messages // 3. Dispatch the incoming messages
_ws.dispatch( _ws.dispatch(
pollPostTreatment,
[this](const std::string& msg, [this](const std::string& msg,
size_t wireSize, size_t wireSize,
bool decompressionError, bool decompressionError,
@ -340,7 +342,7 @@ namespace ix
}); });
// If we aren't trying to reconnect automatically, exit if we aren't connected // If we aren't trying to reconnect automatically, exit if we aren't connected
if (!isConnected() && !_automaticReconnection) return; if (!isConnectedOrClosing() && !_automaticReconnection) return;
} }
} }

View File

@ -136,6 +136,7 @@ namespace ix
bool isConnected() const; bool isConnected() const;
bool isClosing() const; bool isClosing() const;
bool isConnectedOrClosing() const;
void reconnectPerpetuallyIfDisconnected(); void reconnectPerpetuallyIfDisconnected();
std::string readyStateToString(ReadyState readyState); std::string readyStateToString(ReadyState readyState);
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);

View File

@ -71,14 +71,18 @@ namespace ix
const int WebSocketTransport::kDefaultPingIntervalSecs(-1); const int WebSocketTransport::kDefaultPingIntervalSecs(-1);
const int WebSocketTransport::kDefaultPingTimeoutSecs(-1); const int WebSocketTransport::kDefaultPingTimeoutSecs(-1);
const bool WebSocketTransport::kDefaultEnablePong(true); const bool WebSocketTransport::kDefaultEnablePong(true);
const int WebSocketTransport::kClosingMaximumWaitingDelayInMs(200);
constexpr size_t WebSocketTransport::kChunkSize; constexpr size_t WebSocketTransport::kChunkSize;
const uint16_t WebSocketTransport::kInternalErrorCode(1011); const uint16_t WebSocketTransport::kInternalErrorCode(1011);
const uint16_t WebSocketTransport::kAbnormalCloseCode(1006); const uint16_t WebSocketTransport::kAbnormalCloseCode(1006);
const uint16_t WebSocketTransport::kProtocolErrorCode(1002); const uint16_t WebSocketTransport::kProtocolErrorCode(1002);
const uint16_t WebSocketTransport::kNoStatusCodeErrorCode(1005);
const std::string WebSocketTransport::kInternalErrorMessage("Internal error"); const std::string WebSocketTransport::kInternalErrorMessage("Internal error");
const std::string WebSocketTransport::kAbnormalCloseMessage("Abnormal closure"); const std::string WebSocketTransport::kAbnormalCloseMessage("Abnormal closure");
const std::string WebSocketTransport::kPingTimeoutMessage("Ping timeout"); const std::string WebSocketTransport::kPingTimeoutMessage("Ping timeout");
const std::string WebSocketTransport::kProtocolErrorMessage("Protocol error"); const std::string WebSocketTransport::kProtocolErrorMessage("Protocol error");
const std::string WebSocketTransport::kNoStatusCodeErrorMessage("No status code");
WebSocketTransport::WebSocketTransport() : WebSocketTransport::WebSocketTransport() :
_useMask(true), _useMask(true),
@ -89,10 +93,12 @@ namespace ix
_closeRemote(false), _closeRemote(false),
_enablePerMessageDeflate(false), _enablePerMessageDeflate(false),
_requestInitCancellation(false), _requestInitCancellation(false),
_closingTimePoint(std::chrono::steady_clock::now()),
_enablePong(kDefaultEnablePong), _enablePong(kDefaultEnablePong),
_pingIntervalSecs(kDefaultPingIntervalSecs), _pingIntervalSecs(kDefaultPingIntervalSecs),
_pingTimeoutSecs(kDefaultPingTimeoutSecs), _pingTimeoutSecs(kDefaultPingTimeoutSecs),
_pingIntervalOrTimeoutGCDSecs(-1), _pingIntervalOrTimeoutGCDSecs(-1),
_nextGCDTimePoint(std::chrono::steady_clock::now()),
_lastSendPingTimePoint(std::chrono::steady_clock::now()), _lastSendPingTimePoint(std::chrono::steady_clock::now()),
_lastReceivePongTimePoint(std::chrono::steady_clock::now()) _lastReceivePongTimePoint(std::chrono::steady_clock::now())
{ {
@ -128,6 +134,11 @@ namespace ix
{ {
_pingIntervalOrTimeoutGCDSecs = pingIntervalSecs; _pingIntervalOrTimeoutGCDSecs = pingIntervalSecs;
} }
if (_pingIntervalOrTimeoutGCDSecs > 0)
{
_nextGCDTimePoint = std::chrono::steady_clock::now() + std::chrono::seconds(_pingIntervalOrTimeoutGCDSecs);
}
} }
// Client // Client
@ -244,10 +255,15 @@ namespace ix
return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs); return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs);
} }
void WebSocketTransport::poll() bool WebSocketTransport::closingDelayExceeded()
{ {
PollResultType pollResult = _socket->poll(_pingIntervalOrTimeoutGCDSecs); std::lock_guard<std::mutex> lock(_closingTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _closingTimePoint > std::chrono::milliseconds(kClosingMaximumWaitingDelayInMs);
}
WebSocketTransport::PollPostTreatment WebSocketTransport::poll()
{
if (_readyState == OPEN) if (_readyState == OPEN)
{ {
// if (1) ping timeout is enabled and (2) duration since last received // if (1) ping timeout is enabled and (2) duration since last received
@ -265,6 +281,30 @@ namespace ix
sendPing(ss.str()); sendPing(ss.str());
} }
} }
// No timeout if state is not OPEN, otherwise computed
// pingIntervalOrTimeoutGCD (equals to -1 if no ping and no ping timeout are set)
int lastingTimeoutDelayInMs = (_readyState != OPEN) ? 0 : _pingIntervalOrTimeoutGCDSecs;
if (_pingIntervalOrTimeoutGCDSecs > 0)
{
// compute lasting delay to wait for next ping / timeout, if at least one set
auto now = std::chrono::steady_clock::now();
if (now >= _nextGCDTimePoint)
{
_nextGCDTimePoint = now + std::chrono::seconds(_pingIntervalOrTimeoutGCDSecs);
lastingTimeoutDelayInMs = _pingIntervalOrTimeoutGCDSecs * 1000;
}
else
{
lastingTimeoutDelayInMs = (int)std::chrono::duration_cast<std::chrono::milliseconds>(_nextGCDTimePoint - now).count();
}
}
// poll the socket
PollResultType pollResult = _socket->poll(lastingTimeoutDelayInMs);
// Make sure we send all the buffered data // Make sure we send all the buffered data
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
@ -300,17 +340,12 @@ namespace ix
} }
else if (ret <= 0) else if (ret <= 0)
{ {
_rxbuf.clear(); // if there are received data pending to be processed, then delay the abnormal closure
// to after dispatch (other close code/reason could be read from the buffer)
_socket->close(); _socket->close();
{
std::lock_guard<std::mutex> lock(_closeDataMutex); return CHECK_OR_RAISE_ABNORMAL_CLOSE_AFTER_DISPATCH;
_closeCode = kAbnormalCloseCode;
_closeReason = kAbnormalCloseMessage;
_closeWireSize = 0;
_closeRemote = true;
}
setReadyState(CLOSED);
break;
} }
else else
{ {
@ -329,12 +364,15 @@ namespace ix
_socket->close(); _socket->close();
} }
// Avoid a race condition where we get stuck in select if (_readyState == CLOSING && closingDelayExceeded())
// while closing.
if (_readyState == CLOSING)
{ {
_rxbuf.clear();
// close code and reason were set when calling close()
_socket->close(); _socket->close();
setReadyState(CLOSED);
} }
return NONE;
} }
bool WebSocketTransport::isSendBufferEmpty() const bool WebSocketTransport::isSendBufferEmpty() const
@ -396,12 +434,13 @@ namespace ix
// | Payload Data continued ... | // | Payload Data continued ... |
// +---------------------------------------------------------------+ // +---------------------------------------------------------------+
// //
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback) void WebSocketTransport::dispatch(WebSocketTransport::PollPostTreatment pollPostTreatment,
const OnMessageCallback& onMessageCallback)
{ {
while (true) while (true)
{ {
wsheader_type ws; wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */ if (_rxbuf.size() < 2) break; /* Need at least 2 */
const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume
ws.fin = (data[0] & 0x80) == 0x80; ws.fin = (data[0] & 0x80) == 0x80;
ws.rsv1 = (data[0] & 0x40) == 0x40; ws.rsv1 = (data[0] & 0x40) == 0x40;
@ -409,7 +448,7 @@ namespace ix
ws.mask = (data[1] & 0x80) == 0x80; ws.mask = (data[1] & 0x80) == 0x80;
ws.N0 = (data[1] & 0x7f); ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0); ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (_rxbuf.size() < ws.header_size) return; /* Need: ws.header_size - _rxbuf.size() */ if (_rxbuf.size() < ws.header_size) break; /* Need: ws.header_size - _rxbuf.size() */
// //
// Calculate payload length: // Calculate payload length:
@ -540,20 +579,58 @@ namespace ix
} }
else if (ws.opcode == wsheader_type::CLOSE) else if (ws.opcode == wsheader_type::CLOSE)
{ {
std::string reason;
uint16_t code = 0;
unmaskReceiveBuffer(ws); unmaskReceiveBuffer(ws);
// Extract the close code first, available as the first 2 bytes if (ws.N >= 2)
uint16_t code = 0; {
code |= ((uint64_t) _rxbuf[ws.header_size]) << 8; // Extract the close code first, available as the first 2 bytes
code |= ((uint64_t) _rxbuf[ws.header_size+1]) << 0; code |= ((uint64_t) _rxbuf[ws.header_size]) << 8;
code |= ((uint64_t) _rxbuf[ws.header_size+1]) << 0;
// Get the reason. // Get the reason.
std::string reason(_rxbuf.begin()+ws.header_size + 2, if (ws.N > 2)
_rxbuf.begin()+ws.header_size + (size_t) ws.N); {
reason.assign(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
}
}
else
{
// no close code received
code = kNoStatusCodeErrorCode;
reason = kNoStatusCodeErrorMessage;
}
bool remote = true; // We receive a CLOSE frame from remote and are NOT the ones who triggered the close
if (_readyState != CLOSING)
{
// send back the CLOSE frame
sendCloseFrame(code, reason);
close(code, reason, _rxbuf.size(), remote); _socket->wakeUpFromPoll(Socket::kCloseRequest);
bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
}
else
{
// we got the CLOSE frame answer from our close, so we can close the connection if
// the code/reason are the same
bool identicalReason;
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
identicalReason = _closeCode == code && _closeReason == reason;
}
if (identicalReason)
{
bool remote = false;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
}
}
} }
else else
{ {
@ -566,6 +643,25 @@ namespace ix
_rxbuf.erase(_rxbuf.begin(), _rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N); _rxbuf.begin() + ws.header_size + (size_t) ws.N);
} }
// if an abnormal closure was raised in poll, and nothing else triggered a CLOSED state in
// the received and processed data then close the connection
if (pollPostTreatment == CHECK_OR_RAISE_ABNORMAL_CLOSE_AFTER_DISPATCH)
{
_rxbuf.clear();
// if we previously closed the connection (CLOSING state), then set state to CLOSED (code/reason were set before)
if (_readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
}
// if we weren't closing, then close using abnormal close code and message
else if (_readyState != CLOSED)
{
closeSocketAndSwitchToClosedState(kAbnormalCloseCode, kAbnormalCloseMessage, 0, false);
}
}
} }
std::string WebSocketTransport::getMergedChunks() const std::string WebSocketTransport::getMergedChunks() const
@ -859,29 +955,32 @@ namespace ix
} }
} }
void WebSocketTransport::close(uint16_t code, const std::string& reason, size_t closeWireSize, bool remote) void WebSocketTransport::sendCloseFrame(uint16_t code, const std::string& reason)
{ {
_requestInitCancellation = true;
if (_readyState == CLOSING || _readyState == CLOSED) return;
// See list of close events here:
// https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
int codeLength = 2;
std::string closure{(char)(code >> 8), (char)(code & 0xff)};
closure.resize(codeLength + reason.size());
// copy reason after code
closure.replace(codeLength, reason.size(), reason);
bool compress = false; bool compress = false;
sendData(wsheader_type::CLOSE, closure, compress);
setReadyState(CLOSING);
_socket->wakeUpFromPoll(Socket::kCloseRequest); // if a status is set/was read
if (code != kNoStatusCodeErrorCode)
{
// See list of close events here:
// https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
std::string closure{(char)(code >> 8), (char)(code & 0xff)};
// copy reason after code
closure.append(reason);
sendData(wsheader_type::CLOSE, closure, compress);
}
else
{
// no close code/reason set
sendData(wsheader_type::CLOSE, "", compress);
}
}
void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code, const std::string& reason, size_t closeWireSize, bool remote)
{
_socket->close(); _socket->close();
{ {
std::lock_guard<std::mutex> lock(_closeDataMutex); std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code; _closeCode = code;
@ -889,10 +988,33 @@ namespace ix
_closeWireSize = closeWireSize; _closeWireSize = closeWireSize;
_closeRemote = remote; _closeRemote = remote;
} }
setReadyState(CLOSED); setReadyState(CLOSED);
} }
void WebSocketTransport::close(uint16_t code, const std::string& reason, size_t closeWireSize, bool remote)
{
_requestInitCancellation = true;
if (_readyState == CLOSING || _readyState == CLOSED) return;
sendCloseFrame(code, reason);
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;
_closeReason = reason;
_closeWireSize = closeWireSize;
_closeRemote = remote;
}
{
std::lock_guard<std::mutex> lock(_closingTimePointMutex);
_closingTimePoint = std::chrono::steady_clock::now();
}
setReadyState(CLOSING);
// wake up the poll, but do not close yet
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
size_t WebSocketTransport::bufferedAmount() const size_t WebSocketTransport::bufferedAmount() const
{ {
std::lock_guard<std::mutex> lock(_txbufMutex); std::lock_guard<std::mutex> lock(_txbufMutex);

View File

@ -56,6 +56,12 @@ namespace ix
FRAGMENT FRAGMENT
}; };
enum PollPostTreatment
{
NONE,
CHECK_OR_RAISE_ABNORMAL_CLOSE_AFTER_DISPATCH
};
using OnMessageCallback = std::function<void(const std::string&, using OnMessageCallback = std::function<void(const std::string&,
size_t, size_t,
bool, bool,
@ -78,7 +84,7 @@ namespace ix
WebSocketInitResult connectToSocket(int fd, // Server WebSocketInitResult connectToSocket(int fd, // Server
int timeoutSecs); int timeoutSecs);
void poll(); PollPostTreatment poll();
WebSocketSendInfo sendBinary(const std::string& message, WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback); const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message, WebSocketSendInfo sendText(const std::string& message,
@ -93,7 +99,8 @@ namespace ix
ReadyStateValues getReadyState() const; ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback); void dispatch(PollPostTreatment pollPostTreatment,
const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const; size_t bufferedAmount() const;
private: private:
@ -162,15 +169,21 @@ namespace ix
// Used to cancel dns lookup + socket connect + http upgrade // Used to cancel dns lookup + socket connect + http upgrade
std::atomic<bool> _requestInitCancellation; std::atomic<bool> _requestInitCancellation;
mutable std::mutex _closingTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock>_closingTimePoint;
static const int kClosingMaximumWaitingDelayInMs;
// Constants for dealing with closing conneections // Constants for dealing with closing conneections
static const uint16_t kInternalErrorCode; static const uint16_t kInternalErrorCode;
static const uint16_t kAbnormalCloseCode; static const uint16_t kAbnormalCloseCode;
static const uint16_t kProtocolErrorCode; static const uint16_t kProtocolErrorCode;
static const uint16_t kNoStatusCodeErrorCode;
static const std::string kInternalErrorMessage; static const std::string kInternalErrorMessage;
static const std::string kAbnormalCloseMessage; static const std::string kAbnormalCloseMessage;
static const std::string kPingTimeoutMessage; static const std::string kPingTimeoutMessage;
static const std::string kProtocolErrorMessage; static const std::string kProtocolErrorMessage;
static const std::string kNoStatusCodeErrorMessage;
// enable auto response to ping // enable auto response to ping
bool _enablePong; bool _enablePong;
@ -187,6 +200,9 @@ namespace ix
static const int kDefaultPingTimeoutSecs; static const int kDefaultPingTimeoutSecs;
static const std::string kPingMessage; static const std::string kPingMessage;
// Record time step for ping/ ping timeout to ensure we wait for the right left duration
std::chrono::time_point<std::chrono::steady_clock> _nextGCDTimePoint;
// We record when ping are being sent so that we can know when to send the next one // We record when ping are being sent so that we can know when to send the next one
// We also record when pong are being sent as a reply to pings, to close the connections // We also record when pong are being sent as a reply to pings, to close the connections
// if no pong were received sufficiently fast. // if no pong were received sufficiently fast.
@ -201,6 +217,16 @@ namespace ix
// No PONG data was received through the socket for longer than ping timeout delay // No PONG data was received through the socket for longer than ping timeout delay
bool pingTimeoutExceeded(); bool pingTimeoutExceeded();
// after calling close(), if no CLOSE frame answer is received back from the remote, we should close the connexion
bool closingDelayExceeded();
void sendCloseFrame(uint16_t code, const std::string& reason);
void closeSocketAndSwitchToClosedState(uint16_t code,
const std::string& reason,
size_t closeWireSize,
bool remote);
void sendOnSocket(); void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,

View File

@ -68,9 +68,13 @@ namespace
// The important bit for this test. // The important bit for this test.
// Set a 1 second heartbeat with the setter method to test // Set a 1 second heartbeat with the setter method to test
if (_useHeartBeatMethod) if (_useHeartBeatMethod)
{
_webSocket.setHeartBeatPeriod(1); _webSocket.setHeartBeatPeriod(1);
}
else else
{
_webSocket.setPingInterval(1); _webSocket.setPingInterval(1);
}
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
@ -109,8 +113,7 @@ namespace
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
ss << "Received message " << str; // too many messages to log
log(ss.str());
} }
else else
{ {
@ -162,6 +165,14 @@ namespace
log("Server received a ping"); log("Server received a ping");
receivedPingMessages++; receivedPingMessages++;
} }
else if (messageType == ix::WebSocket_MessageType_Message)
{
// to many messages to log
for(auto client: server.getClients())
{
client->sendText("reply");
}
}
} }
); );
} }
@ -179,96 +190,6 @@ namespace
} }
} }
TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]")
{
SECTION("Make sure that ping messages are sent when no other data are sent.")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
bool useSetHeartBeatPeriodMethod = true;
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(1900);
webSocketClient.stop();
// Here we test ping interval
// -> expected ping messages == 1 as 1900 seconds, 1 ping sent every second
REQUIRE(serverReceivedPingMessages == 1);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]")
{
SECTION("Make sure that ping messages are sent, even if other messages are sent")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
bool useSetHeartBeatPeriodMethod = true;
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(900);
webSocketClient.sendMessage("hello world");
ix::msleep(900);
webSocketClient.sendMessage("hello world");
ix::msleep(1100);
webSocketClient.stop();
// Here we test ping interval
// client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 2 as 900+900+1100 = 2900 seconds, 1 ping sent every second
REQUIRE(serverReceivedPingMessages == 2);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
TEST_CASE("Websocket_ping_no_data_sent_setPingInterval", "[setPingInterval]") TEST_CASE("Websocket_ping_no_data_sent_setPingInterval", "[setPingInterval]")
{ {
SECTION("Make sure that ping messages are sent when no other data are sent.") SECTION("Make sure that ping messages are sent when no other data are sent.")
@ -346,9 +267,6 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
webSocketClient.stop(); webSocketClient.stop();
// without this sleep test fails on Windows
ix::msleep(100);
// Here we test ping interval // Here we test ping interval
// client has sent data, but ping should have been sent no matter what // client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second // -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second
@ -361,3 +279,203 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
ix::reportWebSocketTraffic(); ix::reportWebSocketTraffic();
} }
} }
TEST_CASE("Websocket_ping_data_sent_setPingInterval_half_full", "[setPingInterval]")
{
SECTION("Make sure that ping messages are sent, even if other messages are sent continuously during a given time")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
// send continuously for 1100ms
auto now = std::chrono::steady_clock::now();
while(std::chrono::steady_clock::now() - now <= std::chrono::milliseconds(900))
{
webSocketClient.sendMessage("message");
ix::msleep(1);
}
ix::msleep(150);
// Here we test ping interval
// client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 1, as 900+150 = 1050ms, 1 ping sent every second
REQUIRE(serverReceivedPingMessages == 1);
ix::msleep(100);
webSocketClient.stop();
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
TEST_CASE("Websocket_ping_data_sent_setPingInterval_full", "[setPingInterval]")
{
SECTION("Make sure that ping messages are sent, even if other messages are sent continuously for longer than ping interval")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(1);
}
REQUIRE(server.getClients().size() == 1);
// send continuously for 1100ms
auto now = std::chrono::steady_clock::now();
while(std::chrono::steady_clock::now() - now <= std::chrono::milliseconds(1100))
{
webSocketClient.sendMessage("message");
ix::msleep(1);
}
// Here we test ping interval
// client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 1, 1 ping sent every second
REQUIRE(serverReceivedPingMessages == 1);
ix::msleep(100);
webSocketClient.stop();
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
// Using setHeartBeatPeriod
TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]")
{
SECTION("Make sure that ping messages are sent when no other data are sent.")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
bool useSetHeartBeatPeriodMethod = true;
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(1);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(1850);
webSocketClient.stop();
// Here we test ping interval
// -> expected ping messages == 1 as 1850 seconds, 1 ping sent every second
REQUIRE(serverReceivedPingMessages == 1);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]")
{
SECTION("Make sure that ping messages are sent, even if other messages are sent")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
bool useSetHeartBeatPeriodMethod = true;
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(1);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(900);
webSocketClient.sendMessage("hello world");
ix::msleep(900);
webSocketClient.sendMessage("hello world");
ix::msleep(900);
webSocketClient.stop();
// without this sleep test fails on Windows
ix::msleep(100);
// Here we test ping interval
// client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 2 as 900+900+900 = 2700 seconds, 1 ping sent every second
REQUIRE(serverReceivedPingMessages == 2);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}

View File

@ -78,6 +78,7 @@ namespace
} }
_webSocket.setUrl(url); _webSocket.setUrl(url);
_webSocket.disableAutomaticReconnection();
// The important bit for this test. // The important bit for this test.
// Set a ping interval, and a ping timeout // Set a ping interval, and a ping timeout
@ -100,7 +101,6 @@ namespace
{ {
log("client connected"); log("client connected");
_webSocket.disableAutomaticReconnection();
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@ -111,15 +111,11 @@ namespace
_closedDueToPingTimeout = true; _closedDueToPingTimeout = true;
} }
_webSocket.disableAutomaticReconnection();
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << error.reason;
log(ss.str()); log(ss.str());
_webSocket.disableAutomaticReconnection();
} }
else if (messageType == ix::WebSocket_MessageType_Pong) else if (messageType == ix::WebSocket_MessageType_Pong)
{ {

View File

@ -70,7 +70,9 @@ namespace
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
log("cmd_websocket_satori_chat: Error!"); ss << "cmd_websocket_satori_chat: Error! ";
ss << error.reason;
log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
@ -84,6 +86,10 @@ namespace
{ {
log("cmd_websocket_satori_chat: received pong message.!"); log("cmd_websocket_satori_chat: received pong message.!");
} }
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
log("cmd_websocket_satori_chat: received fragment.!");
}
else else
{ {
log("Invalid ix::WebSocketMessageType"); log("Invalid ix::WebSocketMessageType");