threading race condition fixes, detected by TSAN

This commit is contained in:
Benjamin Sergeant 2018-12-06 08:27:28 -08:00
parent c64bc20bb5
commit 49bf8bd830
10 changed files with 102 additions and 59 deletions

View File

@ -3,7 +3,7 @@
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. This library provides a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS
* iOS

View File

@ -21,11 +21,12 @@ namespace ix
SatoriConnection::SatoriConnection() :
_authenticated(false),
_eventCallback(nullptr)
_eventCallback(nullptr),
_publishMode(SatoriConnection_PublishMode_Immediate)
{
_pdu["action"] = "rtm/publish";
resetWebSocketOnMessageCallback();
initWebSocketOnMessageCallback();
}
SatoriConnection::~SatoriConnection()
@ -53,6 +54,7 @@ namespace ix
void SatoriConnection::setEventCallback(const EventCallback& eventCallback)
{
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
_eventCallback = eventCallback;
}
@ -60,6 +62,7 @@ namespace ix
const std::string& errorMsg,
const WebSocketHttpHeaders& headers)
{
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
if (_eventCallback)
{
_eventCallback(eventType, errorMsg, headers);
@ -73,46 +76,12 @@ namespace ix
void SatoriConnection::disconnect()
{
_authenticated = false;
_webSocket.stop();
resetWebSocketOnMessageCallback();
}
void SatoriConnection::resetWebSocketOnMessageCallback()
void SatoriConnection::initWebSocketOnMessageCallback()
{
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType,
const std::string&,
size_t,
const ix::WebSocketErrorInfo&,
const ix::WebSocketCloseInfo&,
const ix::WebSocketHttpHeaders&)
{
;
}
);
}
void SatoriConnection::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions)
{
_appkey = appkey;
_endpoint = endpoint;
_role_name = rolename;
_role_secret = rolesecret;
std::stringstream ss;
ss << endpoint;
ss << "/v2?appkey=";
ss << appkey;
std::string url = ss.str();
_webSocket.setUrl(url);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
@ -201,6 +170,32 @@ namespace ix
});
}
void SatoriConnection::setPublishMode(SatoriConnectionPublishMode publishMode)
{
_publishMode = publishMode;
}
void SatoriConnection::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions)
{
_appkey = appkey;
_endpoint = endpoint;
_role_name = rolename;
_role_secret = rolesecret;
std::stringstream ss;
ss << _endpoint;
ss << "/v2?appkey=";
ss << _appkey;
std::string url = ss.str();
_webSocket.setUrl(url);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
//
// Handshake message schema.
//
@ -228,7 +223,7 @@ namespace ix
pdu["action"] = "auth/handshake";
pdu["body"] = body;
std::string serializedJson = _jsonWriter.write(pdu);
std::string serializedJson = serializeJson(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson).success;
@ -290,7 +285,7 @@ namespace ix
pdu["action"] = "auth/authenticate";
pdu["body"] = body;
std::string serializedJson = _jsonWriter.write(pdu);
std::string serializedJson = serializeJson(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson).success;
@ -307,6 +302,7 @@ namespace ix
if (!body.isMember("subscription_id")) return false;
Json::Value subscriptionId = body["subscription_id"];
std::lock_guard<std::mutex> lock(_cbsMutex);
auto cb = _cbs.find(subscriptionId.asString());
if (cb == _cbs.end()) return false; // cannot find callback
@ -333,17 +329,29 @@ namespace ix
return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open;
}
std::string SatoriConnection::serializeJson(const Json::Value& value)
{
std::lock_guard<std::mutex> lock(_jsonWriterMutex);
return _jsonWriter.write(value);
}
//
// publish is not thread safe as we are trying to reuse some Json objects.
//
bool SatoriConnection::publish(const std::string& channel,
bool SatoriConnection::publish(const Json::Value& channels,
const Json::Value& msg)
{
_body["channel"] = channel;
_body["channels"] = channels;
_body["message"] = msg;
_pdu["body"] = _body;
std::string serializedJson = _jsonWriter.write(_pdu);
std::string serializedJson = serializeJson(_pdu);
if (_publishMode == SatoriConnection_PublishMode_Batch)
{
enqueue(serializedJson);
return true;
}
//
// Fast path. We are authenticated and the publishing succeed
@ -453,5 +461,15 @@ namespace ix
false);
return webSocketSendInfo.success;
}
void SatoriConnection::suspend()
{
disconnect();
}
void SatoriConnection::resume()
{
connect();
}
} // namespace ix

View File

