This commit is contained in:
Benjamin Sergeant 2018-11-14 15:52:28 -08:00
parent 5710ffba6a
commit 58cccbdcf9
9 changed files with 92 additions and 97 deletions

View File

@ -13,32 +13,19 @@
#include <cassert> #include <cassert>
#include <cstring> #include <cstring>
namespace
{
bool parseJson(const std::string& str, Json::Value& value)
{
Json::Reader reader;
return reader.parse(str, value);
}
std::string writeJsonCompact(const Json::Value& value)
{
Json::FastWriter writer;
return writer.write(value);
}
}
namespace ix namespace ix
{ {
OnTrafficTrackerCallback SatoriConnection::_onTrafficTrackerCallback = nullptr; TrafficTrackerCallback SatoriConnection::_trafficTrackerCallback = nullptr;
constexpr size_t SatoriConnection::kQueueMaxSize; constexpr size_t SatoriConnection::kQueueMaxSize;
SatoriConnection::SatoriConnection() : SatoriConnection::SatoriConnection() :
_authenticated(false), _authenticated(false),
_onEventCallback(nullptr) _eventCallback(nullptr)
{ {
_pdu["action"] = "rtm/publish"; _pdu["action"] = "rtm/publish";
resetOnMessageCallback();
resetWebSocketOnMessageCallback();
} }
SatoriConnection::~SatoriConnection() SatoriConnection::~SatoriConnection()
@ -46,9 +33,9 @@ namespace ix
disconnect(); disconnect();
} }
void SatoriConnection::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback) void SatoriConnection::setTrafficTrackerCallback(const TrafficTrackerCallback& callback)
{ {
_onTrafficTrackerCallback = callback; _trafficTrackerCallback = callback;
} }
void SatoriConnection::resetTrafficTrackerCallback() void SatoriConnection::resetTrafficTrackerCallback()
@ -58,40 +45,40 @@ namespace ix
void SatoriConnection::invokeTrafficTrackerCallback(size_t size, bool incoming) void SatoriConnection::invokeTrafficTrackerCallback(size_t size, bool incoming)
{ {
if (_onTrafficTrackerCallback) if (_trafficTrackerCallback)
{ {
_onTrafficTrackerCallback(size, incoming); _trafficTrackerCallback(size, incoming);
} }
} }
void SatoriConnection::setOnEventCallback(const OnEventCallback& onEventCallback) void SatoriConnection::setEventCallback(const EventCallback& eventCallback)
{ {
_onEventCallback = onEventCallback; _eventCallback = eventCallback;
} }
void SatoriConnection::invokeOnEventCallback(ix::SatoriConnectionEventType eventType, void SatoriConnection::invokeEventCallback(ix::SatoriConnectionEventType eventType,
const std::string& errorMsg, const std::string& errorMsg,
const WebSocketHttpHeaders& headers) const WebSocketHttpHeaders& headers)
{ {
if (_onEventCallback) if (_eventCallback)
{ {
_onEventCallback(eventType, errorMsg, headers); _eventCallback(eventType, errorMsg, headers);
} }
} }
void SatoriConnection::invokeErrorCallback(const std::string& errorMsg) void SatoriConnection::invokeErrorCallback(const std::string& errorMsg)
{ {
invokeOnEventCallback(ix::SatoriConnection_EventType_Error, errorMsg); invokeEventCallback(ix::SatoriConnection_EventType_Error, errorMsg);
} }
void SatoriConnection::disconnect() void SatoriConnection::disconnect()
{ {
_webSocket.stop(); _webSocket.stop();
resetOnMessageCallback(); resetWebSocketOnMessageCallback();
} }
void SatoriConnection::resetOnMessageCallback() void SatoriConnection::resetWebSocketOnMessageCallback()
{ {
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType, [](ix::WebSocketMessageType,
@ -139,9 +126,9 @@ namespace ix
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
invokeOnEventCallback(ix::SatoriConnection_EventType_Open, invokeEventCallback(ix::SatoriConnection_EventType_Open,
std::string(), std::string(),
headers); headers);
sendHandshakeMessage(); sendHandshakeMessage();
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
@ -151,13 +138,14 @@ namespace ix
std::stringstream ss; std::stringstream ss;
ss << "Close code " << closeInfo.code; ss << "Close code " << closeInfo.code;
ss << " reason " << closeInfo.reason; ss << " reason " << closeInfo.reason;
invokeOnEventCallback(ix::SatoriConnection_EventType_Closed, invokeEventCallback(ix::SatoriConnection_EventType_Closed,
ss.str()); ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
Json::Value data; Json::Value data;
if (!parseJson(str, data)) Json::Reader reader;
if (!reader.parse(str, data))
{ {
invokeErrorCallback(std::string("Invalid json: ") + str); invokeErrorCallback(std::string("Invalid json: ") + str);
return; return;
@ -185,7 +173,7 @@ namespace ix
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
_authenticated = true; _authenticated = true;
invokeOnEventCallback(ix::SatoriConnection_EventType_Authenticated); invokeEventCallback(ix::SatoriConnection_EventType_Authenticated);
flushQueue(); flushQueue();
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
@ -240,7 +228,7 @@ namespace ix
pdu["action"] = "auth/handshake"; pdu["action"] = "auth/handshake";
pdu["body"] = body; pdu["body"] = body;
std::string serializedJson = writeJsonCompact(pdu); std::string serializedJson = _jsonWriter.write(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson).success; return _webSocket.send(serializedJson).success;
@ -302,7 +290,7 @@ namespace ix
pdu["action"] = "auth/authenticate"; pdu["action"] = "auth/authenticate";
pdu["body"] = body; pdu["body"] = body;
std::string serializedJson = writeJsonCompact(pdu); std::string serializedJson = _jsonWriter.write(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson).success; return _webSocket.send(serializedJson).success;
@ -355,7 +343,7 @@ namespace ix
_body["message"] = msg; _body["message"] = msg;
_pdu["body"] = _body; _pdu["body"] = _body;
std::string serializedJson = writeJsonCompact(_pdu); std::string serializedJson = _jsonWriter.write(_pdu);
// //
// Fast path. We are authenticated and the publishing succeed // Fast path. We are authenticated and the publishing succeed

View File

@ -27,10 +27,10 @@ namespace ix
}; };
using SubscriptionCallback = std::function<void(const Json::Value&)>; using SubscriptionCallback = std::function<void(const Json::Value&)>;
using OnEventCallback = std::function<void(SatoriConnectionEventType, using EventCallback = std::function<void(SatoriConnectionEventType,
const std::string&, const std::string&,
const WebSocketHttpHeaders&)>; const WebSocketHttpHeaders&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class SatoriConnection class SatoriConnection
{ {
@ -47,13 +47,13 @@ namespace ix
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions); WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions);
/// Set the traffic tracker callback /// Set the traffic tracker callback
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);
/// Reset the traffic tracker callback to an no-op one. /// Reset the traffic tracker callback to an no-op one.
static void resetTrafficTrackerCallback(); static void resetTrafficTrackerCallback();
/// Set the closed callback /// Set the closed callback
void setOnEventCallback(const OnEventCallback& onEventCallback); void setEventCallback(const EventCallback& eventCallback);
/// Start the worker thread, used for background publishing /// Start the worker thread, used for background publishing
void start(); void start();
@ -86,7 +86,7 @@ namespace ix
bool sendAuthMessage(const std::string& nonce); bool sendAuthMessage(const std::string& nonce);
bool handleSubscriptionData(const Json::Value& pdu); bool handleSubscriptionData(const Json::Value& pdu);
void resetOnMessageCallback(); void resetWebSocketOnMessageCallback();
bool publishMessage(const std::string& serializedJson); bool publishMessage(const std::string& serializedJson);
bool flushQueue(); bool flushQueue();
@ -96,9 +96,9 @@ namespace ix
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);
/// Invoke event callbacks /// Invoke event callbacks
void invokeOnEventCallback(SatoriConnectionEventType eventType, void invokeEventCallback(SatoriConnectionEventType eventType,
const std::string& errorMsg = std::string(), const std::string& errorMsg = std::string(),
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders()); const WebSocketHttpHeaders& headers = WebSocketHttpHeaders());
void invokeErrorCallback(const std::string& errorMsg); void invokeErrorCallback(const std::string& errorMsg);
/// ///
@ -118,12 +118,13 @@ namespace ix
// Keep some objects around // Keep some objects around
Json::Value _body; Json::Value _body;
Json::Value _pdu; Json::Value _pdu;
Json::FastWriter _jsonWriter;
/// Traffic tracker callback /// Traffic tracker callback
static OnTrafficTrackerCallback _onTrafficTrackerCallback; static TrafficTrackerCallback _trafficTrackerCallback;
/// Callbacks /// Satori events callbacks
OnEventCallback _onEventCallback; EventCallback _eventCallback;
/// Subscription callbacks, only one per channel /// Subscription callbacks, only one per channel
std::unordered_map<std::string, SubscriptionCallback> _cbs; std::unordered_map<std::string, SubscriptionCallback> _cbs;

View File

@ -188,6 +188,7 @@ namespace ix {
_ws.dispatch( _ws.dispatch(
[this](const std::string& msg, [this](const std::string& msg,
size_t wireSize, size_t wireSize,
bool decompressionError,
WebSocketTransport::MessageKind messageKind) WebSocketTransport::MessageKind messageKind)
{ {
WebSocketMessageType webSocketMessageType; WebSocketMessageType webSocketMessageType;
@ -209,8 +210,11 @@ namespace ix {
} break; } break;
} }
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize, _onMessageCallback(webSocketMessageType, msg, wireSize,
WebSocketErrorInfo(), WebSocketCloseInfo(), webSocketErrorInfo, WebSocketCloseInfo(),
WebSocketHttpHeaders()); WebSocketHttpHeaders());
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);

View File

@ -15,6 +15,7 @@
#include <atomic> #include <atomic>
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
@ -40,30 +41,17 @@ namespace ix
WebSocket_MessageType_Pong = 5 WebSocket_MessageType_Pong = 5
}; };
struct WebSocketErrorInfo
{
uint64_t retries;
double wait_time;
int http_status;
std::string reason;
};
struct WebSocketCloseInfo struct WebSocketCloseInfo
{ {
uint16_t code; uint16_t code;
std::string reason; std::string reason;
WebSocketCloseInfo(uint64_t c, const std::string& r) WebSocketCloseInfo(uint64_t c = 0,
const std::string& r = std::string())
{ {
code = c; code = c;
reason = r; reason = r;
} }
WebSocketCloseInfo()
{
code = 0;
reason = "";
}
}; };
using OnMessageCallback = std::function<void(WebSocketMessageType, using OnMessageCallback = std::function<void(WebSocketMessageType,

View File

@ -0,0 +1,21 @@
/*
* IXWebSocketErrorInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
struct WebSocketErrorInfo
{
uint64_t retries;
double wait_time;
int http_status;
std::string reason;
bool decompressionError;
};
}

View File

@ -36,6 +36,7 @@
#include "zlib.h" #include "zlib.h"
#include <string> #include <string>
#include <memory>
namespace ix namespace ix
{ {

View File

@ -7,18 +7,22 @@
#pragma once #pragma once
#include <string> #include <string>
#include <iostream>
namespace ix namespace ix
{ {
struct WebSocketSendInfo struct WebSocketSendInfo
{ {
bool success; bool success;
bool compressionError;
size_t payloadSize; size_t payloadSize;
size_t wireSize; size_t wireSize;
WebSocketSendInfo(bool s = false, size_t p = -1, size_t w = -1) WebSocketSendInfo(bool s = false, bool c = false,
size_t p = 0, size_t w = 0)
{ {
success = s; success = s;
compressionError = c;
payloadSize = p; payloadSize = p;
wireSize = w; wireSize = w;
} }

View File

@ -233,7 +233,7 @@ namespace ix
} }
char line[512]; char line[256];
int i; int i;
for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i) for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i)
{ {
@ -634,18 +634,12 @@ namespace ix
if (_enablePerMessageDeflate && ws.rsv1) if (_enablePerMessageDeflate && ws.rsv1)
{ {
std::string decompressedMessage; std::string decompressedMessage;
if (_perMessageDeflate.decompress(message, decompressedMessage)) bool success = _perMessageDeflate.decompress(message, decompressedMessage);
{ onMessageCallback(decompressedMessage, wireSize, not success, messageKind);
onMessageCallback(decompressedMessage, wireSize, messageKind);
}
else
{
std::cerr << "error decompressing msg !"<< std::endl;
}
} }
else else
{ {
onMessageCallback(message, wireSize, messageKind); onMessageCallback(message, wireSize, false, messageKind);
} }
} }
@ -670,13 +664,15 @@ namespace ix
size_t payloadSize = message.size(); size_t payloadSize = message.size();
size_t wireSize = message.size(); size_t wireSize = message.size();
std::string compressedMessage; std::string compressedMessage;
bool compressionError = false;
std::string::const_iterator message_begin = message.begin(); std::string::const_iterator message_begin = message.begin();
std::string::const_iterator message_end = message.end(); std::string::const_iterator message_end = message.end();
if (compress) if (compress)
{ {
_perMessageDeflate.compress(message, compressedMessage); bool success = _perMessageDeflate.compress(message, compressedMessage);
compressionError = !success;
wireSize = compressedMessage.size(); wireSize = compressedMessage.size();
message_begin = compressedMessage.begin(); message_begin = compressedMessage.begin();
@ -749,7 +745,7 @@ namespace ix
// Now actually send this data // Now actually send this data
sendOnSocket(); sendOnSocket();
return WebSocketSendInfo(true, payloadSize, wireSize); return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
} }
WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message) WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message)
@ -776,7 +772,7 @@ namespace ix
{ {
break; break;
} }
else if (ret <= 0) else if (ret <= 0)
{ {
_socket->close(); _socket->close();
@ -801,7 +797,7 @@ namespace ix
// >>> struct.pack('!H', 1000) // >>> struct.pack('!H', 1000)
// b'\x03\xe8' // b'\x03\xe8'
// //
const std::string normalClosure = std::string("\x03\xe9"); const std::string normalClosure = std::string("\x03\xe8");
bool compress = false; bool compress = false;
sendData(wsheader_type::CLOSE, normalClosure, compress); sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING); setReadyState(CLOSING);

View File

@ -33,9 +33,9 @@ namespace ix
std::string errorStr; std::string errorStr;
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
WebSocketInitResult(bool s, WebSocketInitResult(bool s = false,
int status, int status = 0,
const std::string& e, const std::string& e = std::string(),
WebSocketHttpHeaders h = WebSocketHttpHeaders()) WebSocketHttpHeaders h = WebSocketHttpHeaders())
{ {
success = s; success = s;
@ -43,15 +43,6 @@ namespace ix
errorStr = e; errorStr = e;
headers = h; headers = h;
} }
// need to define a default
WebSocketInitResult()
{
success = false;
http_status = 0;
errorStr = "";
headers.clear();
}
}; };
class WebSocketTransport class WebSocketTransport
@ -74,6 +65,7 @@ namespace ix
using OnMessageCallback = std::function<void(const std::string&, using OnMessageCallback = std::function<void(const std::string&,
size_t, size_t,
bool,
MessageKind)>; MessageKind)>;
using OnCloseCallback = std::function<void(uint16_t, using OnCloseCallback = std::function<void(uint16_t,
const std::string&, const std::string&,