From b5b0de20830fe7c609f83a3f07cb051e94ba881b Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sat, 31 Aug 2019 16:46:44 -0700 Subject: [PATCH] +add utf-8 validation code, not hooked up properly yet +ws autobahn / Add code to test websocket client compliance with the autobahn test-suite +Ping received with a payload too large (> 125 bytes) trigger a connection closure +cobra / add tracking about published messages +cobra / publish returns a message id, that can be used when +cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system). --- DOCKER_VERSION | 2 +- docs/CHANGELOG.md | 10 ++ ixwebsocket/IXWebSocket.cpp | 62 +++++++ ixwebsocket/IXWebSocketTransport.cpp | 11 ++ ixwebsocket/IXWebSocketVersion.h | 2 +- ws/CMakeLists.txt | 1 + ws/ixcobra/IXCobraConnection.cpp | 73 +++++++-- ws/ixcobra/IXCobraConnection.h | 30 +++- .../IXCobraMetricsThreadedPublisher.cpp | 7 +- ws/ws.cpp | 7 + ws/ws.h | 2 + ws/ws_autobahn.cpp | 154 ++++++++++++++++++ ws/ws_cobra_publish.cpp | 14 ++ ws/ws_cobra_subscribe.cpp | 7 +- ws/ws_cobra_to_sentry.cpp | 7 +- ws/ws_cobra_to_statsd.cpp | 7 +- 16 files changed, 375 insertions(+), 21 deletions(-) create mode 100644 ws/ws_autobahn.cpp diff --git a/DOCKER_VERSION b/DOCKER_VERSION index 00433367..831446cb 100644 --- a/DOCKER_VERSION +++ b/DOCKER_VERSION @@ -1 +1 @@ -5.0.7 +5.1.0 diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 90ebb51f..677101d2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,8 +1,18 @@ # Changelog All notable changes to this project will be documented in this file. +## [5.1.0] - 2019-08-31 + +add utf-8 validation code, not hooked up properly yet +ws autobahn / Add code to test websocket client compliance with the autobahn test-suite +Ping received with a payload too large (> 125 bytes) trigger a connection closure +cobra / add tracking about published messages +cobra / publish returns a message id, that can be used when +cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system). + ## [5.0.9] - 2019-08-30 +User-Agent header is set when not specified. New option to cap the max wait between reconnection attempts. Still default to 10s. (setMaxWaitBetweenReconnectionRetries). ``` diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 62d782ba..9fc7fe55 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -12,6 +12,63 @@ #include #include +namespace +{ + // + // Stolen from here http://www.zedwood.com/article/cpp-is-valid-utf8-string-function + // There doesn't seem to be anything in the C++ library so far to do that. + // The closest thing is code for converting from utf-8 to utf-16 or utf-32 but + // that isn't working well for some broken input strings. + // + bool isValidUtf8(const std::string& str) + { + size_t i = 0; + size_t ix = str.length(); + int c, n, j; + + for (; i < ix; i++) + { + c = (unsigned char) str[i]; + //if (c==0x09 || c==0x0a || c==0x0d || (0x20 <= c && c <= 0x7e) ) n = 0; // is_printable_ascii + if (0x00 <= c && c <= 0x7f) + { + n = 0; // 0bbbbbbb + } + else if ((c & 0xE0) == 0xC0) + { + n = 1; // 110bbbbb + } + else if ( c==0xed && i<(ix-1) && ((unsigned char)str[i+1] & 0xa0)==0xa0) + { + return false; //U+d800 to U+dfff + } + else if ((c & 0xF0) == 0xE0) + { + n = 2; // 1110bbbb + } + else if ((c & 0xF8) == 0xF0) + { + n = 3; // 11110bbb + } + //else if (($c & 0xFC) == 0xF8) n=4; // 111110bb //byte 5, unnecessary in 4 byte UTF-8 + //else if (($c & 0xFE) == 0xFC) n=5; // 1111110b //byte 6, unnecessary in 4 byte UTF-8 + else + { + return false; + } + + for (j=0; j 125) + { + std::string reason("reason control frame with payload length > 125 octets"); + // Unexpected frame type + close(1002, + reason, + reason.size()); + return; + } + if (_enablePong) { // Reply back right away diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 437ccc6e..d9644be3 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "5.0.9" +#define IX_WEBSOCKET_VERSION "5.1.0" diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index b75810ea..184306cc 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -70,6 +70,7 @@ add_executable(ws ws_cobra_to_sentry.cpp ws_snake.cpp ws_httpd.cpp + ws_autobahn.cpp ws.cpp) target_link_libraries(ws ixwebsocket) diff --git a/ws/ixcobra/IXCobraConnection.cpp b/ws/ixcobra/IXCobraConnection.cpp index 681d55e3..3b3418d4 100644 --- a/ws/ixcobra/IXCobraConnection.cpp +++ b/ws/ixcobra/IXCobraConnection.cpp @@ -18,6 +18,7 @@ namespace ix { TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr; + PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr; constexpr size_t CobraConnection::kQueueMaxSize; CobraConnection::CobraConnection() : @@ -56,6 +57,24 @@ namespace ix } } + void CobraConnection::setPublishTrackerCallback(const PublishTrackerCallback& callback) + { + _publishTrackerCallback = callback; + } + + void CobraConnection::resetPublishTrackerCallback() + { + setPublishTrackerCallback(nullptr); + } + + void CobraConnection::invokePublishTrackerCallback(bool sent, bool acked) + { + if (_publishTrackerCallback) + { + _publishTrackerCallback(sent, acked); + } + } + void CobraConnection::setEventCallback(const EventCallback& eventCallback) { std::lock_guard lock(_eventCallbackMutex); @@ -63,19 +82,20 @@ namespace ix } void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType, - const std::string& errorMsg, - const WebSocketHttpHeaders& headers, - const std::string& subscriptionId) + const std::string& errorMsg, + const WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { std::lock_guard lock(_eventCallbackMutex); if (_eventCallback) { - _eventCallback(eventType, errorMsg, headers, subscriptionId); + _eventCallback(eventType, errorMsg, headers, subscriptionId, msgId); } } void CobraConnection::invokeErrorCallback(const std::string& errorMsg, - const std::string& serializedPdu) + const std::string& serializedPdu) { std::stringstream ss; ss << errorMsg << " : received pdu => " << serializedPdu; @@ -178,6 +198,17 @@ namespace ix { invokeErrorCallback("Unsubscription error", msg->str); } + else if (action == "rtm/publish/ok") + { + if (!handlePublishResponse(data)) + { + invokeErrorCallback("Error processing publish response", msg->str); + } + } + else if (action == "rtm/publish/error") + { + invokeErrorCallback("Publish error", msg->str); + } else { invokeErrorCallback("Un-handled message type", msg->str); @@ -374,6 +405,24 @@ namespace ix return true; } + bool CobraConnection::handlePublishResponse(const Json::Value& pdu) + { + if (!pdu.isMember("id")) return false; + Json::Value id = pdu["id"]; + + if (!id.isUInt64()) return false; + + uint64_t msgId = id.asUInt64(); + + invokeEventCallback(ix::CobraConnection_EventType_Published, + std::string(), WebSocketHttpHeaders(), + std::string(), msgId); + + invokePublishTrackerCallback(false, true); + + return true; + } + bool CobraConnection::connect() { _webSocket->start(); @@ -399,9 +448,11 @@ namespace ix // // publish is not thread safe as we are trying to reuse some Json objects. // - bool CobraConnection::publish(const Json::Value& channels, - const Json::Value& msg) + CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels, + const Json::Value& msg) { + invokePublishTrackerCallback(true, false); + _body["channels"] = channels; _body["message"] = msg; _pdu["body"] = _body; @@ -412,7 +463,7 @@ namespace ix if (_publishMode == CobraConnection_PublishMode_Batch) { enqueue(serializedJson); - return true; + return _id - 1; } // @@ -421,14 +472,14 @@ namespace ix // if (_authenticated && publishMessage(serializedJson)) { - return true; + return _id - 1; } else // Or else we enqueue // Slow code path is when we haven't connected yet (startup), // or when the connection drops for some reason. { enqueue(serializedJson); - return false; + return 0; } } @@ -528,7 +579,7 @@ namespace ix { auto webSocketSendInfo = _webSocket->send(serializedJson); CobraConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, - false); + false); return webSocketSendInfo.success; } diff --git a/ws/ixcobra/IXCobraConnection.h b/ws/ixcobra/IXCobraConnection.h index 1e46f2c9..6e7857f4 100644 --- a/ws/ixcobra/IXCobraConnection.h +++ b/ws/ixcobra/IXCobraConnection.h @@ -15,6 +15,7 @@ #include #include #include +#include namespace ix { @@ -27,7 +28,8 @@ namespace ix CobraConnection_EventType_Open = 2, CobraConnection_EventType_Closed = 3, CobraConnection_EventType_Subscribed = 4, - CobraConnection_EventType_UnSubscribed = 5 + CobraConnection_EventType_UnSubscribed = 5, + CobraConnection_EventType_Published = 6 }; enum CobraConnectionPublishMode @@ -40,12 +42,17 @@ namespace ix using EventCallback = std::function; + const std::string&, + uint64_t msgId)>; + using TrafficTrackerCallback = std::function; + using PublishTrackerCallback = std::function; class CobraConnection { public: + using MsgId = uint64_t; + CobraConnection(); ~CobraConnection(); @@ -57,11 +64,18 @@ namespace ix const std::string& rolesecret, const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions); + /// Set the traffic tracker callback static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback); /// Reset the traffic tracker callback to an no-op one. static void resetTrafficTrackerCallback(); + /// Set the publish tracker callback + static void setPublishTrackerCallback(const PublishTrackerCallback& callback); + + /// Reset the publish tracker callback to an no-op one. + static void resetPublishTrackerCallback(); + /// Set the closed callback void setEventCallback(const EventCallback& eventCallback); @@ -71,7 +85,7 @@ namespace ix /// Publish a message to a channel /// /// No-op if the connection is not established - bool publish(const Json::Value& channels, const Json::Value& msg); + MsgId publish(const Json::Value& channels, const Json::Value& msg); // Subscribe to a channel, and execute a callback when an incoming // message arrives. @@ -111,6 +125,7 @@ namespace ix bool handleSubscriptionData(const Json::Value& pdu); bool handleSubscriptionResponse(const Json::Value& pdu); bool handleUnsubscriptionResponse(const Json::Value& pdu); + bool handlePublishResponse(const Json::Value& pdu); void initWebSocketOnMessageCallback(); @@ -121,11 +136,15 @@ namespace ix /// Invoke the traffic tracker callback static void invokeTrafficTrackerCallback(size_t size, bool incoming); + /// Invoke the publish tracker callback + static void invokePublishTrackerCallback(bool sent, bool acked); + /// Invoke event callbacks void invokeEventCallback(CobraConnectionEventType eventType, const std::string& errorMsg = std::string(), const WebSocketHttpHeaders& headers = WebSocketHttpHeaders(), - const std::string& subscriptionId = std::string()); + const std::string& subscriptionId = std::string(), + uint64_t msgId = std::numeric_limits::max()); void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu); /// @@ -150,6 +169,9 @@ namespace ix /// Traffic tracker callback static TrafficTrackerCallback _trafficTrackerCallback; + /// Publish tracker callback + static PublishTrackerCallback _publishTrackerCallback; + /// Cobra events callbacks EventCallback _eventCallback; mutable std::mutex _eventCallbackMutex; diff --git a/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp b/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp index da12faaf..78ca5e40 100644 --- a/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp +++ b/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp @@ -25,7 +25,8 @@ namespace ix (ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId) + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { std::stringstream ss; @@ -58,6 +59,10 @@ namespace ix { ss << "Unsubscribed through subscription id: " << subscriptionId; } + else if (eventType == ix::CobraConnection_EventType_Published) + { + ss << "Published message " << msgId << " acked"; + } ix::IXCoreLogger::Log(ss.str().c_str()); }); diff --git a/ws/ws.cpp b/ws/ws.cpp index b0a92ecf..3b4f2b95 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -226,6 +226,9 @@ int main(int argc, char** argv) httpServerApp->add_option("--port", port, "Port"); httpServerApp->add_option("--host", hostname, "Hostname"); + CLI::App* autobahnApp = app.add_subcommand("autobahn", "Test client Autobahn compliance"); + autobahnApp->add_option("--url", url, "url"); + CLI11_PARSE(app, argc, argv); // pid file handling @@ -328,6 +331,10 @@ int main(int argc, char** argv) { ret = ix::ws_httpd_main(port, hostname); } + else if (app.got_subcommand("autobahn")) + { + ret = ix::ws_autobahn_main(url); + } ix::uninitNetSystem(); return ret; diff --git a/ws/ws.h b/ws/ws.h index 1eca8ec3..3a477667 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -102,4 +102,6 @@ namespace ix const std::string& appsConfigPath); int ws_httpd_main(int port, const std::string& hostname); + + int ws_autobahn_main(const std::string& url); } // namespace ix diff --git a/ws/ws_autobahn.cpp b/ws/ws_autobahn.cpp new file mode 100644 index 00000000..7a9abd86 --- /dev/null +++ b/ws/ws_autobahn.cpp @@ -0,0 +1,154 @@ +/* + * ws_autobahn.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include +#include + + +namespace ix +{ + class AutobahnTestCase + { + public: + AutobahnTestCase(const std::string& _url); + void run(); + + private: + void log(const std::string& msg); + + std::string _url; + ix::WebSocket _webSocket; + + std::atomic _done; + }; + + AutobahnTestCase::AutobahnTestCase(const std::string& url) : + _url(url), + _done(false) + { + _webSocket.disableAutomaticReconnection(); + + ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( + true, false, false, 15, 15); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); + } + + void AutobahnTestCase::log(const std::string& msg) + { + std::cerr << msg << std::endl; + } + + void AutobahnTestCase::run() + { + _webSocket.setUrl(_url); + + std::stringstream ss; + log(std::string("Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback( + [this](const ix::WebSocketMessagePtr& msg) + { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + log("autobahn: connected"); + std::cout << "Uri: " << msg->openInfo.uri << std::endl; + std::cout << "Handshake Headers:" << std::endl; + for (auto it : msg->openInfo.headers) + { + std::cout << it.first << ": " << it.second << std::endl; + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "autobahn: connection closed:"; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; + log(ss.str()); + + _done = true; + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; + + // ss << "autobahn: received message: " + // << msg->str; + // log(ss.str()); + + _webSocket.send(msg->str, msg->binary); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + log(ss.str()); + + // And error can happen, in which case the test-case is marked done + _done = true; + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + std::cerr << "Received message fragment" << std::endl; + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + std::cerr << "Received ping" << std::endl; + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + std::cerr << "Received pong" << std::endl; + } + else + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + + log("Waiting for being closed ..."); + while (!_done) + { + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } + + _webSocket.stop(); + } + + // + // make && bench ws autobahn --url 'ws://localhost:9001/runCase?case=9&agent=ixwebsocket' && ws connect -d 'ws://localhost:9001/updateReports?agent=ixwebsocket' + // + int ws_autobahn_main(const std::string& url) + { + int N = 1; // 519; + N++; + for (int i = 1 ; i < N; ++i) + { + int caseNumber = i; + + std::stringstream ss; + ss << "ws://localhost:9001/runCase?case=" + << caseNumber + << "&agent=ixwebsocket"; + + std::string url(ss.str()); + + AutobahnTestCase testCase(url); + testCase.run(); + } + + return 0; + } +} + diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp index 62b4d85f..77541d45 100644 --- a/ws/ws_cobra_publish.cpp +++ b/ws/ws_cobra_publish.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace ix { @@ -23,6 +24,16 @@ namespace ix const std::string& path, bool stress) { + std::atomic sentMessages(0); + std::atomic ackedMessages(0); + CobraConnection::setPublishTrackerCallback( + [&sentMessages, &ackedMessages](bool sent, bool acked) + { + if (sent) sentMessages++; + if (acked) ackedMessages++; + } + ); + CobraMetricsPublisher cobraMetricsPublisher; cobraMetricsPublisher.enable(true); @@ -64,8 +75,11 @@ namespace ix // Wait a bit for the message to get a chance to be sent // there isn't any ack on publish right now so it's the best we can do + // FIXME: this comment is a lie now std::this_thread::sleep_for(std::chrono::milliseconds(100)); + spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages); + return 0; } } diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index dbc275ea..278a84c1 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -57,7 +57,8 @@ namespace ix (ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId) + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { if (eventType == ix::CobraConnection_EventType_Open) { @@ -96,6 +97,10 @@ namespace ix { spdlog::error("Subscriber: error {}", errMsg); } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::error("Published message hacked: {}", msgId); + } } ); diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index 46123bb3..bc6c8642 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -102,7 +102,8 @@ namespace ix (ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId) + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { if (eventType == ix::CobraConnection_EventType_Open) { @@ -169,6 +170,10 @@ namespace ix { spdlog::error("Subscriber: error {}", errMsg); } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::error("Published message hacked: {}", msgId); + } } ); diff --git a/ws/ws_cobra_to_statsd.cpp b/ws/ws_cobra_to_statsd.cpp index ed0c0af1..6545a646 100644 --- a/ws/ws_cobra_to_statsd.cpp +++ b/ws/ws_cobra_to_statsd.cpp @@ -95,7 +95,8 @@ namespace ix (ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId) + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { if (eventType == ix::CobraConnection_EventType_Open) { @@ -149,6 +150,10 @@ namespace ix { spdlog::error("Subscriber: error {}", errMsg); } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::error("Published message hacked: {}", msgId); + } } );