@ -26,6 +26,12 @@ namespace ix
SatoriConnection_EventType_Closed = 3
};
enum SatoriConnectionPublishMode
{
SatoriConnection_PublishMode_Immediate = 0,
SatoriConnection_PublishMode_Batch = 1
};
using SubscriptionCallback = std::function<void(const Json::Value&)>;
using EventCallback = std::function<void(SatoriConnectionEventType,
const std::string&,
@ -46,7 +52,6 @@ namespace ix
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions);
/// Set the traffic tracker callback
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);
/// Reset the traffic tracker callback to an no-op one.
@ -61,7 +66,7 @@ namespace ix
/// Publish a message to a channel
///
/// No-op if the connection is not established
bool publish(const std::string& channel,
bool publish(const Json::Value& channels,
const Json::Value& msg);
// Subscribe to a channel, and execute a callback when an incoming
@ -71,7 +76,7 @@ namespace ix
/// Unsubscribe from a channel
void unsubscribe(const std::string& channel);
/// Close the RTM connection and free the RTM handle memory
/// Close the connection
void disconnect();
/// Connect to Satori and authenticate the connection
@ -80,17 +85,27 @@ namespace ix
/// Returns true only if we're connected
bool isConnected() const;
/// Flush the publish queue
bool flushQueue();
/// Set the publish mode
void setPublishMode(SatoriConnectionPublishMode publishMode);
/// Lifecycle management. Free resources when backgrounding
void suspend();
void resume();
private:
bool sendHandshakeMessage();
bool handleHandshakeResponse(const Json::Value& data);
bool sendAuthMessage(const std::string& nonce);
bool handleSubscriptionData(const Json::Value& pdu);
void resetWebSocketOnMessageCallback();
void initWebSocketOnMessageCallback();
bool publishMessage(const std::string& serializedJson);
bool flushQueue();
void enqueue(const std::string& msg);
std::string serializeJson(const Json::Value& pdu);
/// Invoke the traffic tracker callback
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
@ -111,6 +126,7 @@ namespace ix
std::string _endpoint;
std::string _role_name;
std::string _role_secret;
std::atomic<SatoriConnectionPublishMode> _publishMode;
// Can be set on control+background thread, protecting with an atomic
std::atomic<bool> _authenticated;
@ -119,12 +135,14 @@ namespace ix
Json::Value _body;
Json::Value _pdu;
Json::FastWriter _jsonWriter;
mutable std::mutex _jsonWriterMutex;
/// Traffic tracker callback
static TrafficTrackerCallback _trafficTrackerCallback;
/// Satori events callbacks
EventCallback _eventCallback;
mutable std::mutex _eventCallbackMutex;
/// Subscription callbacks, only one per channel
std::unordered_map<std::string, SubscriptionCallback> _cbs;
@ -139,7 +157,7 @@ namespace ix
mutable std::mutex _queueMutex;
// Cap the queue size (100 elems so far -> ~100k)
static constexpr size_t kQueueMaxSize = 100;
static constexpr size_t kQueueMaxSize = 256;
};
} // namespace ix

View File

@ -55,7 +55,7 @@ int main(int argc, char* argv[])
satoriConnection.configure(appkey, endpoint, rolename, rolesecret,
webSocketPerMessageDeflateOptions);
satoriConnection.connect();
satoriConnection.setOnEventCallback(
satoriConnection.setEventCallback(
[&satoriConnection, channel, path, &done]
(ix::SatoriConnectionEventType eventType,
const std::string& errMsg,

View File

@ -55,6 +55,7 @@ namespace ix {
void WebSocket::setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions)
{
std::lock_guard<std::mutex> lock(_configMutex);
_perMessageDeflateOptions = perMessageDeflateOptions;
}

View File

@ -48,9 +48,10 @@ namespace ix
WebSocketCloseInfo(uint64_t c = 0,
const std::string& r = std::string())
: code(c)
, reason(r)
{
code = c;
reason = r;
;
}
};

View File

@ -69,6 +69,8 @@ namespace ix
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
: _compressBufferSize(kBufferSize)
{
memset(&_deflateState, 0, sizeof(_deflateState));
_deflateState.zalloc = Z_NULL;
_deflateState.zfree = Z_NULL;
_deflateState.opaque = Z_NULL;
@ -167,6 +169,8 @@ namespace ix
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
: _compressBufferSize(kBufferSize)
{
memset(&_inflateState, 0, sizeof(_inflateState));
_inflateState.zalloc = Z_NULL;
_inflateState.zfree = Z_NULL;
_inflateState.opaque = Z_NULL;

View File

@ -36,7 +36,6 @@
#include "zlib.h"
#include <string>
#include <memory>
namespace ix
{

View File

@ -20,11 +20,12 @@ namespace ix
WebSocketSendInfo(bool s = false, bool c = false,
size_t p = 0, size_t w = 0)
: success(s)
, compressionError(c)
, payloadSize(p)
, wireSize(w)
{
success = s;
compressionError = c;
payloadSize = p;
wireSize = w;
;
}
};
}

View File

@ -167,7 +167,8 @@ namespace ix
if (!WebSocketTransport::parseUrl(_url, protocol, host,
path, query, port))
{
return WebSocketInitResult(false, 0, "Could not parse URL");
return WebSocketInitResult(false, 0,
std::string("Could not parse URL ") + _url);
}
if (protocol == "wss")