cleanup
This commit is contained in:
@ -13,32 +13,19 @@
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
|
||||
namespace
|
||||
{
|
||||
bool parseJson(const std::string& str, Json::Value& value)
|
||||
{
|
||||
Json::Reader reader;
|
||||
return reader.parse(str, value);
|
||||
}
|
||||
|
||||
std::string writeJsonCompact(const Json::Value& value)
|
||||
{
|
||||
Json::FastWriter writer;
|
||||
return writer.write(value);
|
||||
}
|
||||
}
|
||||
|
||||
namespace ix
|
||||
{
|
||||
OnTrafficTrackerCallback SatoriConnection::_onTrafficTrackerCallback = nullptr;
|
||||
TrafficTrackerCallback SatoriConnection::_trafficTrackerCallback = nullptr;
|
||||
constexpr size_t SatoriConnection::kQueueMaxSize;
|
||||
|
||||
SatoriConnection::SatoriConnection() :
|
||||
_authenticated(false),
|
||||
_onEventCallback(nullptr)
|
||||
_eventCallback(nullptr)
|
||||
{
|
||||
_pdu["action"] = "rtm/publish";
|
||||
resetOnMessageCallback();
|
||||
|
||||
resetWebSocketOnMessageCallback();
|
||||
}
|
||||
|
||||
SatoriConnection::~SatoriConnection()
|
||||
@ -46,9 +33,9 @@ namespace ix
|
||||
disconnect();
|
||||
}
|
||||
|
||||
void SatoriConnection::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
|
||||
void SatoriConnection::setTrafficTrackerCallback(const TrafficTrackerCallback& callback)
|
||||
{
|
||||
_onTrafficTrackerCallback = callback;
|
||||
_trafficTrackerCallback = callback;
|
||||
}
|
||||
|
||||
void SatoriConnection::resetTrafficTrackerCallback()
|
||||
@ -58,40 +45,40 @@ namespace ix
|
||||
|
||||
void SatoriConnection::invokeTrafficTrackerCallback(size_t size, bool incoming)
|
||||
{
|
||||
if (_onTrafficTrackerCallback)
|
||||
if (_trafficTrackerCallback)
|
||||
{
|
||||
_onTrafficTrackerCallback(size, incoming);
|
||||
_trafficTrackerCallback(size, incoming);
|
||||
}
|
||||
}
|
||||
|
||||
void SatoriConnection::setOnEventCallback(const OnEventCallback& onEventCallback)
|
||||
void SatoriConnection::setEventCallback(const EventCallback& eventCallback)
|
||||
{
|
||||
_onEventCallback = onEventCallback;
|
||||
_eventCallback = eventCallback;
|
||||
}
|
||||
|
||||
void SatoriConnection::invokeOnEventCallback(ix::SatoriConnectionEventType eventType,
|
||||
const std::string& errorMsg,
|
||||
const WebSocketHttpHeaders& headers)
|
||||
void SatoriConnection::invokeEventCallback(ix::SatoriConnectionEventType eventType,
|
||||
const std::string& errorMsg,
|
||||
const WebSocketHttpHeaders& headers)
|
||||
{
|
||||
if (_onEventCallback)
|
||||
if (_eventCallback)
|
||||
{
|
||||
_onEventCallback(eventType, errorMsg, headers);
|
||||
_eventCallback(eventType, errorMsg, headers);
|
||||
}
|
||||
}
|
||||
|
||||
void SatoriConnection::invokeErrorCallback(const std::string& errorMsg)
|
||||
{
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Error, errorMsg);
|
||||
invokeEventCallback(ix::SatoriConnection_EventType_Error, errorMsg);
|
||||
}
|
||||
|
||||
void SatoriConnection::disconnect()
|
||||
{
|
||||
_webSocket.stop();
|
||||
|
||||
resetOnMessageCallback();
|
||||
resetWebSocketOnMessageCallback();
|
||||
}
|
||||
|
||||
void SatoriConnection::resetOnMessageCallback()
|
||||
void SatoriConnection::resetWebSocketOnMessageCallback()
|
||||
{
|
||||
_webSocket.setOnMessageCallback(
|
||||
[](ix::WebSocketMessageType,
|
||||
@ -139,9 +126,9 @@ namespace ix
|
||||
std::stringstream ss;
|
||||
if (messageType == ix::WebSocket_MessageType_Open)
|
||||
{
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Open,
|
||||
std::string(),
|
||||
headers);
|
||||
invokeEventCallback(ix::SatoriConnection_EventType_Open,
|
||||
std::string(),
|
||||
headers);
|
||||
sendHandshakeMessage();
|
||||
}
|
||||
else if (messageType == ix::WebSocket_MessageType_Close)
|
||||
@ -151,13 +138,14 @@ namespace ix
|
||||
std::stringstream ss;
|
||||
ss << "Close code " << closeInfo.code;
|
||||
ss << " reason " << closeInfo.reason;
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Closed,
|
||||
ss.str());
|
||||
invokeEventCallback(ix::SatoriConnection_EventType_Closed,
|
||||
ss.str());
|
||||
}
|
||||
else if (messageType == ix::WebSocket_MessageType_Message)
|
||||
{
|
||||
Json::Value data;
|
||||
if (!parseJson(str, data))
|
||||
Json::Reader reader;
|
||||
if (!reader.parse(str, data))
|
||||
{
|
||||
invokeErrorCallback(std::string("Invalid json: ") + str);
|
||||
return;
|
||||
@ -185,7 +173,7 @@ namespace ix
|
||||
else if (action == "auth/authenticate/ok")
|
||||
{
|
||||
_authenticated = true;
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Authenticated);
|
||||
invokeEventCallback(ix::SatoriConnection_EventType_Authenticated);
|
||||
flushQueue();
|
||||
}
|
||||
else if (action == "auth/authenticate/error")
|
||||
@ -240,7 +228,7 @@ namespace ix
|
||||
pdu["action"] = "auth/handshake";
|
||||
pdu["body"] = body;
|
||||
|
||||
std::string serializedJson = writeJsonCompact(pdu);
|
||||
std::string serializedJson = _jsonWriter.write(pdu);
|
||||
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
|
||||
|
||||
return _webSocket.send(serializedJson).success;
|
||||
@ -302,7 +290,7 @@ namespace ix
|
||||
pdu["action"] = "auth/authenticate";
|
||||
pdu["body"] = body;
|
||||
|
||||
std::string serializedJson = writeJsonCompact(pdu);
|
||||
std::string serializedJson = _jsonWriter.write(pdu);
|
||||
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
|
||||
|
||||
return _webSocket.send(serializedJson).success;
|
||||
@ -355,7 +343,7 @@ namespace ix
|
||||
_body["message"] = msg;
|
||||
_pdu["body"] = _body;
|
||||
|
||||
std::string serializedJson = writeJsonCompact(_pdu);
|
||||
std::string serializedJson = _jsonWriter.write(_pdu);
|
||||
|
||||
//
|
||||
// Fast path. We are authenticated and the publishing succeed
|
||||
|
@ -27,10 +27,10 @@ namespace ix
|
||||
};
|
||||
|
||||
using SubscriptionCallback = std::function<void(const Json::Value&)>;
|
||||
using OnEventCallback = std::function<void(SatoriConnectionEventType,
|
||||
const std::string&,
|
||||
const WebSocketHttpHeaders&)>;
|
||||
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
|
||||
using EventCallback = std::function<void(SatoriConnectionEventType,
|
||||
const std::string&,
|
||||
const WebSocketHttpHeaders&)>;
|
||||
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
|
||||
|
||||
class SatoriConnection
|
||||
{
|
||||
@ -47,13 +47,13 @@ namespace ix
|
||||
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions);
|
||||
|
||||
/// Set the traffic tracker callback
|
||||
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
|
||||
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);
|
||||
|
||||
/// Reset the traffic tracker callback to an no-op one.
|
||||
static void resetTrafficTrackerCallback();
|
||||
|
||||
/// Set the closed callback
|
||||
void setOnEventCallback(const OnEventCallback& onEventCallback);
|
||||
void setEventCallback(const EventCallback& eventCallback);
|
||||
|
||||
/// Start the worker thread, used for background publishing
|
||||
void start();
|
||||
@ -86,7 +86,7 @@ namespace ix
|
||||
bool sendAuthMessage(const std::string& nonce);
|
||||
bool handleSubscriptionData(const Json::Value& pdu);
|
||||
|
||||
void resetOnMessageCallback();
|
||||
void resetWebSocketOnMessageCallback();
|
||||
|
||||
bool publishMessage(const std::string& serializedJson);
|
||||
bool flushQueue();
|
||||
@ -96,9 +96,9 @@ namespace ix
|
||||
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
|
||||
|
||||
/// Invoke event callbacks
|
||||
void invokeOnEventCallback(SatoriConnectionEventType eventType,
|
||||
const std::string& errorMsg = std::string(),
|
||||
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders());
|
||||
void invokeEventCallback(SatoriConnectionEventType eventType,
|
||||
const std::string& errorMsg = std::string(),
|
||||
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders());
|
||||
void invokeErrorCallback(const std::string& errorMsg);
|
||||
|
||||
///
|
||||
@ -118,12 +118,13 @@ namespace ix
|
||||
// Keep some objects around
|
||||
Json::Value _body;
|
||||
Json::Value _pdu;
|
||||
Json::FastWriter _jsonWriter;
|
||||
|
||||
/// Traffic tracker callback
|
||||
static OnTrafficTrackerCallback _onTrafficTrackerCallback;
|
||||
static TrafficTrackerCallback _trafficTrackerCallback;
|
||||
|
||||
/// Callbacks
|
||||
OnEventCallback _onEventCallback;
|
||||
/// Satori events callbacks
|
||||
EventCallback _eventCallback;
|
||||
|
||||
/// Subscription callbacks, only one per channel
|
||||
std::unordered_map<std::string, SubscriptionCallback> _cbs;
|
||||
|
Reference in New Issue
Block a user