Fix crash during closing on Windows (#64)

* fix crash on close

* Improve calculateRetryWaitMilliseconds (#63)

* improve calculateRetryWaitMilliseconds

* update comment

* cout -> spdlog

* fix crash on close

* uncomment test

* Revert "uncomment test"

This reverts commit 27df86ee8f.
This commit is contained in:
Dimon4eg 2019-05-11 19:51:26 +03:00 committed by Benjamin Sergeant
parent 4df58f3059
commit 8d3a47a873
3 changed files with 60 additions and 68 deletions

View File

@ -151,16 +151,13 @@ namespace ix
close(); close();
if (!_thread.joinable()) if (_thread.joinable())
{ {
_automaticReconnection = automaticReconnection; _stop = true;
return; _thread.join();
_stop = false;
} }
_stop = true;
_thread.join();
_stop = false;
_automaticReconnection = automaticReconnection; _automaticReconnection = automaticReconnection;
} }
@ -220,72 +217,63 @@ namespace ix
return getReadyState() == WebSocket_ReadyState_Closing; return getReadyState() == WebSocket_ReadyState_Closing;
} }
bool WebSocket::isConnectedOrClosing() const
{
return isConnected() || isClosing();
}
void WebSocket::close() void WebSocket::close()
{ {
_ws.close(); _ws.close();
} }
void WebSocket::reconnectPerpetuallyIfDisconnected() void WebSocket::checkConnection(bool initial)
{ {
uint32_t retries = 0;
WebSocketErrorInfo connectErr;
ix::WebSocketInitResult status;
using millis = std::chrono::duration<double, std::milli>; using millis = std::chrono::duration<double, std::milli>;
millis duration;
// Try to connect only once when we don't have automaticReconnection setup uint32_t retries = 0;
if (!isConnectedOrClosing() && !_stop && !_automaticReconnection) millis duration;
ix::WebSocketInitResult status;
// we will try to connect perpertually
while (true)
{ {
if (isConnected() || isClosing() || _stop)
{
break;
}
if (!initial && !_automaticReconnection)
{
// don't attempt to reconnect
break;
}
initial = false;
// Only sleep if we are retrying
if (duration.count() > 0)
{
// to do: make conditional sleeping
std::this_thread::sleep_for(duration);
}
// try to connect (sync connect)
status = connect(_handshakeTimeoutSecs); status = connect(_handshakeTimeoutSecs);
if (!status.success) if (!status.success)
{ {
duration = millis(calculateRetryWaitMilliseconds(retries++)); WebSocketErrorInfo connectErr;
connectErr.retries = retries; if (_automaticReconnection)
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
}
}
else
{
// Otherwise try to reconnect perpertually
while (true)
{
if (isConnectedOrClosing() || _stop || !_automaticReconnection)
{
break;
}
status = connect(_handshakeTimeoutSecs);
if (!status.success)
{ {
duration = millis(calculateRetryWaitMilliseconds(retries++)); duration = millis(calculateRetryWaitMilliseconds(retries++));
connectErr.retries = retries;
connectErr.wait_time = duration.count(); connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr; connectErr.retries = retries;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
// Only sleep if we aren't in the middle of stopping
if (!_stop)
{
std::this_thread::sleep_for(duration);
}
} }
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
} }
} }
} }
@ -294,12 +282,20 @@ namespace ix
{ {
setThreadName(getUrl()); setThreadName(getUrl());
bool initial = true;
while (true) while (true)
{ {
if (_stop && !isClosing()) return;
// 1. Make sure we are always connected // 1. Make sure we are always connected
reconnectPerpetuallyIfDisconnected(); checkConnection(initial);
initial = false;
// if here we are closed then checkConnection was not able to connect
if (getReadyState() == WebSocket_ReadyState_Closed)
{
break;
}
// 2. Poll to see if there's any new data available // 2. Poll to see if there's any new data available
WebSocketTransport::PollPostTreatment pollPostTreatment = _ws.poll(); WebSocketTransport::PollPostTreatment pollPostTreatment = _ws.poll();
@ -315,6 +311,7 @@ namespace ix
WebSocketMessageType webSocketMessageType; WebSocketMessageType webSocketMessageType;
switch (messageKind) switch (messageKind)
{ {
default:
case WebSocketTransport::MSG: case WebSocketTransport::MSG:
{ {
webSocketMessageType = WebSocket_MessageType_Message; webSocketMessageType = WebSocket_MessageType_Message;
@ -345,9 +342,6 @@ namespace ix
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
// If we aren't trying to reconnect automatically, exit if we aren't connected
if (!isConnectedOrClosing() && !_automaticReconnection) return;
} }
} }

View File

@ -91,7 +91,6 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int heartBeatPeriodSecs); void setHeartBeatPeriod(int heartBeatPeriodSecs);
void setPingInterval(int pingIntervalSecs); // alias of setHeartBeatPeriod void setPingInterval(int pingIntervalSecs); // alias of setHeartBeatPeriod
void setPingTimeout(int pingTimeoutSecs); void setPingTimeout(int pingTimeoutSecs);
@ -100,6 +99,7 @@ namespace ix
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
// stop is synchronous
void stop(); void stop();
// Run in blocking mode, by connecting first manually, and then calling run. // Run in blocking mode, by connecting first manually, and then calling run.
@ -136,13 +136,11 @@ namespace ix
bool isConnected() const; bool isConnected() const;
bool isClosing() const; bool isClosing() const;
bool isConnectedOrClosing() const; void checkConnection(bool initial);
void reconnectPerpetuallyIfDisconnected();
std::string readyStateToString(ReadyState readyState); std::string readyStateToString(ReadyState readyState);
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);
// Server // Server
void setSocketFileDescriptor(int fd);
WebSocketInitResult connectToSocket(int fd, int timeoutSecs); WebSocketInitResult connectToSocket(int fd, int timeoutSecs);
WebSocketTransport _ws; WebSocketTransport _ws;

View File

@ -12,10 +12,10 @@ namespace ix
{ {
struct WebSocketErrorInfo struct WebSocketErrorInfo
{ {
uint32_t retries; uint32_t retries = 0;
double wait_time; double wait_time = 0;
int http_status; int http_status = 0;
std::string reason; std::string reason;
bool decompressionError; bool decompressionError = false;
}; };
} }