From 49bf8bd83084d95801f4089d92bddcb5a41d7694 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 6 Dec 2018 08:27:28 -0800 Subject: [PATCH] threading race condition fixes, detected by TSAN --- README.md | 2 +- .../satori_publisher/IXSatoriConnection.cpp | 104 ++++++++++-------- .../satori_publisher/IXSatoriConnection.h | 30 ++++- .../satori_publisher/satori_publisher.cpp | 2 +- ixwebsocket/IXWebSocket.cpp | 1 + ixwebsocket/IXWebSocket.h | 5 +- ixwebsocket/IXWebSocketPerMessageDeflate.cpp | 4 + ixwebsocket/IXWebSocketPerMessageDeflate.h | 1 - ixwebsocket/IXWebSocketSendInfo.h | 9 +- ixwebsocket/IXWebSocketTransport.cpp | 3 +- 10 files changed, 102 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 660e1356..a4223fee 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ## Introduction [*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex -communication channels over a single TCP connection. This library provides a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. +communication channels over a single TCP connection. *IXWebSocket* is a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. * macOS * iOS diff --git a/examples/satori_publisher/IXSatoriConnection.cpp b/examples/satori_publisher/IXSatoriConnection.cpp index cc35ee88..73b1de96 100644 --- a/examples/satori_publisher/IXSatoriConnection.cpp +++ b/examples/satori_publisher/IXSatoriConnection.cpp @@ -21,11 +21,12 @@ namespace ix SatoriConnection::SatoriConnection() : _authenticated(false), - _eventCallback(nullptr) + _eventCallback(nullptr), + _publishMode(SatoriConnection_PublishMode_Immediate) { _pdu["action"] = "rtm/publish"; - resetWebSocketOnMessageCallback(); + initWebSocketOnMessageCallback(); } SatoriConnection::~SatoriConnection() @@ -53,6 +54,7 @@ namespace ix void SatoriConnection::setEventCallback(const EventCallback& eventCallback) { + std::lock_guard lock(_eventCallbackMutex); _eventCallback = eventCallback; } @@ -60,6 +62,7 @@ namespace ix const std::string& errorMsg, const WebSocketHttpHeaders& headers) { + std::lock_guard lock(_eventCallbackMutex); if (_eventCallback) { _eventCallback(eventType, errorMsg, headers); @@ -73,46 +76,12 @@ namespace ix void SatoriConnection::disconnect() { + _authenticated = false; _webSocket.stop(); - - resetWebSocketOnMessageCallback(); } - void SatoriConnection::resetWebSocketOnMessageCallback() + void SatoriConnection::initWebSocketOnMessageCallback() { - _webSocket.setOnMessageCallback( - [](ix::WebSocketMessageType, - const std::string&, - size_t, - const ix::WebSocketErrorInfo&, - const ix::WebSocketCloseInfo&, - const ix::WebSocketHttpHeaders&) - { - ; - } - ); - } - - void SatoriConnection::configure(const std::string& appkey, - const std::string& endpoint, - const std::string& rolename, - const std::string& rolesecret, - WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions) - { - _appkey = appkey; - _endpoint = endpoint; - _role_name = rolename; - _role_secret = rolesecret; - - std::stringstream ss; - ss << endpoint; - ss << "/v2?appkey="; - ss << appkey; - - std::string url = ss.str(); - _webSocket.setUrl(url); - _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); - _webSocket.setOnMessageCallback( [this](ix::WebSocketMessageType messageType, const std::string& str, @@ -201,6 +170,32 @@ namespace ix }); } + void SatoriConnection::setPublishMode(SatoriConnectionPublishMode publishMode) + { + _publishMode = publishMode; + } + + void SatoriConnection::configure(const std::string& appkey, + const std::string& endpoint, + const std::string& rolename, + const std::string& rolesecret, + WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions) + { + _appkey = appkey; + _endpoint = endpoint; + _role_name = rolename; + _role_secret = rolesecret; + + std::stringstream ss; + ss << _endpoint; + ss << "/v2?appkey="; + ss << _appkey; + + std::string url = ss.str(); + _webSocket.setUrl(url); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); + } + // // Handshake message schema. // @@ -228,7 +223,7 @@ namespace ix pdu["action"] = "auth/handshake"; pdu["body"] = body; - std::string serializedJson = _jsonWriter.write(pdu); + std::string serializedJson = serializeJson(pdu); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); return _webSocket.send(serializedJson).success; @@ -290,7 +285,7 @@ namespace ix pdu["action"] = "auth/authenticate"; pdu["body"] = body; - std::string serializedJson = _jsonWriter.write(pdu); + std::string serializedJson = serializeJson(pdu); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); return _webSocket.send(serializedJson).success; @@ -307,6 +302,7 @@ namespace ix if (!body.isMember("subscription_id")) return false; Json::Value subscriptionId = body["subscription_id"]; + std::lock_guard lock(_cbsMutex); auto cb = _cbs.find(subscriptionId.asString()); if (cb == _cbs.end()) return false; // cannot find callback @@ -333,17 +329,29 @@ namespace ix return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open; } + std::string SatoriConnection::serializeJson(const Json::Value& value) + { + std::lock_guard lock(_jsonWriterMutex); + return _jsonWriter.write(value); + } + // // publish is not thread safe as we are trying to reuse some Json objects. // - bool SatoriConnection::publish(const std::string& channel, + bool SatoriConnection::publish(const Json::Value& channels, const Json::Value& msg) { - _body["channel"] = channel; + _body["channels"] = channels; _body["message"] = msg; _pdu["body"] = _body; - std::string serializedJson = _jsonWriter.write(_pdu); + std::string serializedJson = serializeJson(_pdu); + + if (_publishMode == SatoriConnection_PublishMode_Batch) + { + enqueue(serializedJson); + return true; + } // // Fast path. We are authenticated and the publishing succeed @@ -453,5 +461,15 @@ namespace ix false); return webSocketSendInfo.success; } + + void SatoriConnection::suspend() + { + disconnect(); + } + + void SatoriConnection::resume() + { + connect(); + } } // namespace ix diff --git a/examples/satori_publisher/IXSatoriConnection.h b/examples/satori_publisher/IXSatoriConnection.h index db47b472..480af3cb 100644 --- a/examples/satori_publisher/IXSatoriConnection.h +++ b/examples/satori_publisher/IXSatoriConnection.h @@ -26,6 +26,12 @@ namespace ix SatoriConnection_EventType_Closed = 3 }; + enum SatoriConnectionPublishMode + { + SatoriConnection_PublishMode_Immediate = 0, + SatoriConnection_PublishMode_Batch = 1 + }; + using SubscriptionCallback = std::function; using EventCallback = std::function _publishMode; // Can be set on control+background thread, protecting with an atomic std::atomic _authenticated; @@ -119,12 +135,14 @@ namespace ix Json::Value _body; Json::Value _pdu; Json::FastWriter _jsonWriter; + mutable std::mutex _jsonWriterMutex; /// Traffic tracker callback static TrafficTrackerCallback _trafficTrackerCallback; /// Satori events callbacks EventCallback _eventCallback; + mutable std::mutex _eventCallbackMutex; /// Subscription callbacks, only one per channel std::unordered_map _cbs; @@ -139,7 +157,7 @@ namespace ix mutable std::mutex _queueMutex; // Cap the queue size (100 elems so far -> ~100k) - static constexpr size_t kQueueMaxSize = 100; + static constexpr size_t kQueueMaxSize = 256; }; } // namespace ix diff --git a/examples/satori_publisher/satori_publisher.cpp b/examples/satori_publisher/satori_publisher.cpp index 8ecfae4c..7b2c40a3 100644 --- a/examples/satori_publisher/satori_publisher.cpp +++ b/examples/satori_publisher/satori_publisher.cpp @@ -55,7 +55,7 @@ int main(int argc, char* argv[]) satoriConnection.configure(appkey, endpoint, rolename, rolesecret, webSocketPerMessageDeflateOptions); satoriConnection.connect(); - satoriConnection.setOnEventCallback( + satoriConnection.setEventCallback( [&satoriConnection, channel, path, &done] (ix::SatoriConnectionEventType eventType, const std::string& errMsg, diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index c03384c1..8d144e00 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -55,6 +55,7 @@ namespace ix { void WebSocket::setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) { + std::lock_guard lock(_configMutex); _perMessageDeflateOptions = perMessageDeflateOptions; } diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 4b8406a0..164cf992 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -48,9 +48,10 @@ namespace ix WebSocketCloseInfo(uint64_t c = 0, const std::string& r = std::string()) + : code(c) + , reason(r) { - code = c; - reason = r; + ; } }; diff --git a/ixwebsocket/IXWebSocketPerMessageDeflate.cpp b/ixwebsocket/IXWebSocketPerMessageDeflate.cpp index 20eeb182..a123bd2d 100644 --- a/ixwebsocket/IXWebSocketPerMessageDeflate.cpp +++ b/ixwebsocket/IXWebSocketPerMessageDeflate.cpp @@ -69,6 +69,8 @@ namespace ix WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor() : _compressBufferSize(kBufferSize) { + memset(&_deflateState, 0, sizeof(_deflateState)); + _deflateState.zalloc = Z_NULL; _deflateState.zfree = Z_NULL; _deflateState.opaque = Z_NULL; @@ -167,6 +169,8 @@ namespace ix WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor() : _compressBufferSize(kBufferSize) { + memset(&_inflateState, 0, sizeof(_inflateState)); + _inflateState.zalloc = Z_NULL; _inflateState.zfree = Z_NULL; _inflateState.opaque = Z_NULL; diff --git a/ixwebsocket/IXWebSocketPerMessageDeflate.h b/ixwebsocket/IXWebSocketPerMessageDeflate.h index 6cfb0ad0..b8e91ecb 100644 --- a/ixwebsocket/IXWebSocketPerMessageDeflate.h +++ b/ixwebsocket/IXWebSocketPerMessageDeflate.h @@ -36,7 +36,6 @@ #include "zlib.h" #include -#include namespace ix { diff --git a/ixwebsocket/IXWebSocketSendInfo.h b/ixwebsocket/IXWebSocketSendInfo.h index dd9f4f3f..ef2bca42 100644 --- a/ixwebsocket/IXWebSocketSendInfo.h +++ b/ixwebsocket/IXWebSocketSendInfo.h @@ -20,11 +20,12 @@ namespace ix WebSocketSendInfo(bool s = false, bool c = false, size_t p = 0, size_t w = 0) + : success(s) + , compressionError(c) + , payloadSize(p) + , wireSize(w) { - success = s; - compressionError = c; - payloadSize = p; - wireSize = w; + ; } }; } diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 7cbacbd6..fba417a7 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -167,7 +167,8 @@ namespace ix if (!WebSocketTransport::parseUrl(_url, protocol, host, path, query, port)) { - return WebSocketInitResult(false, 0, "Could not parse URL"); + return WebSocketInitResult(false, 0, + std::string("Could not parse URL ") + _url); } if (protocol == "wss")