threading race condition fixes, detected by TSAN
This commit is contained in:
		| @@ -3,7 +3,7 @@ | |||||||
| ## Introduction | ## Introduction | ||||||
|  |  | ||||||
| [*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex | [*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex | ||||||
| communication channels over a single TCP connection. This library provides a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. | communication channels over a single TCP connection. *IXWebSocket* is a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. | ||||||
|  |  | ||||||
| * macOS | * macOS | ||||||
| * iOS | * iOS | ||||||
|   | |||||||
| @@ -21,11 +21,12 @@ namespace ix | |||||||
|  |  | ||||||
|     SatoriConnection::SatoriConnection() : |     SatoriConnection::SatoriConnection() : | ||||||
|         _authenticated(false), |         _authenticated(false), | ||||||
|         _eventCallback(nullptr) |         _eventCallback(nullptr), | ||||||
|  |         _publishMode(SatoriConnection_PublishMode_Immediate) | ||||||
|     { |     { | ||||||
|         _pdu["action"] = "rtm/publish"; |         _pdu["action"] = "rtm/publish"; | ||||||
|  |  | ||||||
|         resetWebSocketOnMessageCallback(); |         initWebSocketOnMessageCallback(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     SatoriConnection::~SatoriConnection() |     SatoriConnection::~SatoriConnection() | ||||||
| @@ -53,6 +54,7 @@ namespace ix | |||||||
|  |  | ||||||
|     void SatoriConnection::setEventCallback(const EventCallback& eventCallback) |     void SatoriConnection::setEventCallback(const EventCallback& eventCallback) | ||||||
|     { |     { | ||||||
|  |         std::lock_guard<std::mutex> lock(_eventCallbackMutex); | ||||||
|         _eventCallback = eventCallback; |         _eventCallback = eventCallback; | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -60,6 +62,7 @@ namespace ix | |||||||
|                                                const std::string& errorMsg, |                                                const std::string& errorMsg, | ||||||
|                                                const WebSocketHttpHeaders& headers) |                                                const WebSocketHttpHeaders& headers) | ||||||
|     { |     { | ||||||
|  |         std::lock_guard<std::mutex> lock(_eventCallbackMutex); | ||||||
|         if (_eventCallback) |         if (_eventCallback) | ||||||
|         { |         { | ||||||
|             _eventCallback(eventType, errorMsg, headers); |             _eventCallback(eventType, errorMsg, headers); | ||||||
| @@ -73,46 +76,12 @@ namespace ix | |||||||
|  |  | ||||||
|     void SatoriConnection::disconnect() |     void SatoriConnection::disconnect() | ||||||
|     { |     { | ||||||
|  |         _authenticated = false; | ||||||
|         _webSocket.stop(); |         _webSocket.stop(); | ||||||
|  |  | ||||||
|         resetWebSocketOnMessageCallback(); |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     void SatoriConnection::resetWebSocketOnMessageCallback() |     void SatoriConnection::initWebSocketOnMessageCallback() | ||||||
|     { |     { | ||||||
|         _webSocket.setOnMessageCallback( |  | ||||||
|             [](ix::WebSocketMessageType, |  | ||||||
|                    const std::string&, |  | ||||||
|                    size_t, |  | ||||||
|                    const ix::WebSocketErrorInfo&, |  | ||||||
|                    const ix::WebSocketCloseInfo&, |  | ||||||
|                    const ix::WebSocketHttpHeaders&) |  | ||||||
|             { |  | ||||||
|                 ; |  | ||||||
|             } |  | ||||||
|         ); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     void SatoriConnection::configure(const std::string& appkey, |  | ||||||
|                                      const std::string& endpoint, |  | ||||||
|                                      const std::string& rolename, |  | ||||||
|                                      const std::string& rolesecret, |  | ||||||
|                                      WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions) |  | ||||||
|     { |  | ||||||
|         _appkey = appkey; |  | ||||||
|         _endpoint = endpoint; |  | ||||||
|         _role_name = rolename; |  | ||||||
|         _role_secret = rolesecret; |  | ||||||
|  |  | ||||||
|         std::stringstream ss; |  | ||||||
|         ss << endpoint; |  | ||||||
|         ss << "/v2?appkey="; |  | ||||||
|         ss << appkey; |  | ||||||
|  |  | ||||||
|         std::string url = ss.str(); |  | ||||||
|         _webSocket.setUrl(url); |  | ||||||
|         _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); |  | ||||||
|  |  | ||||||
|         _webSocket.setOnMessageCallback( |         _webSocket.setOnMessageCallback( | ||||||
|             [this](ix::WebSocketMessageType messageType, |             [this](ix::WebSocketMessageType messageType, | ||||||
|                    const std::string& str, |                    const std::string& str, | ||||||
| @@ -201,6 +170,32 @@ namespace ix | |||||||
|         }); |         }); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     void SatoriConnection::setPublishMode(SatoriConnectionPublishMode publishMode) | ||||||
|  |     { | ||||||
|  |         _publishMode = publishMode; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     void SatoriConnection::configure(const std::string& appkey, | ||||||
|  |                                      const std::string& endpoint, | ||||||
|  |                                      const std::string& rolename, | ||||||
|  |                                      const std::string& rolesecret, | ||||||
|  |                                      WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions) | ||||||
|  |     { | ||||||
|  |         _appkey = appkey; | ||||||
|  |         _endpoint = endpoint; | ||||||
|  |         _role_name = rolename; | ||||||
|  |         _role_secret = rolesecret; | ||||||
|  |  | ||||||
|  |         std::stringstream ss; | ||||||
|  |         ss << _endpoint; | ||||||
|  |         ss << "/v2?appkey="; | ||||||
|  |         ss << _appkey; | ||||||
|  |  | ||||||
|  |         std::string url = ss.str(); | ||||||
|  |         _webSocket.setUrl(url); | ||||||
|  |         _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     // |     // | ||||||
|     // Handshake message schema. |     // Handshake message schema. | ||||||
|     // |     // | ||||||
| @@ -228,7 +223,7 @@ namespace ix | |||||||
|         pdu["action"] = "auth/handshake"; |         pdu["action"] = "auth/handshake"; | ||||||
|         pdu["body"] = body; |         pdu["body"] = body; | ||||||
|  |  | ||||||
|         std::string serializedJson = _jsonWriter.write(pdu); |         std::string serializedJson = serializeJson(pdu); | ||||||
|         SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); |         SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); | ||||||
|  |  | ||||||
|         return _webSocket.send(serializedJson).success; |         return _webSocket.send(serializedJson).success; | ||||||
| @@ -290,7 +285,7 @@ namespace ix | |||||||
|         pdu["action"] = "auth/authenticate"; |         pdu["action"] = "auth/authenticate"; | ||||||
|         pdu["body"] = body; |         pdu["body"] = body; | ||||||
|  |  | ||||||
|         std::string serializedJson = _jsonWriter.write(pdu); |         std::string serializedJson = serializeJson(pdu); | ||||||
|         SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); |         SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); | ||||||
|  |  | ||||||
|         return _webSocket.send(serializedJson).success; |         return _webSocket.send(serializedJson).success; | ||||||
| @@ -307,6 +302,7 @@ namespace ix | |||||||
|         if (!body.isMember("subscription_id")) return false; |         if (!body.isMember("subscription_id")) return false; | ||||||
|         Json::Value subscriptionId = body["subscription_id"]; |         Json::Value subscriptionId = body["subscription_id"]; | ||||||
|  |  | ||||||
|  |         std::lock_guard<std::mutex> lock(_cbsMutex); | ||||||
|         auto cb = _cbs.find(subscriptionId.asString()); |         auto cb = _cbs.find(subscriptionId.asString()); | ||||||
|         if (cb == _cbs.end()) return false; // cannot find callback |         if (cb == _cbs.end()) return false; // cannot find callback | ||||||
|  |  | ||||||
| @@ -333,17 +329,29 @@ namespace ix | |||||||
|         return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open; |         return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     std::string SatoriConnection::serializeJson(const Json::Value& value) | ||||||
|  |     { | ||||||
|  |         std::lock_guard<std::mutex> lock(_jsonWriterMutex); | ||||||
|  |         return _jsonWriter.write(value); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     // |     // | ||||||
|     // publish is not thread safe as we are trying to reuse some Json objects. |     // publish is not thread safe as we are trying to reuse some Json objects. | ||||||
|     // |     // | ||||||
|     bool SatoriConnection::publish(const std::string& channel, |     bool SatoriConnection::publish(const Json::Value& channels, | ||||||
|                                    const Json::Value& msg) |                                    const Json::Value& msg) | ||||||
|     { |     { | ||||||
|         _body["channel"] = channel; |         _body["channels"] = channels; | ||||||
|         _body["message"] = msg; |         _body["message"] = msg; | ||||||
|         _pdu["body"] = _body; |         _pdu["body"] = _body; | ||||||
|  |  | ||||||
|         std::string serializedJson = _jsonWriter.write(_pdu); |         std::string serializedJson = serializeJson(_pdu); | ||||||
|  |  | ||||||
|  |         if (_publishMode == SatoriConnection_PublishMode_Batch) | ||||||
|  |         { | ||||||
|  |             enqueue(serializedJson); | ||||||
|  |             return true; | ||||||
|  |         } | ||||||
|  |  | ||||||
|         // |         // | ||||||
|         // Fast path. We are authenticated and the publishing succeed |         // Fast path. We are authenticated and the publishing succeed | ||||||
| @@ -454,4 +462,14 @@ namespace ix | |||||||
|         return webSocketSendInfo.success; |         return webSocketSendInfo.success; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     void SatoriConnection::suspend() | ||||||
|  |     { | ||||||
|  |         disconnect(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     void SatoriConnection::resume() | ||||||
|  |     { | ||||||
|  |         connect(); | ||||||
|  |     } | ||||||
|  |      | ||||||
| } // namespace ix | } // namespace ix | ||||||
|   | |||||||
| @@ -26,6 +26,12 @@ namespace ix | |||||||
|         SatoriConnection_EventType_Closed = 3 |         SatoriConnection_EventType_Closed = 3 | ||||||
|     }; |     }; | ||||||
|  |  | ||||||
|  |     enum SatoriConnectionPublishMode | ||||||
|  |     { | ||||||
|  |         SatoriConnection_PublishMode_Immediate = 0, | ||||||
|  |         SatoriConnection_PublishMode_Batch = 1 | ||||||
|  |     }; | ||||||
|  |  | ||||||
|     using SubscriptionCallback = std::function<void(const Json::Value&)>; |     using SubscriptionCallback = std::function<void(const Json::Value&)>; | ||||||
|     using EventCallback = std::function<void(SatoriConnectionEventType, |     using EventCallback = std::function<void(SatoriConnectionEventType, | ||||||
|                                              const std::string&, |                                              const std::string&, | ||||||
| @@ -46,7 +52,6 @@ namespace ix | |||||||
|                        const std::string& rolesecret, |                        const std::string& rolesecret, | ||||||
|                        WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions); |                        WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions); | ||||||
|  |  | ||||||
|         /// Set the traffic tracker callback |  | ||||||
|         static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback); |         static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback); | ||||||
|  |  | ||||||
|         /// Reset the traffic tracker callback to an no-op one. |         /// Reset the traffic tracker callback to an no-op one. | ||||||
| @@ -61,7 +66,7 @@ namespace ix | |||||||
|         /// Publish a message to a channel |         /// Publish a message to a channel | ||||||
|         /// |         /// | ||||||
|         /// No-op if the connection is not established |         /// No-op if the connection is not established | ||||||
|         bool publish(const std::string& channel, |         bool publish(const Json::Value& channels, | ||||||
|                      const Json::Value& msg); |                      const Json::Value& msg); | ||||||
|  |  | ||||||
|         // Subscribe to a channel, and execute a callback when an incoming |         // Subscribe to a channel, and execute a callback when an incoming | ||||||
| @@ -71,7 +76,7 @@ namespace ix | |||||||
|         /// Unsubscribe from a channel |         /// Unsubscribe from a channel | ||||||
|         void unsubscribe(const std::string& channel); |         void unsubscribe(const std::string& channel); | ||||||
|  |  | ||||||
|         /// Close the RTM connection and free the RTM handle memory |         /// Close the connection | ||||||
|         void disconnect(); |         void disconnect(); | ||||||
|  |  | ||||||
|         /// Connect to Satori and authenticate the connection |         /// Connect to Satori and authenticate the connection | ||||||
| @@ -80,17 +85,27 @@ namespace ix | |||||||
|         /// Returns true only if we're connected |         /// Returns true only if we're connected | ||||||
|         bool isConnected() const; |         bool isConnected() const; | ||||||
|          |          | ||||||
|  |         /// Flush the publish queue | ||||||
|  |         bool flushQueue(); | ||||||
|  |  | ||||||
|  |         /// Set the publish mode | ||||||
|  |         void setPublishMode(SatoriConnectionPublishMode publishMode); | ||||||
|  |  | ||||||
|  |         /// Lifecycle management. Free resources when backgrounding | ||||||
|  |         void suspend(); | ||||||
|  |         void resume(); | ||||||
|  |  | ||||||
|     private: |     private: | ||||||
|         bool sendHandshakeMessage(); |         bool sendHandshakeMessage(); | ||||||
|         bool handleHandshakeResponse(const Json::Value& data); |         bool handleHandshakeResponse(const Json::Value& data); | ||||||
|         bool sendAuthMessage(const std::string& nonce); |         bool sendAuthMessage(const std::string& nonce); | ||||||
|         bool handleSubscriptionData(const Json::Value& pdu); |         bool handleSubscriptionData(const Json::Value& pdu); | ||||||
|  |  | ||||||
|         void resetWebSocketOnMessageCallback(); |         void initWebSocketOnMessageCallback(); | ||||||
|  |  | ||||||
|         bool publishMessage(const std::string& serializedJson); |         bool publishMessage(const std::string& serializedJson); | ||||||
|         bool flushQueue(); |  | ||||||
|         void enqueue(const std::string& msg); |         void enqueue(const std::string& msg); | ||||||
|  |         std::string serializeJson(const Json::Value& pdu); | ||||||
|  |  | ||||||
|         /// Invoke the traffic tracker callback |         /// Invoke the traffic tracker callback | ||||||
|         static void invokeTrafficTrackerCallback(size_t size, bool incoming); |         static void invokeTrafficTrackerCallback(size_t size, bool incoming); | ||||||
| @@ -111,6 +126,7 @@ namespace ix | |||||||
|         std::string _endpoint; |         std::string _endpoint; | ||||||
|         std::string _role_name; |         std::string _role_name; | ||||||
|         std::string _role_secret; |         std::string _role_secret; | ||||||
|  |         std::atomic<SatoriConnectionPublishMode> _publishMode; | ||||||
|  |  | ||||||
|         // Can be set on control+background thread, protecting with an atomic |         // Can be set on control+background thread, protecting with an atomic | ||||||
|         std::atomic<bool> _authenticated; |         std::atomic<bool> _authenticated; | ||||||
| @@ -119,12 +135,14 @@ namespace ix | |||||||
|         Json::Value _body; |         Json::Value _body; | ||||||
|         Json::Value _pdu; |         Json::Value _pdu; | ||||||
|         Json::FastWriter _jsonWriter; |         Json::FastWriter _jsonWriter; | ||||||
|  |         mutable std::mutex _jsonWriterMutex; | ||||||
|  |  | ||||||
|         /// Traffic tracker callback |         /// Traffic tracker callback | ||||||
|         static TrafficTrackerCallback _trafficTrackerCallback; |         static TrafficTrackerCallback _trafficTrackerCallback; | ||||||
|  |  | ||||||
|         /// Satori events callbacks |         /// Satori events callbacks | ||||||
|         EventCallback _eventCallback; |         EventCallback _eventCallback; | ||||||
|  |         mutable std::mutex _eventCallbackMutex; | ||||||
|  |  | ||||||
|         /// Subscription callbacks, only one per channel |         /// Subscription callbacks, only one per channel | ||||||
|         std::unordered_map<std::string, SubscriptionCallback> _cbs; |         std::unordered_map<std::string, SubscriptionCallback> _cbs; | ||||||
| @@ -139,7 +157,7 @@ namespace ix | |||||||
|         mutable std::mutex _queueMutex; |         mutable std::mutex _queueMutex; | ||||||
|  |  | ||||||
|         // Cap the queue size (100 elems so far -> ~100k) |         // Cap the queue size (100 elems so far -> ~100k) | ||||||
|         static constexpr size_t kQueueMaxSize = 100; |         static constexpr size_t kQueueMaxSize = 256; | ||||||
|     }; |     }; | ||||||
|      |      | ||||||
| } // namespace ix | } // namespace ix | ||||||
|   | |||||||
| @@ -55,7 +55,7 @@ int main(int argc, char* argv[]) | |||||||
|     satoriConnection.configure(appkey, endpoint, rolename, rolesecret, |     satoriConnection.configure(appkey, endpoint, rolename, rolesecret, | ||||||
|                                webSocketPerMessageDeflateOptions); |                                webSocketPerMessageDeflateOptions); | ||||||
|     satoriConnection.connect(); |     satoriConnection.connect(); | ||||||
|     satoriConnection.setOnEventCallback( |     satoriConnection.setEventCallback( | ||||||
|         [&satoriConnection, channel, path, &done] |         [&satoriConnection, channel, path, &done] | ||||||
|         (ix::SatoriConnectionEventType eventType, |         (ix::SatoriConnectionEventType eventType, | ||||||
|          const std::string& errMsg, |          const std::string& errMsg, | ||||||
|   | |||||||
| @@ -55,6 +55,7 @@ namespace ix { | |||||||
|  |  | ||||||
|     void WebSocket::setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) |     void WebSocket::setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) | ||||||
|     { |     { | ||||||
|  |         std::lock_guard<std::mutex> lock(_configMutex); | ||||||
|         _perMessageDeflateOptions = perMessageDeflateOptions; |         _perMessageDeflateOptions = perMessageDeflateOptions; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -48,9 +48,10 @@ namespace ix | |||||||
|  |  | ||||||
|         WebSocketCloseInfo(uint64_t c = 0, |         WebSocketCloseInfo(uint64_t c = 0, | ||||||
|                            const std::string& r = std::string()) |                            const std::string& r = std::string()) | ||||||
|  |             : code(c) | ||||||
|  |             , reason(r) | ||||||
|         { |         { | ||||||
|             code = c; |             ; | ||||||
|             reason = r; |  | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -69,6 +69,8 @@ namespace ix | |||||||
|     WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor() |     WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor() | ||||||
|       : _compressBufferSize(kBufferSize) |       : _compressBufferSize(kBufferSize) | ||||||
|     { |     { | ||||||
|  |         memset(&_deflateState, 0, sizeof(_deflateState)); | ||||||
|  |  | ||||||
|         _deflateState.zalloc = Z_NULL; |         _deflateState.zalloc = Z_NULL; | ||||||
|         _deflateState.zfree = Z_NULL; |         _deflateState.zfree = Z_NULL; | ||||||
|         _deflateState.opaque = Z_NULL; |         _deflateState.opaque = Z_NULL; | ||||||
| @@ -167,6 +169,8 @@ namespace ix | |||||||
|     WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor() |     WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor() | ||||||
|       : _compressBufferSize(kBufferSize) |       : _compressBufferSize(kBufferSize) | ||||||
|     { |     { | ||||||
|  |         memset(&_inflateState, 0, sizeof(_inflateState)); | ||||||
|  |  | ||||||
|         _inflateState.zalloc = Z_NULL; |         _inflateState.zalloc = Z_NULL; | ||||||
|         _inflateState.zfree = Z_NULL; |         _inflateState.zfree = Z_NULL; | ||||||
|         _inflateState.opaque = Z_NULL; |         _inflateState.opaque = Z_NULL; | ||||||
|   | |||||||
| @@ -36,7 +36,6 @@ | |||||||
|  |  | ||||||
| #include "zlib.h" | #include "zlib.h" | ||||||
| #include <string> | #include <string> | ||||||
| #include <memory> |  | ||||||
|  |  | ||||||
| namespace ix  | namespace ix  | ||||||
| { | { | ||||||
|   | |||||||
| @@ -20,11 +20,12 @@ namespace ix | |||||||
|  |  | ||||||
|         WebSocketSendInfo(bool s = false, bool c = false, |         WebSocketSendInfo(bool s = false, bool c = false, | ||||||
|                           size_t p = 0, size_t w = 0) |                           size_t p = 0, size_t w = 0) | ||||||
|  |             : success(s) | ||||||
|  |             , compressionError(c) | ||||||
|  |             , payloadSize(p) | ||||||
|  |             , wireSize(w) | ||||||
|         { |         { | ||||||
|             success = s; |             ; | ||||||
|             compressionError = c; |  | ||||||
|             payloadSize = p; |  | ||||||
|             wireSize = w; |  | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
| } | } | ||||||
|   | |||||||
| @@ -167,7 +167,8 @@ namespace ix | |||||||
|         if (!WebSocketTransport::parseUrl(_url, protocol, host, |         if (!WebSocketTransport::parseUrl(_url, protocol, host, | ||||||
|                                           path, query, port)) |                                           path, query, port)) | ||||||
|         { |         { | ||||||
|             return WebSocketInitResult(false, 0, "Could not parse URL"); |             return WebSocketInitResult(false, 0, | ||||||
|  |                                        std::string("Could not parse URL ") + _url); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         if (protocol == "wss") |         if (protocol == "wss") | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user