diff --git a/CMakeLists.txt b/CMakeLists.txt index 5116a402..a8eac880 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXWebSocket.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp + ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp ) set( IXWEBSOCKET_HEADERS @@ -25,6 +26,8 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXWebSocket.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketPerMessageDeflate.h + ixwebsocket/IXWebSocketPerMessageDeflateOptions.h + ixwebsocket/IXWebSocketHttpHeaders.h ) if (USE_TLS) diff --git a/Dockerfile b/Dockerfile new file mode 120000 index 00000000..e4ce2a81 --- /dev/null +++ b/Dockerfile @@ -0,0 +1 @@ +docker/Dockerfile.debian \ No newline at end of file diff --git a/README.md b/README.md index f34237a7..9e2fd316 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,10 @@ webSocket.configure(url); webSocket.setOnMessageCallback( [](ix::WebSocketMessageType messageType, const std::string& str, + size_t wireSize, const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { if (messageType == ix::WebSocket_MessageType_Message) { @@ -54,6 +56,10 @@ CMakefiles for the library and the examples are available. This library has few ## Implementation details +### Per Message Deflate compression. + +The per message deflate compression option is supported. It can lead to very nice bandbwith savings (20x !) if your messages are similar, which is often the case for example for chat applications. All features of the spec should be supported. + ### TLS/SSL Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and macOS, and OpenSSL is used on Android and Linux. @@ -127,11 +133,23 @@ The onMessage event will be fired when the connection is opened or closed. This ``` webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo closeInfo&) + [](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { if (messageType == ix::WebSocket_MessageType_Open) { std::cout << "send greetings" << std::endl; + + // Headers can be inspected (pairs of string/string) + std::cout << "Handshake Headers:" << std::endl; + for (auto it : headers) + { + std::cout << it.first << ": " << it.second << std::endl; + } } else if (messageType == ix::WebSocket_MessageType_Close) { @@ -152,7 +170,12 @@ A message will be fired when there is an error with the connection. The message ``` webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo& closeInfo) + [](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { if (messageType == ix::WebSocket_MessageType_Error) { @@ -187,7 +210,12 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i ``` webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo& closeInfo) + [](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { if (messageType == ix::WebSocket_MessageType_Ping || messageType == ix::WebSocket_MessageType_Pong) diff --git a/docker/Dockerfile.debian b/docker/Dockerfile.debian index 66e60a86..a2743e70 100644 --- a/docker/Dockerfile.debian +++ b/docker/Dockerfile.debian @@ -1,6 +1,5 @@ FROM debian:stretch -# RUN yum install -y gcc-c++ make cmake openssl-devel gdb ENV DEBIAN_FRONTEND noninteractive RUN apt-get update RUN apt-get -y install g++ diff --git a/examples/chat/.gitignore b/examples/chat/.gitignore new file mode 100644 index 00000000..a18937a6 --- /dev/null +++ b/examples/chat/.gitignore @@ -0,0 +1,3 @@ +build +venv +node_modules diff --git a/examples/chat/cmd_websocket_chat.cpp b/examples/chat/cmd_websocket_chat.cpp index dc1cc4c8..9100d95b 100644 --- a/examples/chat/cmd_websocket_chat.cpp +++ b/examples/chat/cmd_websocket_chat.cpp @@ -77,7 +77,7 @@ namespace void WebSocketChat::start() { std::string url("ws://localhost:8080/"); - _webSocket.configure(url); + _webSocket.setUrl(url); std::stringstream ss; log(std::string("Connecting to url: ") + url); @@ -85,8 +85,10 @@ namespace _webSocket.setOnMessageCallback( [this](ix::WebSocketMessageType messageType, const std::string& str, + size_t wireSize, const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) diff --git a/examples/chat/package-lock.json b/examples/chat/package-lock.json new file mode 100644 index 00000000..c3b8a0a8 --- /dev/null +++ b/examples/chat/package-lock.json @@ -0,0 +1,31 @@ +{ + "requires": true, + "lockfileVersion": 1, + "dependencies": { + "async-limiter": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz", + "integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg==" + }, + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, + "ultron": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/ultron/-/ultron-1.1.1.tgz", + "integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og==" + }, + "ws": { + "version": "3.3.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz", + "integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==", + "requires": { + "async-limiter": "1.0.0", + "safe-buffer": "5.1.2", + "ultron": "1.1.1" + } + } + } +} diff --git a/examples/ping_pong/.gitignore b/examples/ping_pong/.gitignore index 5ceb3864..92260b89 100644 --- a/examples/ping_pong/.gitignore +++ b/examples/ping_pong/.gitignore @@ -1 +1,2 @@ venv +build diff --git a/examples/ping_pong/ping_pong.cpp b/examples/ping_pong/ping_pong.cpp index f5a34e10..aabdaeaf 100644 --- a/examples/ping_pong/ping_pong.cpp +++ b/examples/ping_pong/ping_pong.cpp @@ -48,16 +48,18 @@ namespace void WebSocketPingPong::start() { - _webSocket.configure(_url); + _webSocket.setUrl(_url); std::stringstream ss; log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( [this](ix::WebSocketMessageType messageType, - const std::string& str, - const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) @@ -110,7 +112,7 @@ namespace void WebSocketPingPong::ping(const std::string& text) { - if (!_webSocket.ping(text)) + if (!_webSocket.ping(text).success) { std::cerr << "Failed to send ping message. Message too long (> 125 bytes) or endpoint is disconnected" << std::endl; diff --git a/examples/satori_publisher/.gitignore b/examples/satori_publisher/.gitignore new file mode 100644 index 00000000..562ccdce --- /dev/null +++ b/examples/satori_publisher/.gitignore @@ -0,0 +1,3 @@ +venv +build +node_modules diff --git a/examples/satori_publisher/CMakeLists.txt b/examples/satori_publisher/CMakeLists.txt index 39938753..9b19d13f 100644 --- a/examples/satori_publisher/CMakeLists.txt +++ b/examples/satori_publisher/CMakeLists.txt @@ -6,6 +6,9 @@ cmake_minimum_required (VERSION 3.4.1) project (satori_publisher) +# There's -Weverything too for clang +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32") + set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl set (CMAKE_CXX_STANDARD 11) diff --git a/examples/satori_publisher/IXSatoriConnection.cpp b/examples/satori_publisher/IXSatoriConnection.cpp index 363230f4..404b1a69 100644 --- a/examples/satori_publisher/IXSatoriConnection.cpp +++ b/examples/satori_publisher/IXSatoriConnection.cpp @@ -12,7 +12,6 @@ #include #include #include -#include namespace ix { @@ -36,19 +35,10 @@ namespace ix SatoriConnection::SatoriConnection() : _authenticated(false), - _authenticatedCallback(nullptr) + _onEventCallback(nullptr) { _pdu["action"] = "rtm/publish"; - - _webSocket.setOnMessageCallback( - [](ix::WebSocketMessageType messageType, - const std::string& str, - const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) - { - ; - } - ); + resetOnMessageCallback(); } SatoriConnection::~SatoriConnection() @@ -74,43 +64,42 @@ namespace ix } } - void SatoriConnection::setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback) + void SatoriConnection::setOnEventCallback(const OnEventCallback& onEventCallback) { - _authenticatedCallback = authenticatedCallback; + _onEventCallback = onEventCallback; } - void SatoriConnection::invokeAuthenticatedCallback() + void SatoriConnection::invokeOnEventCallback(ix::SatoriConnectionEventType eventType, + const std::string& errorMsg, + const WebSocketHttpHeaders& headers) { - - if (_authenticatedCallback) + if (_onEventCallback) { - _authenticatedCallback(); + _onEventCallback(eventType, errorMsg, headers); } } - void SatoriConnection::setErrorCallback(const ErrorCallback& errorCallback) - { - _errorCallback = errorCallback; - } - void SatoriConnection::invokeErrorCallback(const std::string& errorMsg) { - - if (_errorCallback) - { - _errorCallback(errorMsg); - } + invokeOnEventCallback(ix::SatoriConnection_EventType_Error, errorMsg); } void SatoriConnection::disconnect() { _webSocket.stop(); + resetOnMessageCallback(); + } + + void SatoriConnection::resetOnMessageCallback() + { _webSocket.setOnMessageCallback( - [](ix::WebSocketMessageType messageType, - const std::string& str, - const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) + [](ix::WebSocketMessageType, + const std::string&, + size_t, + const ix::WebSocketErrorInfo&, + const ix::WebSocketCloseInfo&, + const ix::WebSocketHttpHeaders&) { ; } @@ -120,7 +109,8 @@ namespace ix void SatoriConnection::configure(const std::string& appkey, const std::string& endpoint, const std::string& rolename, - const std::string& rolesecret) + const std::string& rolesecret, + WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions) { _appkey = appkey; _endpoint = endpoint; @@ -132,22 +122,37 @@ namespace ix ss << "/v2?appkey="; ss << appkey; - _webSocket.configure(ss.str()); + std::string url = ss.str(); + _webSocket.setUrl(url); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket.setOnMessageCallback( [this](ix::WebSocketMessageType messageType, const std::string& str, + size_t wireSize, const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { + SatoriConnection::invokeTrafficTrackerCallback(wireSize, true); + std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) { + invokeOnEventCallback(ix::SatoriConnection_EventType_Open, + std::string(), + headers); sendHandshakeMessage(); } else if (messageType == ix::WebSocket_MessageType_Close) { _authenticated = false; + + std::stringstream ss; + ss << "Close code " << closeInfo.code; + ss << " reason " << closeInfo.reason; + invokeOnEventCallback(ix::SatoriConnection_EventType_Closed, + ss.str()); } else if (messageType == ix::WebSocket_MessageType_Message) { @@ -180,7 +185,7 @@ namespace ix else if (action == "auth/authenticate/ok") { _authenticated = true; - invokeAuthenticatedCallback(); + invokeOnEventCallback(ix::SatoriConnection_EventType_Authenticated); flushQueue(); } else if (action == "auth/authenticate/error") @@ -238,7 +243,7 @@ namespace ix std::string serializedJson = writeJsonCompact(pdu); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); - return _webSocket.send(serializedJson); + return _webSocket.send(serializedJson).success; } // @@ -300,7 +305,7 @@ namespace ix std::string serializedJson = writeJsonCompact(pdu); SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); - return _webSocket.send(serializedJson); + return _webSocket.send(serializedJson).success; } @@ -455,8 +460,10 @@ namespace ix bool SatoriConnection::publishMessage(const std::string& serializedJson) { - SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); - return _webSocket.send(serializedJson); + auto webSocketSendInfo = _webSocket.send(serializedJson); + SatoriConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, + false); + return webSocketSendInfo.success; } } // namespace ix diff --git a/examples/satori_publisher/IXSatoriConnection.h b/examples/satori_publisher/IXSatoriConnection.h index 294985c4..4041d305 100644 --- a/examples/satori_publisher/IXSatoriConnection.h +++ b/examples/satori_publisher/IXSatoriConnection.h @@ -14,12 +14,22 @@ #include "jsoncpp/json/json.h" #include +#include namespace ix { + enum SatoriConnectionEventType + { + SatoriConnection_EventType_Authenticated = 0, + SatoriConnection_EventType_Error = 1, + SatoriConnection_EventType_Open = 2, + SatoriConnection_EventType_Closed = 3 + }; + using SubscriptionCallback = std::function; - using AuthenticatedCallback = std::function; - using ErrorCallback = std::function; + using OnEventCallback = std::function; using OnTrafficTrackerCallback = std::function; class SatoriConnection @@ -33,7 +43,8 @@ namespace ix void configure(const std::string& appkey, const std::string& endpoint, const std::string& rolename, - const std::string& rolesecret); + const std::string& rolesecret, + WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions); /// Set the traffic tracker callback static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); @@ -41,10 +52,8 @@ namespace ix /// Reset the traffic tracker callback to an no-op one. static void resetTrafficTrackerCallback(); - /// Set the authenticated callback - void setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback); - /// Set the error callback - void setErrorCallback(const ErrorCallback& errorCallback); + /// Set the closed callback + void setOnEventCallback(const OnEventCallback& onEventCallback); /// Start the worker thread, used for background publishing void start(); @@ -77,6 +86,8 @@ namespace ix bool sendAuthMessage(const std::string& nonce); bool handleSubscriptionData(const Json::Value& pdu); + void resetOnMessageCallback(); + bool publishMessage(const std::string& serializedJson); bool flushQueue(); void enqueue(const std::string& msg); @@ -84,8 +95,10 @@ namespace ix /// Invoke the traffic tracker callback static void invokeTrafficTrackerCallback(size_t size, bool incoming); - /// Invoke lifecycle callbacks - void invokeAuthenticatedCallback(); + /// Invoke event callbacks + void invokeOnEventCallback(SatoriConnectionEventType eventType, + const std::string& errorMsg = std::string(), + const WebSocketHttpHeaders& headers = WebSocketHttpHeaders()); void invokeErrorCallback(const std::string& errorMsg); /// @@ -98,7 +111,6 @@ namespace ix std::string _endpoint; std::string _role_name; std::string _role_secret; - uint32_t _history; // Can be set on control+background thread, protecting with an atomic std::atomic _authenticated; @@ -111,8 +123,7 @@ namespace ix static OnTrafficTrackerCallback _onTrafficTrackerCallback; /// Callbacks - AuthenticatedCallback _authenticatedCallback; - ErrorCallback _errorCallback; + OnEventCallback _onEventCallback; /// Subscription callbacks, only one per channel std::unordered_map _cbs; diff --git a/examples/satori_publisher/base64.cpp b/examples/satori_publisher/base64.cpp index 51b8867a..a0c02e68 100644 --- a/examples/satori_publisher/base64.cpp +++ b/examples/satori_publisher/base64.cpp @@ -34,12 +34,7 @@ namespace ix "abcdefghijklmnopqrstuvwxyz" "0123456789+/"; - static inline bool is_base64(unsigned char c) - { - return (isalnum(c) || (c == '+') || (c == '/')); - } - - std::string base64_encode(const std::string& data, uint32_t len) + std::string base64_encode(const std::string& data, size_t len) { std::string ret; int i = 0; diff --git a/examples/satori_publisher/base64.h b/examples/satori_publisher/base64.h index febe9609..3c8e2660 100644 --- a/examples/satori_publisher/base64.h +++ b/examples/satori_publisher/base64.h @@ -10,5 +10,5 @@ namespace ix { - std::string base64_encode(const std::string& data, uint32_t len); + std::string base64_encode(const std::string& data, size_t len); } diff --git a/examples/satori_publisher/devnull_server.js b/examples/satori_publisher/devnull_server.js index 2fc3c774..cae6f75d 100644 --- a/examples/satori_publisher/devnull_server.js +++ b/examples/satori_publisher/devnull_server.js @@ -5,13 +5,14 @@ */ const WebSocket = require('ws'); -const wss = new WebSocket.Server({ port: 5678, perMessageDeflate: false }); +let wss = new WebSocket.Server({ port: 5678, perMessageDeflate: true }) -let handshake = false -let authenticated = false +wss.on('connection', (ws) => { -wss.on('connection', function connection(ws) { - ws.on('message', function incoming(data) { + let handshake = false + let authenticated = false + + ws.on('message', (data) => { console.log(data.toString('utf-8')) @@ -41,4 +42,4 @@ wss.on('connection', function connection(ws) { console.log(data) } }); -}); +}) diff --git a/examples/satori_publisher/satori_publisher.cpp b/examples/satori_publisher/satori_publisher.cpp index cdc71d50..7b180403 100644 --- a/examples/satori_publisher/satori_publisher.cpp +++ b/examples/satori_publisher/satori_publisher.cpp @@ -20,6 +20,11 @@ void msleep(int ms) int main(int argc, char* argv[]) { + if (argc != 7) + { + std::cerr << "Usage error: need 6 arguments." << std::endl; + } + std::string endpoint = argv[1]; std::string appkey = argv[2]; std::string channel = argv[3]; @@ -45,51 +50,72 @@ int main(int argc, char* argv[]) bool done = false; ix::SatoriConnection satoriConnection; - satoriConnection.configure(appkey, endpoint, rolename, rolesecret); + ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( + false, false, false, 15, 15); + satoriConnection.configure(appkey, endpoint, rolename, rolesecret, + webSocketPerMessageDeflateOptions); satoriConnection.connect(); - satoriConnection.setAuthenticatedCallback( - [&satoriConnection, channel, path, &done]() + satoriConnection.setOnEventCallback( + [&satoriConnection, channel, path, &done] + (ix::SatoriConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers) { - std::cout << "Authenticated" << std::endl;; - - std::string line; - std::ifstream f(path); - if (!f.is_open()) + if (eventType == ix::SatoriConnection_EventType_Open) { - std::cerr << "Error while opening file: " << path << std::endl; + std::cout << "Handshake Headers:" << std::endl; + for (auto it : headers) + { + std::cout << it.first << ": " << it.second << std::endl; + } } - - while (getline(f, line)) + else if (eventType == ix::SatoriConnection_EventType_Authenticated) { - Json::Value value; - Json::Reader reader; - reader.parse(line, value); + std::cout << "Authenticated" << std::endl; - satoriConnection.publish(channel, value); + std::string line; + std::ifstream f(path); + if (!f.is_open()) + { + std::cerr << "Error while opening file: " << path << std::endl; + } + + int n = 0; + while (getline(f, line)) + { + Json::Value value; + Json::Reader reader; + reader.parse(line, value); + + satoriConnection.publish(channel, value); + n++; + } + std::cerr << "#published messages: " << n << std::endl; + + if (f.bad()) + { + std::cerr << "Error while opening file: " << path << std::endl; + } + + done = true; } - - if (f.bad()) + else if (eventType == ix::SatoriConnection_EventType_Error) { - std::cerr << "Error while opening file: " << path << std::endl; + std::cerr << "Satori Error received: " << errMsg << std::endl; + done = true; + } + else if (eventType == ix::SatoriConnection_EventType_Closed) + { + std::cerr << "Satori connection closed" << std::endl; } - - done = true; - } - ); - satoriConnection.setErrorCallback( - [&done](const std::string& errMsg) - { - std::cerr << "Satori Error received: " << errMsg << std::endl; - done = true; } ); while (!done) { - msleep(10); + msleep(1); } - std::cout << incomingBytes << std::endl; std::cout << "Incoming bytes: " << incomingBytes << std::endl; std::cout << "Outgoing bytes: " << outgoingBytes << std::endl; diff --git a/examples/satori_publisher/satori_publisher.sh b/examples/satori_publisher/satori_publisher.sh index 62ebc1ef..a8917302 100644 --- a/examples/satori_publisher/satori_publisher.sh +++ b/examples/satori_publisher/satori_publisher.sh @@ -6,6 +6,6 @@ appkey="appkey" channel="foo" rolename="a_role" rolesecret="a_secret" -path=events.jsonl +filename=${FILENAME:=events.jsonl} -build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $path +build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $filename diff --git a/examples/ws_connect/.gitignore b/examples/ws_connect/.gitignore new file mode 100644 index 00000000..a18937a6 --- /dev/null +++ b/examples/ws_connect/.gitignore @@ -0,0 +1,3 @@ +build +venv +node_modules diff --git a/examples/ws_connect/ws_connect.cpp b/examples/ws_connect/ws_connect.cpp index d5c0692c..d57e0d8d 100644 --- a/examples/ws_connect/ws_connect.cpp +++ b/examples/ws_connect/ws_connect.cpp @@ -47,7 +47,7 @@ namespace void WebSocketConnect::start() { - _webSocket.configure(_url); + _webSocket.setUrl(_url); std::stringstream ss; log(std::string("Connecting to url: ") + _url); @@ -55,8 +55,10 @@ namespace _webSocket.setOnMessageCallback( [this](ix::WebSocketMessageType messageType, const std::string& str, + size_t wireSize, const ix::WebSocketErrorInfo& error, - const ix::CloseInfo& closeInfo) + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index db9b8f87..d0221d1f 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -41,12 +41,29 @@ namespace ix { stop(); } - void WebSocket::configure(const std::string& url) + void WebSocket::setUrl(const std::string& url) { - std::lock_guard lock(_urlMutex); + std::lock_guard lock(_configMutex); _url = url; } + const std::string& WebSocket::getUrl() const + { + std::lock_guard lock(_configMutex); + return _url; + } + + void WebSocket::setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) + { + _perMessageDeflateOptions = perMessageDeflateOptions; + } + + const WebSocketPerMessageDeflateOptions& WebSocket::getPerMessageDeflateOptions() const + { + std::lock_guard lock(_configMutex); + return _perMessageDeflateOptions; + } + void WebSocket::start() { if (_thread.joinable()) return; // we've already been started @@ -76,15 +93,17 @@ namespace ix { WebSocketInitResult WebSocket::connect() { { - std::lock_guard lock(_urlMutex); - _ws.configure(_url); + std::lock_guard lock(_configMutex); + _ws.configure(_url, _perMessageDeflateOptions); } _ws.setOnCloseCallback( - [this](uint16_t code, const std::string& reason) + [this](uint16_t code, const std::string& reason, size_t wireSize) { - _onMessageCallback(WebSocket_MessageType_Close, "", - WebSocketErrorInfo(), CloseInfo(code, reason)); + _onMessageCallback(WebSocket_MessageType_Close, "", wireSize, + WebSocketErrorInfo(), + WebSocketCloseInfo(code, reason), + WebSocketHttpHeaders()); } ); @@ -94,8 +113,9 @@ namespace ix { return status; } - _onMessageCallback(WebSocket_MessageType_Open, "", - WebSocketErrorInfo(), CloseInfo()); + _onMessageCallback(WebSocket_MessageType_Open, "", 0, + WebSocketErrorInfo(), WebSocketCloseInfo(), + status.headers); return status; } @@ -139,8 +159,9 @@ namespace ix { connectErr.wait_time = duration.count(); connectErr.reason = status.errorStr; connectErr.http_status = status.http_status; - _onMessageCallback(WebSocket_MessageType_Error, "", - connectErr, CloseInfo()); + _onMessageCallback(WebSocket_MessageType_Error, "", 0, + connectErr, WebSocketCloseInfo(), + WebSocketHttpHeaders()); std::this_thread::sleep_for(duration); } @@ -166,6 +187,7 @@ namespace ix { // 3. Dispatch the incoming messages _ws.dispatch( [this](const std::string& msg, + size_t wireSize, WebSocketTransport::MessageKind messageKind) { WebSocketMessageType webSocketMessageType; @@ -187,8 +209,9 @@ namespace ix { } break; } - _onMessageCallback(webSocketMessageType, msg, - WebSocketErrorInfo(), CloseInfo()); + _onMessageCallback(webSocketMessageType, msg, wireSize, + WebSocketErrorInfo(), WebSocketCloseInfo(), + WebSocketHttpHeaders()); WebSocket::invokeTrafficTrackerCallback(msg.size(), true); }); @@ -218,23 +241,23 @@ namespace ix { } } - bool WebSocket::send(const std::string& text) + WebSocketSendInfo WebSocket::send(const std::string& text) { return sendMessage(text, false); } - bool WebSocket::ping(const std::string& text) + WebSocketSendInfo WebSocket::ping(const std::string& text) { // Standard limit ping message size constexpr size_t pingMaxPayloadSize = 125; - if (text.size() > pingMaxPayloadSize) return false; + if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false); return sendMessage(text, true); } - bool WebSocket::sendMessage(const std::string& text, bool ping) + WebSocketSendInfo WebSocket::sendMessage(const std::string& text, bool ping) { - if (!isConnected()) return false; + if (!isConnected()) return WebSocketSendInfo(false); // // It is OK to read and write on the same socket in 2 different threads. @@ -246,19 +269,20 @@ namespace ix { // incoming messages are arriving / there's data to be received. // std::lock_guard lock(_writeMutex); + WebSocketSendInfo webSocketSendInfo; if (ping) { - _ws.sendPing(text); + webSocketSendInfo = _ws.sendPing(text); } else { - _ws.sendBinary(text); + webSocketSendInfo = _ws.sendBinary(text); } - WebSocket::invokeTrafficTrackerCallback(text.size(), false); + WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false); - return true; + return webSocketSendInfo; } ReadyState WebSocket::getReadyState() const @@ -282,10 +306,4 @@ namespace ix { case WebSocket_ReadyState_Closed: return "CLOSED"; } } - - const std::string& WebSocket::getUrl() const - { - std::lock_guard lock(_urlMutex); - return _url; - } } diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 39847382..0bd6275f 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -15,8 +15,11 @@ #include #include "IXWebSocketTransport.h" +#include "IXWebSocketSendInfo.h" +#include "IXWebSocketPerMessageDeflateOptions.h" +#include "IXWebSocketHttpHeaders.h" -namespace ix +namespace ix { // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket#Ready_state_constants enum ReadyState @@ -45,18 +48,18 @@ namespace ix std::string reason; }; - struct CloseInfo + struct WebSocketCloseInfo { uint16_t code; std::string reason; - CloseInfo(uint64_t c, const std::string& r) + WebSocketCloseInfo(uint64_t c, const std::string& r) { code = c; reason = r; } - CloseInfo() + WebSocketCloseInfo() { code = 0; reason = ""; @@ -65,8 +68,10 @@ namespace ix using OnMessageCallback = std::function; + size_t wireSize, + const WebSocketErrorInfo&, + const WebSocketCloseInfo&, + const WebSocketHttpHeaders&)>; using OnTrafficTrackerCallback = std::function; class WebSocket @@ -75,24 +80,27 @@ namespace ix WebSocket(); ~WebSocket(); - void configure(const std::string& url); + void setUrl(const std::string& url); + void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); + void start(); void stop(); - bool send(const std::string& text); - bool ping(const std::string& text); + WebSocketSendInfo send(const std::string& text); + WebSocketSendInfo ping(const std::string& text); void close(); void setOnMessageCallback(const OnMessageCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); - const std::string& getUrl() const; ReadyState getReadyState() const; + const std::string& getUrl() const; + const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; private: void run(); - bool sendMessage(const std::string& text, bool ping); + WebSocketSendInfo sendMessage(const std::string& text, bool ping); WebSocketInitResult connect(); bool isConnected() const; @@ -104,7 +112,8 @@ namespace ix WebSocketTransport _ws; std::string _url; - mutable std::mutex _urlMutex; + WebSocketPerMessageDeflateOptions _perMessageDeflateOptions; + mutable std::mutex _configMutex; // protect all config variables access OnMessageCallback _onMessageCallback; static OnTrafficTrackerCallback _onTrafficTrackerCallback; diff --git a/ixwebsocket/IXWebSocketHttpHeaders.h b/ixwebsocket/IXWebSocketHttpHeaders.h new file mode 100644 index 00000000..97570950 --- /dev/null +++ b/ixwebsocket/IXWebSocketHttpHeaders.h @@ -0,0 +1,15 @@ +/* + * IXWebSocketHttpHeaders.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include +#include + +namespace ix +{ + using WebSocketHttpHeaders = std::unordered_map; +} diff --git a/ixwebsocket/IXWebSocketPerMessageDeflate.cpp b/ixwebsocket/IXWebSocketPerMessageDeflate.cpp new file mode 100644 index 00000000..3452387c --- /dev/null +++ b/ixwebsocket/IXWebSocketPerMessageDeflate.cpp @@ -0,0 +1,222 @@ +/* + * IXWebSocketPerMessageDeflate.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + * + * Per message Deflate RFC: https://tools.ietf.org/html/rfc7692 + * + * Chrome websocket -> https://github.com/chromium/chromium/tree/2ca8c5037021c9d2ecc00b787d58a31ed8fc8bcb/net/websockets + */ + +#include "IXWebSocketPerMessageDeflate.h" +#include "IXWebSocketPerMessageDeflateOptions.h" + +#include +#include + +namespace +{ + // The passed in size (4) is important, without it the string litteral + // is treated as a char* and the null termination (\x00) makes it + // look like an empty string. + const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4); + + const int kBufferSize = 1 << 14; +} + +namespace ix +{ + // + // Compressor + // + WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor() + : _compressBufferSize(kBufferSize) + { + _deflateState.zalloc = Z_NULL; + _deflateState.zfree = Z_NULL; + _deflateState.opaque = Z_NULL; + } + + WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor() + { + deflateEnd(&_deflateState); + } + + bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits, + bool client_no_context_takeover) + { + int ret = deflateInit2( + &_deflateState, + Z_DEFAULT_COMPRESSION, + Z_DEFLATED, + -1*deflateBits, + 4, // memory level 1-9 + Z_DEFAULT_STRATEGY + ); + + if (ret != Z_OK) return false; + + _compressBuffer.reset(new unsigned char[_compressBufferSize]); + if (client_no_context_takeover) + { + _flush = Z_FULL_FLUSH; + } + else + { + _flush = Z_SYNC_FLUSH; + } + + return true; + } + + bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value, + const std::string& ending) + { + if (ending.size() > value.size()) return false; + return std::equal(ending.rbegin(), ending.rend(), value.rbegin()); + } + + bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, + std::string& out) + { + size_t output; + + if (in.empty()) + { + uint8_t buf[6] = {0x02, 0x00, 0x00, 0x00, 0xff, 0xff}; + out.append((char *)(buf), 6); + return true; + } + + _deflateState.avail_in = (uInt) in.size(); + _deflateState.next_in = (Bytef*) in.data(); + + do + { + // Output to local buffer + _deflateState.avail_out = (uInt) _compressBufferSize; + _deflateState.next_out = _compressBuffer.get(); + + deflate(&_deflateState, _flush); + + output = _compressBufferSize - _deflateState.avail_out; + + out.append((char *)(_compressBuffer.get()),output); + } while (_deflateState.avail_out == 0); + + if (endsWith(out, kEmptyUncompressedBlock)) + { + out.resize(out.size() - 4); + } + + return true; + } + + // + // Decompressor + // + WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor() + : _compressBufferSize(kBufferSize) + { + _inflateState.zalloc = Z_NULL; + _inflateState.zfree = Z_NULL; + _inflateState.opaque = Z_NULL; + _inflateState.avail_in = 0; + _inflateState.next_in = Z_NULL; + } + + WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor() + { + inflateEnd(&_inflateState); + } + + bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits, + bool client_no_context_takeover) + { + int ret = inflateInit2( + &_inflateState, + -1*inflateBits + ); + + if (ret != Z_OK) return false; + + _compressBuffer.reset(new unsigned char[_compressBufferSize]); + if (client_no_context_takeover) + { + _flush = Z_FULL_FLUSH; + } + else + { + _flush = Z_SYNC_FLUSH; + } + + return true; + } + + bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in, + std::string& out) + { + std::string inFixed(in); + inFixed += kEmptyUncompressedBlock; + + _inflateState.avail_in = (uInt) inFixed.size(); + _inflateState.next_in = (unsigned char *)(const_cast(inFixed.data())); + + do + { + _inflateState.avail_out = (uInt) _compressBufferSize; + _inflateState.next_out = _compressBuffer.get(); + + int ret = inflate(&_inflateState, Z_SYNC_FLUSH); + + if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) + { + return false; // zlib error + } + + out.append( + reinterpret_cast(_compressBuffer.get()), + _compressBufferSize - _inflateState.avail_out + ); + } while (_inflateState.avail_out == 0); + + return true; + } + + WebSocketPerMessageDeflate::WebSocketPerMessageDeflate() + { + _compressor.reset(new WebSocketPerMessageDeflateCompressor()); + _decompressor.reset(new WebSocketPerMessageDeflateDecompressor()); + } + + WebSocketPerMessageDeflate::~WebSocketPerMessageDeflate() + { + _compressor.reset(); + _decompressor.reset(); + } + + bool WebSocketPerMessageDeflate::init(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) + { + bool clientNoContextTakeover = + perMessageDeflateOptions.getClientNoContextTakeover(); + + uint8_t deflateBits = perMessageDeflateOptions.getClientMaxWindowBits(); + uint8_t inflateBits = perMessageDeflateOptions.getServerMaxWindowBits(); + + return _compressor->init(deflateBits, clientNoContextTakeover) && + _decompressor->init(inflateBits, clientNoContextTakeover); + } + + bool WebSocketPerMessageDeflate::compress(const std::string& in, + std::string& out) + { + return _compressor->compress(in, out); + } + + bool WebSocketPerMessageDeflate::decompress(const std::string& in, + std::string &out) + { + return _decompressor->decompress(in, out); + } + +} diff --git a/ixwebsocket/IXWebSocketPerMessageDeflate.h b/ixwebsocket/IXWebSocketPerMessageDeflate.h new file mode 100644 index 00000000..cbc56022 --- /dev/null +++ b/ixwebsocket/IXWebSocketPerMessageDeflate.h @@ -0,0 +1,68 @@ +/* + * IXWebSocketPerMessageDeflate.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include "zlib.h" +#include + +namespace ix +{ + class WebSocketPerMessageDeflateOptions; + + class WebSocketPerMessageDeflateCompressor + { + public: + WebSocketPerMessageDeflateCompressor(); + ~WebSocketPerMessageDeflateCompressor(); + bool init(uint8_t deflate_bits, bool client_no_context_takeover); + bool compress(const std::string& in, std::string& out); + + private: + static bool endsWith(const std::string& value, const std::string& ending); + + int _flush; + size_t _compressBufferSize; + std::unique_ptr _compressBuffer; + + z_stream _deflateState; + }; + + class WebSocketPerMessageDeflateDecompressor + { + public: + WebSocketPerMessageDeflateDecompressor(); + ~WebSocketPerMessageDeflateDecompressor(); + bool init(uint8_t inflate_bits, bool client_no_context_takeover); + bool decompress(const std::string& in, std::string& out); + + private: + int _flush; + size_t _compressBufferSize; + std::unique_ptr _compressBuffer; + + z_stream _inflateState; + }; + + class WebSocketPerMessageDeflate + { + public: + WebSocketPerMessageDeflate(); + virtual ~WebSocketPerMessageDeflate(); + + bool init(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); + + bool compress(const std::string& in, std::string& out); + bool decompress(const std::string& in, std::string& out); + + private: + // mode::value m_server_max_window_bits_mode; + // mode::value m_client_max_window_bits_mode; + + std::shared_ptr _compressor; + std::shared_ptr _decompressor; + }; +} diff --git a/ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp b/ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp new file mode 100644 index 00000000..9dd00fac --- /dev/null +++ b/ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp @@ -0,0 +1,171 @@ +/* + * IXWebSocketPerMessageDeflateOptions.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#include "IXWebSocketPerMessageDeflateOptions.h" + +#include +#include +#include + +namespace ix +{ + /// Default values as defined in the RFC + const uint8_t WebSocketPerMessageDeflateOptions::kDefaultServerMaxWindowBits = 15; + static const int minServerMaxWindowBits = 8; + static const int maxServerMaxWindowBits = 15; + + const uint8_t WebSocketPerMessageDeflateOptions::kDefaultClientMaxWindowBits = 15; + static const int minClientMaxWindowBits = 8; + static const int maxClientMaxWindowBits = 15; + + WebSocketPerMessageDeflateOptions::WebSocketPerMessageDeflateOptions( + bool enabled, + bool clientNoContextTakeover, + bool serverNoContextTakeover, + uint8_t clientMaxWindowBits, + uint8_t serverMaxWindowBits) + { + _enabled = enabled; + _clientNoContextTakeover = clientNoContextTakeover; + _serverNoContextTakeover = serverNoContextTakeover; + _clientMaxWindowBits = clientMaxWindowBits; + _serverMaxWindowBits = serverMaxWindowBits; + } + + // + // Four extension parameters are defined for "permessage-deflate" to + // help endpoints manage per-connection resource usage. + // + // - "server_no_context_takeover" + // - "client_no_context_takeover" + // - "server_max_window_bits" + // - "client_max_window_bits" + // + // Server response could look like that: + // + // Sec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover; server_no_context_takeover + // + WebSocketPerMessageDeflateOptions::WebSocketPerMessageDeflateOptions(std::string extension) + { + extension = removeSpaces(extension); + + _enabled = false; + _clientNoContextTakeover = false; + _serverNoContextTakeover = false; + _clientMaxWindowBits = kDefaultClientMaxWindowBits; + _serverMaxWindowBits = kDefaultServerMaxWindowBits; + + // Split by ; + std::string token; + std::stringstream tokenStream(extension); + + while (std::getline(tokenStream, token, ';')) + { + if (token == "permessage-deflate") + { + _enabled = true; + } + + if (token == "server_no_context_takeover") + { + _serverNoContextTakeover = true; + } + + if (token == "client_no_context_takeover") + { + _clientNoContextTakeover = true; + } + + if (startsWith(token, "server_max_window_bits=")) + { + std::string val = token.substr(token.find_last_of("=") + 1); + std::stringstream ss; + ss << val; + int x; + ss >> x; + + // Sanitize values to be in the proper range [8, 15] in + // case a server would give us bogus values + _serverMaxWindowBits = + std::min(maxServerMaxWindowBits, + std::max(x, minServerMaxWindowBits)); + } + + if (startsWith(token, "client_max_window_bits=")) + { + std::string val = token.substr(token.find_last_of("=") + 1); + std::stringstream ss; + ss << val; + int x; + ss >> x; + + // Sanitize values to be in the proper range [8, 15] in + // case a server would give us bogus values + _clientMaxWindowBits = + std::min(maxClientMaxWindowBits, + std::max(x, minClientMaxWindowBits)); + } + } + } + + std::string WebSocketPerMessageDeflateOptions::generateHeader() + { + std::stringstream ss; + ss << "Sec-WebSocket-Extensions: permessage-deflate"; + + if (_clientNoContextTakeover) ss << "; client_no_context_takeover"; + if (_serverNoContextTakeover) ss << "; server_no_context_takeover"; + + ss << "; server_max_window_bits=" << _serverMaxWindowBits; + ss << "; client_max_window_bits=" << _clientMaxWindowBits; + + ss << "\r\n"; + + return ss.str(); + } + + bool WebSocketPerMessageDeflateOptions::enabled() const + { + return _enabled; + } + + bool WebSocketPerMessageDeflateOptions::getClientNoContextTakeover() const + { + return _clientNoContextTakeover; + } + + bool WebSocketPerMessageDeflateOptions::getServerNoContextTakeover() const + { + return _serverNoContextTakeover; + } + + uint8_t WebSocketPerMessageDeflateOptions::getClientMaxWindowBits() const + { + return _clientMaxWindowBits; + } + + uint8_t WebSocketPerMessageDeflateOptions::getServerMaxWindowBits() const + { + return _serverMaxWindowBits; + } + + bool WebSocketPerMessageDeflateOptions::startsWith(const std::string& str, + const std::string& start) + { + return str.compare(0, start.length(), start) == 0; + } + + std::string WebSocketPerMessageDeflateOptions::removeSpaces(const std::string& str) + { + std::string out(str); + out.erase(std::remove_if(out.begin(), + out.end(), + [](unsigned char x){ return std::isspace(x); }), + out.end()); + + return out; + } +} diff --git a/ixwebsocket/IXWebSocketPerMessageDeflateOptions.h b/ixwebsocket/IXWebSocketPerMessageDeflateOptions.h new file mode 100644 index 00000000..5caceead --- /dev/null +++ b/ixwebsocket/IXWebSocketPerMessageDeflateOptions.h @@ -0,0 +1,46 @@ +/* + * IXWebSocketPerMessageDeflateOptions.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include + +namespace ix +{ + class WebSocketPerMessageDeflateOptions + { + public: + WebSocketPerMessageDeflateOptions( + bool enabled = true, + bool clientNoContextTakeover = false, + bool serverNoContextTakeover = false, + uint8_t clientMaxWindowBits = kDefaultClientMaxWindowBits, + uint8_t serverMaxWindowBits = kDefaultServerMaxWindowBits); + + WebSocketPerMessageDeflateOptions(std::string extension); + + std::string generateHeader(); + std::string parseHeader(); + bool enabled() const; + bool getClientNoContextTakeover() const; + bool getServerNoContextTakeover() const; + uint8_t getServerMaxWindowBits() const; + uint8_t getClientMaxWindowBits() const; + + static bool startsWith(const std::string& str, const std::string& start); + static std::string removeSpaces(const std::string& str); + + private: + bool _enabled; + bool _clientNoContextTakeover; + bool _serverNoContextTakeover; + int _clientMaxWindowBits; + int _serverMaxWindowBits; + + static uint8_t const kDefaultClientMaxWindowBits; + static uint8_t const kDefaultServerMaxWindowBits; + }; +} diff --git a/ixwebsocket/IXWebSocketSendInfo.h b/ixwebsocket/IXWebSocketSendInfo.h new file mode 100644 index 00000000..b7d05a5b --- /dev/null +++ b/ixwebsocket/IXWebSocketSendInfo.h @@ -0,0 +1,26 @@ +/* + * IXWebSocketSendInfo.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include + +namespace ix +{ + struct WebSocketSendInfo + { + bool success; + size_t payloadSize; + size_t wireSize; + + WebSocketSendInfo(bool s = false, size_t p = -1, size_t w = -1) + { + success = s; + payloadSize = p; + wireSize = w; + } + }; +} diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 92b4ce41..15cd28b8 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -9,6 +9,7 @@ // #include "IXWebSocketTransport.h" +#include "IXWebSocketHttpHeaders.h" #include "IXSocket.h" #ifdef IXWEBSOCKET_USE_TLS @@ -31,18 +32,17 @@ #include #include #include -#include #include #include -namespace ix { - +namespace ix +{ WebSocketTransport::WebSocketTransport() : _readyState(CLOSED), _enablePerMessageDeflate(false) { - _perMessageDeflate.init(); + } WebSocketTransport::~WebSocketTransport() @@ -50,9 +50,12 @@ namespace ix { ; } - void WebSocketTransport::configure(const std::string& url) + void WebSocketTransport::configure(const std::string& url, + const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) { _url = url; + _perMessageDeflateOptions = perMessageDeflateOptions; + _enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); } bool WebSocketTransport::parseUrl(const std::string& url, @@ -135,21 +138,22 @@ namespace ix { std::string WebSocketTransport::genRandomString(const int len) { - static const char alphanum[] = + std::string alphanum = "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; + "ABCDEFGH" + "abcdefgh"; std::random_device r; std::default_random_engine e1(r()); - std::uniform_int_distribution dist(0, sizeof(alphanum) - 1); + std::uniform_int_distribution dist(0, (int) alphanum.size() - 1); std::string s; s.resize(len); for (int i = 0; i < len; ++i) { - s[i] += alphanum[dist(e1)]; + int x = dist(e1); + s[i] = alphanum[x]; } return s; @@ -206,35 +210,31 @@ namespace ix { std::string secWebSocketKey = genRandomString(22); secWebSocketKey += "=="; - std::string extensions; + std::stringstream ss; + ss << "GET " << path << " HTTP/1.1\r\n"; + ss << "Host: "<< host << ":" << port << "\r\n"; + ss << "Upgrade: websocket\r\n"; + ss << "Connection: Upgrade\r\n"; + ss << "Sec-WebSocket-Version: 13\r\n"; + ss << "Sec-WebSocket-Key: " << secWebSocketKey << "\r\n"; + if (_enablePerMessageDeflate) { - // extensions = "Sec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover; server_no_context_takeover\r\n"; - extensions = "Sec-WebSocket-Extensions: permessage-deflate\r\n"; + ss << _perMessageDeflateOptions.generateHeader(); } - char line[512]; - int status; - int i; - snprintf(line, 512, - "GET %s HTTP/1.1\r\n" - "Host: %s:%d\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Key: %s\r\n" - "Sec-WebSocket-Version: 13\r\n" - "%s" - "\r\n", - path.c_str(), host.c_str(), port, - secWebSocketKey.c_str(), extensions.c_str()); + ss << "\r\n"; - size_t lineSize = strlen(line); - if (_socket->send(line, lineSize) != lineSize) + std::string request = ss.str(); + int requestSize = (int) request.size(); + if (_socket->send(const_cast(request.c_str()), requestSize) != requestSize) { return WebSocketInitResult(false, 0, std::string("Failed sending GET request to ") + _url); } + char line[512]; + int i; for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i) { if (_socket->recv(line+i, 1) == 0) @@ -248,6 +248,9 @@ namespace ix { return WebSocketInitResult(false, 0, std::string("Got bad status line connecting to ") + _url); } + // Validate status + int status; + // HTTP/1.0 is too old. if (sscanf(line, "HTTP/1.0 %d", &status) == 1) { @@ -268,7 +271,7 @@ namespace ix { return WebSocketInitResult(false, status, ss.str()); } - std::unordered_map headers; + WebSocketHttpHeaders headers; while (true) { @@ -310,7 +313,6 @@ namespace ix { std::transform(name.begin(), name.end(), name.begin(), ::tolower); headers[name] = value; - std::cout << name << " -> " << value << std::endl; } } @@ -322,10 +324,29 @@ namespace ix { return WebSocketInitResult(false, status, errorMsg); } + if (_enablePerMessageDeflate) + { + // Parse the server response. Does it support deflate ? + std::string header = headers["sec-websocket-extensions"]; + WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(header); + + // If the server does not support that extension, disable it. + if (!webSocketPerMessageDeflateOptions.enabled()) + { + _enablePerMessageDeflate = false; + } + + if (!_perMessageDeflate.init(webSocketPerMessageDeflateOptions)) + { + return WebSocketInitResult( + false, 0,"Failed to initialize per message deflate engine"); + } + } + _socket->configure(); setReadyState(OPEN); - return WebSocketInitResult(true, status, ""); + return WebSocketInitResult(true, status, "", headers); } WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const @@ -341,7 +362,7 @@ namespace ix { if (readyStateValue == CLOSED) { std::lock_guard lock(_closeDataMutex); - _onCloseCallback(_closeCode, _closeReason); + _onCloseCallback(_closeCode, _closeReason, _closeWireSize); _closeCode = 0; _closeReason = std::string(); } @@ -546,33 +567,7 @@ namespace ix { std::string stringMessage(_receivedData.begin(), _receivedData.end()); - std::cout << "raw msg: " << stringMessage << std::endl; - std::cout << "raw msg size: " << stringMessage.size() << std::endl; - - // ws.rsv1 means the message is compressed - // FIXME hack hack - std::string decompressedMessage; - - if (_enablePerMessageDeflate && ws.rsv1) - { - if (_perMessageDeflate.decompress(stringMessage, - decompressedMessage)) - { - std::cout << "decompressed msg: " << decompressedMessage << std::endl; - std::cout << "msg size: " << decompressedMessage.size() << std::endl; - onMessageCallback(decompressedMessage, MSG); - } - else - { - std::cout << "error decompressing msg !"<< std::endl; - - } - } - else - { - onMessageCallback(stringMessage, MSG); - } - + emitMessage(MSG, stringMessage, ws, onMessageCallback); _receivedData.clear(); } } @@ -583,10 +578,9 @@ namespace ix { _rxbuf.begin()+ws.header_size + (size_t) ws.N); // Reply back right away - sendData(wsheader_type::PONG, pingData.size(), - pingData.begin(), pingData.end()); + sendData(wsheader_type::PONG, pingData); - onMessageCallback(pingData, PING); + emitMessage(PING, pingData, ws, onMessageCallback); } else if (ws.opcode == wsheader_type::PONG) { @@ -594,7 +588,7 @@ namespace ix { std::string pongData(_rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size + (size_t) ws.N); - onMessageCallback(pongData, PONG); + emitMessage(PONG, pongData, ws, onMessageCallback); } else if (ws.opcode == wsheader_type::CLOSE) { @@ -613,6 +607,7 @@ namespace ix { std::lock_guard lock(_closeDataMutex); _closeCode = code; _closeReason = reason; + _closeWireSize = _rxbuf.size(); } close(); @@ -627,6 +622,32 @@ namespace ix { } } + void WebSocketTransport::emitMessage(MessageKind messageKind, + const std::string& message, + const wsheader_type& ws, + const OnMessageCallback& onMessageCallback) + { + // ws.rsv1 means the message is compressed + std::string decompressedMessage; + + if (_enablePerMessageDeflate && ws.rsv1) + { + if (_perMessageDeflate.decompress(message, decompressedMessage)) + { + onMessageCallback(decompressedMessage, decompressedMessage.size(), + messageKind); + } + else + { + std::cerr << "error decompressing msg !"<< std::endl; + } + } + else + { + onMessageCallback(message, message.size(), messageKind); + } + } + unsigned WebSocketTransport::getRandomUnsigned() { auto now = std::chrono::system_clock::now(); @@ -636,16 +657,32 @@ namespace ix { return static_cast(seconds); } - void WebSocketTransport::sendData(wsheader_type::opcode_type type, - uint64_t message_size, - std::string::const_iterator message_begin, - std::string::const_iterator message_end) + WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type, + const std::string& message) { if (_readyState == CLOSING || _readyState == CLOSED) { - return; + return WebSocketSendInfo(); } + size_t payloadSize = message.size(); + size_t wireSize = message.size(); + std::string compressedMessage; + + std::string::const_iterator message_begin = message.begin(); + std::string::const_iterator message_end = message.end(); + + if (_enablePerMessageDeflate) + { + _perMessageDeflate.compress(message, compressedMessage); + wireSize = compressedMessage.size(); + + message_begin = compressedMessage.begin(); + message_end = compressedMessage.end(); + } + + uint64_t message_size = wireSize; + unsigned x = getRandomUnsigned(); uint8_t masking_key[4] = {}; masking_key[0] = (x >> 24); @@ -709,36 +746,18 @@ namespace ix { // Now actually send this data sendOnSocket(); + + return WebSocketSendInfo(true, payloadSize, wireSize); } - void WebSocketTransport::sendPing(const std::string& message) + WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message) { - sendData(wsheader_type::PING, message.size(), message.begin(), message.end()); + return sendData(wsheader_type::PING, message); } - void WebSocketTransport::sendBinary(const std::string& message) + WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message) { - if (_enablePerMessageDeflate) - { - // FIXME hack hack - std::string compressedMessage; - _perMessageDeflate.compress(message, compressedMessage); - std::cout << "uncompressedMessage " << message << std::endl; - std::cout << "uncompressedMessage.size() " << message.size() << std::endl; - std::cout << "compressedMessage.size() " << compressedMessage.size() - << std::endl; - - // sendData(wsheader_type::BINARY_FRAME, message.size(), message.begin(), message.end()); - sendData(wsheader_type::BINARY_FRAME, - compressedMessage.size(), - compressedMessage.begin(), - compressedMessage.end()); - } - else - { - sendData(wsheader_type::BINARY_FRAME, message.size(), - message.begin(), message.end()); - } + return sendData(wsheader_type::BINARY_FRAME, message); } void WebSocketTransport::sendOnSocket() diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index dddd2db0..f19a79a1 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -17,7 +17,10 @@ #include #include +#include "IXWebSocketSendInfo.h" #include "IXWebSocketPerMessageDeflate.h" +#include "IXWebSocketPerMessageDeflateOptions.h" +#include "IXWebSocketHttpHeaders.h" namespace ix { @@ -28,12 +31,17 @@ namespace ix bool success; int http_status; std::string errorStr; + WebSocketHttpHeaders headers; - WebSocketInitResult(bool s, int h, std::string e) + WebSocketInitResult(bool s, + int status, + const std::string& e, + WebSocketHttpHeaders h = WebSocketHttpHeaders()) { success = s; - http_status = h; + http_status = status; errorStr = e; + headers = h; } // need to define a default @@ -42,6 +50,7 @@ namespace ix success = false; http_status = 0; errorStr = ""; + headers.clear(); } }; @@ -64,21 +73,22 @@ namespace ix }; using OnMessageCallback = std::function; using OnCloseCallback = std::function; + const std::string&, + size_t)>; WebSocketTransport(); ~WebSocketTransport(); - void configure(const std::string& url); + void configure(const std::string& url, + const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); WebSocketInitResult init(); void poll(); - void send(const std::string& message); - void sendBinary(const std::string& message); - void sendBinary(const std::vector& message); - void sendPing(const std::string& message); + WebSocketSendInfo sendBinary(const std::string& message); + WebSocketSendInfo sendPing(const std::string& message); void close(); ReadyStateValues getReadyState() const; void setReadyState(ReadyStateValues readyStateValue); @@ -123,20 +133,25 @@ namespace ix std::shared_ptr _socket; std::atomic _readyState; - std::atomic _enablePerMessageDeflate; OnCloseCallback _onCloseCallback; uint16_t _closeCode; std::string _closeReason; + size_t _closeWireSize; mutable std::mutex _closeDataMutex; WebSocketPerMessageDeflate _perMessageDeflate; + WebSocketPerMessageDeflateOptions _perMessageDeflateOptions; + std::atomic _enablePerMessageDeflate; void sendOnSocket(); - void sendData(wsheader_type::opcode_type type, - uint64_t message_size, - std::string::const_iterator message_begin, - std::string::const_iterator message_end); + WebSocketSendInfo sendData(wsheader_type::opcode_type type, + const std::string& message); + + void emitMessage(MessageKind messageKind, + const std::string& message, + const wsheader_type& ws, + const OnMessageCallback& onMessageCallback); bool isSendBufferEmpty() const; void appendToSendBuffer(const std::vector& header, diff --git a/ixwebsocket/libwshandshake.hpp b/ixwebsocket/libwshandshake.hpp index a2968af7..5b90dcc8 100644 --- a/ixwebsocket/libwshandshake.hpp +++ b/ixwebsocket/libwshandshake.hpp @@ -32,7 +32,7 @@ class WebSocketHandshake { template struct static_for<0, T> { - void operator()(uint32_t *a, uint32_t *hash) {} + void operator()(uint32_t * /*a*/, uint32_t * /*hash*/) {} }; template diff --git a/makefile b/makefile new file mode 100644 index 00000000..c2b068d0 --- /dev/null +++ b/makefile @@ -0,0 +1,16 @@ +# +# This makefile is just used to easily work with docker (linux build) +# +all: run + +docker: + docker build -t ws_connect:latest . + +run: docker + docker run --cap-add sys_ptrace -it ws_connect:latest bash + +build: + (cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make) + (cd examples/chat ; mkdir -p build ; cd build ; cmake .. ; make) + (cd examples/ping_pong ; mkdir -p build ; cd build ; cmake .. ; make) + (cd examples/ws_connect ; mkdir -p build ; cd build ; cmake .. ; make)