From 1778874ea8f99d48ad04f01b7f7623bf3eb46210 Mon Sep 17 00:00:00 2001 From: Dimon4eg Date: Fri, 10 May 2019 20:32:17 +0300 Subject: [PATCH 1/4] fix crash on close --- ixwebsocket/IXWebSocket.cpp | 16 ++++++---------- ixwebsocket/IXWebSocket.h | 2 -- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index e87c403f..becc7b9d 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -144,18 +144,16 @@ namespace ix // This value needs to be forced when shutting down, it is restored later _automaticReconnection = false; + // sync close close(); - if (!_thread.joinable()) + if (_thread.joinable()) { - _automaticReconnection = automaticReconnection; - return; + _stop = true; + _thread.join(); + _stop = false; } - _stop = true; - _thread.join(); - _stop = false; - _automaticReconnection = automaticReconnection; } @@ -289,10 +287,8 @@ namespace ix { setThreadName(getUrl()); - while (true) + while (getReadyState() != WebSocket_ReadyState_Closed) { - if (_stop && !isClosing()) return; - // 1. Make sure we are always connected reconnectPerpetuallyIfDisconnected(); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 07c00a4a..5c6bc041 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -91,7 +91,6 @@ namespace ix void setUrl(const std::string& url); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); - void setHandshakeTimeout(int handshakeTimeoutSecs); void setHeartBeatPeriod(int heartBeatPeriodSecs); void setPingInterval(int pingIntervalSecs); // alias of setHeartBeatPeriod void setPingTimeout(int pingTimeoutSecs); @@ -142,7 +141,6 @@ namespace ix static void invokeTrafficTrackerCallback(size_t size, bool incoming); // Server - void setSocketFileDescriptor(int fd); WebSocketInitResult connectToSocket(int fd, int timeoutSecs); WebSocketTransport _ws; From 0bc1755f3a5e3ae5da9efc67a0c210a9472f1008 Mon Sep 17 00:00:00 2001 From: Dimon4eg Date: Fri, 10 May 2019 22:31:21 +0300 Subject: [PATCH 2/4] Improve calculateRetryWaitMilliseconds (#63) * improve calculateRetryWaitMilliseconds * update comment --- ixwebsocket/IXWebSocket.cpp | 23 ++++++++++++++--------- ixwebsocket/IXWebSocketErrorInfo.h | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index becc7b9d..40b1c1ad 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -13,16 +13,21 @@ namespace { - uint64_t calculateRetryWaitMilliseconds(uint64_t retry_count) + uint64_t calculateRetryWaitMilliseconds(uint32_t retry_count) { - // This will overflow quite fast for large value of retry_count - // and will become 0, in which case the wait time will be none - // and we'll be constantly retrying to connect. - uint64_t wait_time = ((uint64_t) std::pow(2, retry_count) * 100L); + uint64_t wait_time; - // cap the wait time to 10s, or to retry_count == 10 for which wait_time > 10s - uint64_t tenSeconds = 10 * 1000; - return (wait_time > tenSeconds || retry_count > 10) ? tenSeconds : wait_time; + if (retry_count <= 6) + { + // max wait_time is 6400 ms (2 ^ 6 = 64) + wait_time = ((uint64_t)std::pow(2, retry_count) * 100L); + } + else + { + wait_time = 10 * 1000; // 10 sec + } + + return wait_time; } } @@ -225,7 +230,7 @@ namespace ix void WebSocket::reconnectPerpetuallyIfDisconnected() { - uint64_t retries = 0; + uint32_t retries = 0; WebSocketErrorInfo connectErr; ix::WebSocketInitResult status; using millis = std::chrono::duration; diff --git a/ixwebsocket/IXWebSocketErrorInfo.h b/ixwebsocket/IXWebSocketErrorInfo.h index a8a02864..8b515fac 100644 --- a/ixwebsocket/IXWebSocketErrorInfo.h +++ b/ixwebsocket/IXWebSocketErrorInfo.h @@ -12,7 +12,7 @@ namespace ix { struct WebSocketErrorInfo { - uint64_t retries; + uint32_t retries; double wait_time; int http_status; std::string reason; From 6cd970d0d9fc84e5da01b9be0048a193203e13f7 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Fri, 10 May 2019 12:33:21 -0700 Subject: [PATCH 3/4] cout -> spdlog --- .../statsd-client-cpp/src/statsd_client.cpp | 1 - ws/ws_cobra_to_statsd.cpp | 19 ++++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/third_party/statsd-client-cpp/src/statsd_client.cpp b/third_party/statsd-client-cpp/src/statsd_client.cpp index e58af652..ca39e865 100644 --- a/third_party/statsd-client-cpp/src/statsd_client.cpp +++ b/third_party/statsd-client-cpp/src/statsd_client.cpp @@ -221,7 +221,6 @@ int StatsdClient::send(const string &message) int StatsdClient::send_to_daemon(const string &message) { - std::cout << "send_to_daemon: " << message.length() << " B" << std::endl; int ret = init(); if ( ret ) { diff --git a/ws/ws_cobra_to_statsd.cpp b/ws/ws_cobra_to_statsd.cpp index fb1e104f..cadcfed7 100644 --- a/ws/ws_cobra_to_statsd.cpp +++ b/ws/ws_cobra_to_statsd.cpp @@ -13,6 +13,7 @@ #include #include +#include namespace ix { @@ -90,20 +91,20 @@ namespace ix { if (eventType == ix::CobraConnection_EventType_Open) { - std::cout << "Subscriber: connected" << std::endl; + spdlog::info("Subscriber connected"); for (auto it : headers) { - std::cerr << it.first << ": " << it.second << std::endl; + spdlog::info("{}: {}", it.first, it.second); } } if (eventType == ix::CobraConnection_EventType_Closed) { - std::cout << "Subscriber: closed" << std::endl; + spdlog::info("Subscriber closed"); } else if (eventType == ix::CobraConnection_EventType_Authenticated) { - std::cout << "Subscriber authenticated" << std::endl; + spdlog::info("Subscriber authenticated"); conn.subscribe(channel, [&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount] @@ -111,7 +112,7 @@ namespace ix { if (verbose) { - std::cout << jsonWriter.write(msg) << std::endl; + spdlog::info(jsonWriter.write(msg)); } std::string id; @@ -121,22 +122,22 @@ namespace ix id += extractAttr(attr, msg); } - std::cout << msgCount++ << " " << prefix << id << std::endl; + spdlog::info("{} {}{}", msgCount++, prefix, id); statsdClient.count(id, 1); }); } else if (eventType == ix::CobraConnection_EventType_Subscribed) { - std::cout << "Subscriber: subscribed to channel " << subscriptionId << std::endl; + spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); } else if (eventType == ix::CobraConnection_EventType_UnSubscribed) { - std::cout << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl; + spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); } else if (eventType == ix::CobraConnection_EventType_Error) { - std::cout << "Subscriber: error" << errMsg << std::endl; + spdlog::error("Subscriber: error {}", errMsg); } } ); From ef2e5c7ddb4230b141c0d02d280790e24bd5b293 Mon Sep 17 00:00:00 2001 From: dimon4eg Date: Sat, 11 May 2019 12:07:25 +0300 Subject: [PATCH 4/4] fix crash on close --- ixwebsocket/IXWebSocket.cpp | 104 ++++++++++++++--------------- ixwebsocket/IXWebSocket.h | 4 +- ixwebsocket/IXWebSocketErrorInfo.h | 8 +-- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 40b1c1ad..7d568561 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -149,7 +149,6 @@ namespace ix // This value needs to be forced when shutting down, it is restored later _automaticReconnection = false; - // sync close close(); if (_thread.joinable()) @@ -218,72 +217,63 @@ namespace ix return getReadyState() == WebSocket_ReadyState_Closing; } - bool WebSocket::isConnectedOrClosing() const - { - return isConnected() || isClosing(); - } - void WebSocket::close() { _ws.close(); } - void WebSocket::reconnectPerpetuallyIfDisconnected() + void WebSocket::checkConnection(bool initial) { - uint32_t retries = 0; - WebSocketErrorInfo connectErr; - ix::WebSocketInitResult status; using millis = std::chrono::duration; - millis duration; - // Try to connect only once when we don't have automaticReconnection setup - if (!isConnectedOrClosing() && !_stop && !_automaticReconnection) + uint32_t retries = 0; + millis duration; + ix::WebSocketInitResult status; + + // we will try to connect perpertually + while (true) { + if (isConnected() || isClosing() || _stop) + { + break; + } + + if (!initial && !_automaticReconnection) + { + // don't attempt to reconnect + break; + } + + initial = false; + + // Only sleep if we are retrying + if (duration.count() > 0) + { + // to do: make conditional sleeping + std::this_thread::sleep_for(duration); + } + + // try to connect (sync connect) status = connect(_handshakeTimeoutSecs); if (!status.success) { - duration = millis(calculateRetryWaitMilliseconds(retries++)); + WebSocketErrorInfo connectErr; - connectErr.retries = retries; - connectErr.wait_time = duration.count(); - connectErr.reason = status.errorStr; - connectErr.http_status = status.http_status; - _onMessageCallback(WebSocket_MessageType_Error, "", 0, - connectErr, WebSocketOpenInfo(), - WebSocketCloseInfo()); - } - } - else - { - // Otherwise try to reconnect perpertually - while (true) - { - if (isConnectedOrClosing() || _stop || !_automaticReconnection) - { - break; - } - - status = connect(_handshakeTimeoutSecs); - - if (!status.success) + if (_automaticReconnection) { duration = millis(calculateRetryWaitMilliseconds(retries++)); - connectErr.retries = retries; connectErr.wait_time = duration.count(); - connectErr.reason = status.errorStr; - connectErr.http_status = status.http_status; - _onMessageCallback(WebSocket_MessageType_Error, "", 0, - connectErr, WebSocketOpenInfo(), - WebSocketCloseInfo()); - - // Only sleep if we aren't in the middle of stopping - if (!_stop) - { - std::this_thread::sleep_for(duration); - } + connectErr.retries = retries; } + + connectErr.reason = status.errorStr; + connectErr.http_status = status.http_status; + + _onMessageCallback(WebSocket_MessageType_Error, "", 0, + connectErr, WebSocketOpenInfo(), + WebSocketCloseInfo()); } } } @@ -292,10 +282,20 @@ namespace ix { setThreadName(getUrl()); - while (getReadyState() != WebSocket_ReadyState_Closed) + bool initial = true; + + while (true) { // 1. Make sure we are always connected - reconnectPerpetuallyIfDisconnected(); + checkConnection(initial); + + initial = false; + + // if here we are closed then checkConnection was not able to connect + if (getReadyState() == WebSocket_ReadyState_Closed) + { + break; + } // 2. Poll to see if there's any new data available WebSocketTransport::PollPostTreatment pollPostTreatment = _ws.poll(); @@ -311,6 +311,7 @@ namespace ix WebSocketMessageType webSocketMessageType; switch (messageKind) { + default: case WebSocketTransport::MSG: { webSocketMessageType = WebSocket_MessageType_Message; @@ -341,9 +342,6 @@ namespace ix WebSocket::invokeTrafficTrackerCallback(msg.size(), true); }); - - // If we aren't trying to reconnect automatically, exit if we aren't connected - if (!isConnectedOrClosing() && !_automaticReconnection) return; } } diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 5c6bc041..1520645c 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -99,6 +99,7 @@ namespace ix // Run asynchronously, by calling start and stop. void start(); + // stop is synchronous void stop(); // Run in blocking mode, by connecting first manually, and then calling run. @@ -135,8 +136,7 @@ namespace ix bool isConnected() const; bool isClosing() const; - bool isConnectedOrClosing() const; - void reconnectPerpetuallyIfDisconnected(); + void checkConnection(bool initial); std::string readyStateToString(ReadyState readyState); static void invokeTrafficTrackerCallback(size_t size, bool incoming); diff --git a/ixwebsocket/IXWebSocketErrorInfo.h b/ixwebsocket/IXWebSocketErrorInfo.h index 8b515fac..e8f16c62 100644 --- a/ixwebsocket/IXWebSocketErrorInfo.h +++ b/ixwebsocket/IXWebSocketErrorInfo.h @@ -12,10 +12,10 @@ namespace ix { struct WebSocketErrorInfo { - uint32_t retries; - double wait_time; - int http_status; + uint32_t retries = 0; + double wait_time = 0; + int http_status = 0; std::string reason; - bool decompressionError; + bool decompressionError = false; }; }