diff --git a/examples/satori_publisher/IXSatoriConnection.cpp b/examples/satori_publisher/IXSatoriConnection.cpp index cd649144..cc35ee88 100644 --- a/examples/satori_publisher/IXSatoriConnection.cpp +++ b/examples/satori_publisher/IXSatoriConnection.cpp @@ -13,32 +13,19 @@ #include #include -namespace -{ - bool parseJson(const std::string& str, Json::Value& value) - { - Json::Reader reader; - return reader.parse(str, value); - } - - std::string writeJsonCompact(const Json::Value& value) - { - Json::FastWriter writer; - return writer.write(value); - } -} namespace ix { - OnTrafficTrackerCallback SatoriConnection::_onTrafficTrackerCallback = nullptr; + TrafficTrackerCallback SatoriConnection::_trafficTrackerCallback = nullptr; constexpr size_t SatoriConnection::kQueueMaxSize; SatoriConnection::SatoriConnection() : _authenticated(false), - _onEventCallback(nullptr) + _eventCallback(nullptr) { _pdu["action"] = "rtm/publish"; - resetOnMessageCallback(); + + resetWebSocketOnMessageCallback(); } SatoriConnection::~SatoriConnection() @@ -46,9 +33,9 @@ namespace ix disconnect(); } - void SatoriConnection::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback) + void SatoriConnection::setTrafficTrackerCallback(const TrafficTrackerCallback& callback) { - _onTrafficTrackerCallback = callback; + _trafficTrackerCallback = callback; } void SatoriConnection::resetTrafficTrackerCallback() @@ -58,40 +45,40 @@ namespace ix void SatoriConnection::invokeTrafficTrackerCallback(size_t size, bool incoming) { - if (_onTrafficTrackerCallback) + if (_trafficTrackerCallback) { - _onTrafficTrackerCallback(size, incoming); + _trafficTrackerCallback(size, incoming); } } - void SatoriConnection::setOnEventCallback(const OnEventCallback& onEventCallback) + void SatoriConnection::setEventCallback(const EventCallback& eventCallback) { - _onEventCallback = onEventCallback; + _eventCallback = eventCallback; } - void SatoriConnection::invokeOnEventCallback(ix::SatoriConnectionEventType eventType, - const std::string& errorMsg, - const WebSocketHttpHeaders& headers) + void SatoriConnection::invokeEventCallback(ix::SatoriConnectionEventType eventType, + const std::string& errorMsg, + const WebSocketHttpHeaders& headers) { - if (_onEventCallback) + if (_eventCallback) { - _onEventCallback(eventType, errorMsg, headers); + _eventCallback(eventType, errorMsg, headers); } } void SatoriConnection::invokeErrorCallback(const std::string& errorMsg) { - invokeOnEventCallback(ix::SatoriConnection_EventType_Error, errorMsg); + invokeEventCallback(ix::SatoriConnection_EventType_Error, errorMsg); } void SatoriConnection::disconnect() { _webSocket.stop(); - resetOnMessageCallback(); + resetWebSocketOnMessageCallback(); } - void SatoriConnection::resetOnMessageCallback() + void SatoriConnection::resetWebSocketOnMessageCallback() { _webSocket.setOnMessageCallback( [](ix::WebSocketMessageType, @@ -139,9 +126,9 @@ namespace ix std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) { - invokeOnEventCallback(ix::SatoriConnection_EventType_Open, - std::string(), - headers); + invokeEventCallback(ix::SatoriConnection_EventType_Open, + std::string(), + headers); sendHandshakeMessage(); } else if (messageType == ix::WebSocket_MessageType_Close) @@ -151,13 +138,14 @@ namespace ix std::stringstream ss; ss << "Close code " << closeInfo.code; ss << " reason " << closeInfo.reason; - invokeOnEventCallback(ix::SatoriConnection_EventType_Closed, - ss.str()); + invokeEventCallback(ix::SatoriConnection_EventType_Closed, + ss.str()); } else if (messageType == ix::WebSocket_MessageType_Message) { Json::Value data; - if (!parseJson(str, data)) + Json::Reader reader; + if (!reader.parse(str, data)) { invokeErrorCallback(std::string("Invalid json: ") + str); return; @@ -185,7 +173,7 @@ namespace ix else if (action == "auth/authenticate/ok") { _authenticated = true; - invokeOnEventCallback(ix::SatoriConnection_EventType_Authenticated); + invokeEventCallback(ix::SatoriConnection_EventType_Authenticated); flushQueue(); } else if (action == "auth/authenticate/error") @@ -240,7 +228,7 @@ namespace ix pdu["action"] = "auth/handshake"; pdu["body"] = body; - std::string serializedJson = writeJsonCompact(pdu); + std::string serializedJson = _jsonWriter.write(pdu); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); return _webSocket.send(serializedJson).success; @@ -302,7 +290,7 @@ namespace ix pdu["action"] = "auth/authenticate"; pdu["body"] = body; - std::string serializedJson = writeJsonCompact(pdu); + std::string serializedJson = _jsonWriter.write(pdu); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); return _webSocket.send(serializedJson).success; @@ -355,7 +343,7 @@ namespace ix _body["message"] = msg; _pdu["body"] = _body; - std::string serializedJson = writeJsonCompact(_pdu); + std::string serializedJson = _jsonWriter.write(_pdu); // // Fast path. We are authenticated and the publishing succeed diff --git a/examples/satori_publisher/IXSatoriConnection.h b/examples/satori_publisher/IXSatoriConnection.h index 8b80b550..db47b472 100644 --- a/examples/satori_publisher/IXSatoriConnection.h +++ b/examples/satori_publisher/IXSatoriConnection.h @@ -27,10 +27,10 @@ namespace ix }; using SubscriptionCallback = std::function; - using OnEventCallback = std::function; - using OnTrafficTrackerCallback = std::function; + using EventCallback = std::function; + using TrafficTrackerCallback = std::function; class SatoriConnection { @@ -47,13 +47,13 @@ namespace ix WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions); /// Set the traffic tracker callback - static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); + static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback); /// Reset the traffic tracker callback to an no-op one. static void resetTrafficTrackerCallback(); /// Set the closed callback - void setOnEventCallback(const OnEventCallback& onEventCallback); + void setEventCallback(const EventCallback& eventCallback); /// Start the worker thread, used for background publishing void start(); @@ -86,7 +86,7 @@ namespace ix bool sendAuthMessage(const std::string& nonce); bool handleSubscriptionData(const Json::Value& pdu); - void resetOnMessageCallback(); + void resetWebSocketOnMessageCallback(); bool publishMessage(const std::string& serializedJson); bool flushQueue(); @@ -96,9 +96,9 @@ namespace ix static void invokeTrafficTrackerCallback(size_t size, bool incoming); /// Invoke event callbacks - void invokeOnEventCallback(SatoriConnectionEventType eventType, - const std::string& errorMsg = std::string(), - const WebSocketHttpHeaders& headers = WebSocketHttpHeaders()); + void invokeEventCallback(SatoriConnectionEventType eventType, + const std::string& errorMsg = std::string(), + const WebSocketHttpHeaders& headers = WebSocketHttpHeaders()); void invokeErrorCallback(const std::string& errorMsg); /// @@ -118,12 +118,13 @@ namespace ix // Keep some objects around Json::Value _body; Json::Value _pdu; + Json::FastWriter _jsonWriter; /// Traffic tracker callback - static OnTrafficTrackerCallback _onTrafficTrackerCallback; + static TrafficTrackerCallback _trafficTrackerCallback; - /// Callbacks - OnEventCallback _onEventCallback; + /// Satori events callbacks + EventCallback _eventCallback; /// Subscription callbacks, only one per channel std::unordered_map _cbs; diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index d0221d1f..c03384c1 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -188,6 +188,7 @@ namespace ix { _ws.dispatch( [this](const std::string& msg, size_t wireSize, + bool decompressionError, WebSocketTransport::MessageKind messageKind) { WebSocketMessageType webSocketMessageType; @@ -209,8 +210,11 @@ namespace ix { } break; } + WebSocketErrorInfo webSocketErrorInfo; + webSocketErrorInfo.decompressionError = decompressionError; + _onMessageCallback(webSocketMessageType, msg, wireSize, - WebSocketErrorInfo(), WebSocketCloseInfo(), + webSocketErrorInfo, WebSocketCloseInfo(), WebSocketHttpHeaders()); WebSocket::invokeTrafficTrackerCallback(msg.size(), true); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 0bd6275f..4b8406a0 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -15,6 +15,7 @@ #include #include "IXWebSocketTransport.h" +#include "IXWebSocketErrorInfo.h" #include "IXWebSocketSendInfo.h" #include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketHttpHeaders.h" @@ -40,30 +41,17 @@ namespace ix WebSocket_MessageType_Pong = 5 }; - struct WebSocketErrorInfo - { - uint64_t retries; - double wait_time; - int http_status; - std::string reason; - }; - struct WebSocketCloseInfo { uint16_t code; std::string reason; - WebSocketCloseInfo(uint64_t c, const std::string& r) + WebSocketCloseInfo(uint64_t c = 0, + const std::string& r = std::string()) { code = c; reason = r; } - - WebSocketCloseInfo() - { - code = 0; - reason = ""; - } }; using OnMessageCallback = std::function + +namespace ix +{ + struct WebSocketErrorInfo + { + uint64_t retries; + double wait_time; + int http_status; + std::string reason; + bool decompressionError; + }; +} diff --git a/ixwebsocket/IXWebSocketPerMessageDeflate.h b/ixwebsocket/IXWebSocketPerMessageDeflate.h index b8e91ecb..6cfb0ad0 100644 --- a/ixwebsocket/IXWebSocketPerMessageDeflate.h +++ b/ixwebsocket/IXWebSocketPerMessageDeflate.h @@ -36,6 +36,7 @@ #include "zlib.h" #include +#include namespace ix { diff --git a/ixwebsocket/IXWebSocketSendInfo.h b/ixwebsocket/IXWebSocketSendInfo.h index b7d05a5b..dd9f4f3f 100644 --- a/ixwebsocket/IXWebSocketSendInfo.h +++ b/ixwebsocket/IXWebSocketSendInfo.h @@ -7,18 +7,22 @@ #pragma once #include +#include namespace ix { struct WebSocketSendInfo { bool success; + bool compressionError; size_t payloadSize; size_t wireSize; - WebSocketSendInfo(bool s = false, size_t p = -1, size_t w = -1) + WebSocketSendInfo(bool s = false, bool c = false, + size_t p = 0, size_t w = 0) { success = s; + compressionError = c; payloadSize = p; wireSize = w; } diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index b79c1354..7cbacbd6 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -233,7 +233,7 @@ namespace ix } - char line[512]; + char line[256]; int i; for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i) { @@ -634,18 +634,12 @@ namespace ix if (_enablePerMessageDeflate && ws.rsv1) { std::string decompressedMessage; - if (_perMessageDeflate.decompress(message, decompressedMessage)) - { - onMessageCallback(decompressedMessage, wireSize, messageKind); - } - else - { - std::cerr << "error decompressing msg !"<< std::endl; - } + bool success = _perMessageDeflate.decompress(message, decompressedMessage); + onMessageCallback(decompressedMessage, wireSize, not success, messageKind); } else { - onMessageCallback(message, wireSize, messageKind); + onMessageCallback(message, wireSize, false, messageKind); } } @@ -670,13 +664,15 @@ namespace ix size_t payloadSize = message.size(); size_t wireSize = message.size(); std::string compressedMessage; + bool compressionError = false; std::string::const_iterator message_begin = message.begin(); std::string::const_iterator message_end = message.end(); if (compress) { - _perMessageDeflate.compress(message, compressedMessage); + bool success = _perMessageDeflate.compress(message, compressedMessage); + compressionError = !success; wireSize = compressedMessage.size(); message_begin = compressedMessage.begin(); @@ -749,7 +745,7 @@ namespace ix // Now actually send this data sendOnSocket(); - return WebSocketSendInfo(true, payloadSize, wireSize); + return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); } WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message) @@ -776,7 +772,7 @@ namespace ix { break; } - else if (ret <= 0) + else if (ret <= 0) { _socket->close(); @@ -801,7 +797,7 @@ namespace ix // >>> struct.pack('!H', 1000) // b'\x03\xe8' // - const std::string normalClosure = std::string("\x03\xe9"); + const std::string normalClosure = std::string("\x03\xe8"); bool compress = false; sendData(wsheader_type::CLOSE, normalClosure, compress); setReadyState(CLOSING); diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index 7c52184d..6c025d47 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -33,9 +33,9 @@ namespace ix std::string errorStr; WebSocketHttpHeaders headers; - WebSocketInitResult(bool s, - int status, - const std::string& e, + WebSocketInitResult(bool s = false, + int status = 0, + const std::string& e = std::string(), WebSocketHttpHeaders h = WebSocketHttpHeaders()) { success = s; @@ -43,15 +43,6 @@ namespace ix errorStr = e; headers = h; } - - // need to define a default - WebSocketInitResult() - { - success = false; - http_status = 0; - errorStr = ""; - headers.clear(); - } }; class WebSocketTransport @@ -74,6 +65,7 @@ namespace ix using OnMessageCallback = std::function; using OnCloseCallback = std::function