diff --git a/examples/satori_publisher/IXSatoriConnection.cpp b/examples/satori_publisher/IXSatoriConnection.cpp index 8fea38da..363230f4 100644 --- a/examples/satori_publisher/IXSatoriConnection.cpp +++ b/examples/satori_publisher/IXSatoriConnection.cpp @@ -88,9 +88,18 @@ namespace ix } } - void SatoriConnection::logError(const std::string& error) + void SatoriConnection::setErrorCallback(const ErrorCallback& errorCallback) { - std::cerr << "SatoriConnection: " << error; + _errorCallback = errorCallback; + } + + void SatoriConnection::invokeErrorCallback(const std::string& errorMsg) + { + + if (_errorCallback) + { + _errorCallback(errorMsg); + } } void SatoriConnection::disconnect() @@ -145,13 +154,13 @@ namespace ix Json::Value data; if (!parseJson(str, data)) { - logError(std::string("Invalid json: ") + str); + invokeErrorCallback(std::string("Invalid json: ") + str); return; } if (!data.isMember("action")) { - logError("Missing action"); + invokeErrorCallback("Missing action"); return; } @@ -161,12 +170,12 @@ namespace ix { if (!handleHandshakeResponse(data)) { - logError("Error extracting nonce from handshake response"); + invokeErrorCallback("Error extracting nonce from handshake response"); } } else if (action == "auth/handshake/error") { - logError("Handshake error."); // print full message ? + invokeErrorCallback("Handshake error."); // print full message ? } else if (action == "auth/authenticate/ok") { @@ -176,7 +185,7 @@ namespace ix } else if (action == "auth/authenticate/error") { - logError("Authentication error."); // print full message ? + invokeErrorCallback("Authentication error."); // print full message ? } else if (action == "rtm/subscription/data") { @@ -184,7 +193,7 @@ namespace ix } else { - logError(std::string("Un-handled message type: ") + action); + invokeErrorCallback(std::string("Un-handled message type: ") + action); } } else if (messageType == ix::WebSocket_MessageType_Error) @@ -194,7 +203,7 @@ namespace ix ss << "#retries: " << error.retries << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "HTTP Status: " << error.http_status << std::endl; - logError(ss.str()); + invokeErrorCallback(ss.str()); } }); } diff --git a/examples/satori_publisher/IXSatoriConnection.h b/examples/satori_publisher/IXSatoriConnection.h index 119eee2a..294985c4 100644 --- a/examples/satori_publisher/IXSatoriConnection.h +++ b/examples/satori_publisher/IXSatoriConnection.h @@ -19,6 +19,7 @@ namespace ix { using SubscriptionCallback = std::function; using AuthenticatedCallback = std::function; + using ErrorCallback = std::function; using OnTrafficTrackerCallback = std::function; class SatoriConnection @@ -40,8 +41,10 @@ namespace ix /// Reset the traffic tracker callback to an no-op one. static void resetTrafficTrackerCallback(); - /// Reset the traffic tracker callback to an no-op one. + /// Set the authenticated callback void setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback); + /// Set the error callback + void setErrorCallback(const ErrorCallback& errorCallback); /// Start the worker thread, used for background publishing void start(); @@ -67,8 +70,6 @@ namespace ix /// Returns true only if we're connected bool isConnected() const; - - void logError(const std::string& error); private: bool sendHandshakeMessage(); @@ -83,8 +84,9 @@ namespace ix /// Invoke the traffic tracker callback static void invokeTrafficTrackerCallback(size_t size, bool incoming); - /// Invoke the authenticated callback + /// Invoke lifecycle callbacks void invokeAuthenticatedCallback(); + void invokeErrorCallback(const std::string& errorMsg); /// /// Member variables @@ -108,8 +110,9 @@ namespace ix /// Traffic tracker callback static OnTrafficTrackerCallback _onTrafficTrackerCallback; - /// Callback invoked when we are authenticated + /// Callbacks AuthenticatedCallback _authenticatedCallback; + ErrorCallback _errorCallback; /// Subscription callbacks, only one per channel std::unordered_map _cbs; diff --git a/examples/satori_publisher/events.jsonl b/examples/satori_publisher/events.jsonl new file mode 100644 index 00000000..a70f1fd2 --- /dev/null +++ b/examples/satori_publisher/events.jsonl @@ -0,0 +1,3 @@ +{"array":[1,2,3],"boolean":true,"color":"#82b92c","null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Foo"} +{"array":[1,2,3],"boolean":true,"color":"#82b92c","null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Bar"} +{"array":[1,2,3],"boolean":true,"color":"#82b92c","null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Baz"} diff --git a/examples/satori_publisher/satori_publisher.cpp b/examples/satori_publisher/satori_publisher.cpp index 3a4abc98..cdc71d50 100644 --- a/examples/satori_publisher/satori_publisher.cpp +++ b/examples/satori_publisher/satori_publisher.cpp @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) std::ifstream f(path); if (!f.is_open()) { - std::cerr << "error while opening file" << std::endl; + std::cerr << "Error while opening file: " << path << std::endl; } while (getline(f, line)) @@ -70,16 +70,23 @@ int main(int argc, char* argv[]) if (f.bad()) { - std::cerr << "error while reading file" << std::endl; + std::cerr << "Error while opening file: " << path << std::endl; } done = true; } ); + satoriConnection.setErrorCallback( + [&done](const std::string& errMsg) + { + std::cerr << "Satori Error received: " << errMsg << std::endl; + done = true; + } + ); while (!done) { - msleep(1000); + msleep(10); } std::cout << incomingBytes << std::endl; diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 4e7cb795..92b4ce41 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -21,7 +21,6 @@ #include "libwshandshake.hpp" -// #include #include #include @@ -40,9 +39,10 @@ namespace ix { WebSocketTransport::WebSocketTransport() : - _readyState(CLOSED) + _readyState(CLOSED), + _enablePerMessageDeflate(false) { - ; + _perMessageDeflate.init(); } WebSocketTransport::~WebSocketTransport() @@ -206,18 +206,27 @@ namespace ix { std::string secWebSocketKey = genRandomString(22); secWebSocketKey += "=="; - char line[256]; + std::string extensions; + if (_enablePerMessageDeflate) + { + // extensions = "Sec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover; server_no_context_takeover\r\n"; + extensions = "Sec-WebSocket-Extensions: permessage-deflate\r\n"; + } + + char line[512]; int status; int i; - snprintf(line, 256, + snprintf(line, 512, "GET %s HTTP/1.1\r\n" "Host: %s:%d\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Key: %s\r\n" "Sec-WebSocket-Version: 13\r\n" + "%s" "\r\n", - path.c_str(), host.c_str(), port, secWebSocketKey.c_str()); + path.c_str(), host.c_str(), port, + secWebSocketKey.c_str(), extensions.c_str()); size_t lineSize = strlen(line); if (_socket->send(line, lineSize) != lineSize) @@ -295,11 +304,13 @@ namespace ix { // subtract 1 for '\0', 1 for '\n', 1 for '\r', // 1 for the ' ' after the ':', and total is -4 std::string name(lineStr.substr(0, colon)); + std::string value(lineStr.substr(colon + 2, i - colon - 4)); // Make the name lower case. std::transform(name.begin(), name.end(), name.begin(), ::tolower); - headers[name] = lineStr.substr(colon + 2, i - colon - 4); + headers[name] = value; + std::cout << name << " -> " << value << std::endl; } } @@ -534,7 +545,33 @@ namespace ix { // fire callback with a string message std::string stringMessage(_receivedData.begin(), _receivedData.end()); - onMessageCallback(stringMessage, MSG); + + std::cout << "raw msg: " << stringMessage << std::endl; + std::cout << "raw msg size: " << stringMessage.size() << std::endl; + + // ws.rsv1 means the message is compressed + // FIXME hack hack + std::string decompressedMessage; + + if (_enablePerMessageDeflate && ws.rsv1) + { + if (_perMessageDeflate.decompress(stringMessage, + decompressedMessage)) + { + std::cout << "decompressed msg: " << decompressedMessage << std::endl; + std::cout << "msg size: " << decompressedMessage.size() << std::endl; + onMessageCallback(decompressedMessage, MSG); + } + else + { + std::cout << "error decompressing msg !"<< std::endl; + + } + } + else + { + onMessageCallback(stringMessage, MSG); + } _receivedData.clear(); } @@ -621,6 +658,13 @@ namespace ix { (message_size >= 126 ? 2 : 0) + (message_size >= 65536 ? 6 : 0) + 4, 0); header[0] = 0x80 | type; + + // This bit indicate that the frame is compressed + if (_enablePerMessageDeflate) + { + header[0] |= 0x40; + } + if (message_size < 126) { header[1] = (message_size & 0xff) | 0x80; @@ -674,7 +718,27 @@ namespace ix { void WebSocketTransport::sendBinary(const std::string& message) { - sendData(wsheader_type::BINARY_FRAME, message.size(), message.begin(), message.end()); + if (_enablePerMessageDeflate) + { + // FIXME hack hack + std::string compressedMessage; + _perMessageDeflate.compress(message, compressedMessage); + std::cout << "uncompressedMessage " << message << std::endl; + std::cout << "uncompressedMessage.size() " << message.size() << std::endl; + std::cout << "compressedMessage.size() " << compressedMessage.size() + << std::endl; + + // sendData(wsheader_type::BINARY_FRAME, message.size(), message.begin(), message.end()); + sendData(wsheader_type::BINARY_FRAME, + compressedMessage.size(), + compressedMessage.begin(), + compressedMessage.end()); + } + else + { + sendData(wsheader_type::BINARY_FRAME, message.size(), + message.begin(), message.end()); + } } void WebSocketTransport::sendOnSocket() diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index e2dbbfec..dddd2db0 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -17,6 +17,8 @@ #include #include +#include "IXWebSocketPerMessageDeflate.h" + namespace ix { class Socket; @@ -121,12 +123,15 @@ namespace ix std::shared_ptr _socket; std::atomic _readyState; + std::atomic _enablePerMessageDeflate; OnCloseCallback _onCloseCallback; uint16_t _closeCode; std::string _closeReason; mutable std::mutex _closeDataMutex; + WebSocketPerMessageDeflate _perMessageDeflate; + void sendOnSocket(); void sendData(wsheader_type::opcode_type type, uint64_t message_size,