diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8648c0df..e88c656b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,10 @@ All changes to this project will be documented in this file. +## [10.2.5] - 2020-08-15 + +(ws) merge all ws_*.cpp files into a single one to speedup compilation + ## [10.2.4] - 2020-08-15 (socket server) in the loop accepting connections, call select without a timeout on unix to avoid busy looping, and only wake up when a new connection happens diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 8fb61ea6..f0348e98 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "10.2.4" +#define IX_WEBSOCKET_VERSION "10.2.5" diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 02c93e55..163a7974 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -46,29 +46,6 @@ add_executable(ws ../third_party/msgpack11/msgpack11.cpp ../third_party/cpp-linenoise/linenoise.cpp ${JSONCPP_SOURCES} - - ws_http_client.cpp - ws_ping_pong.cpp - ws_broadcast_server.cpp - ws_push_server.cpp - ws_echo_server.cpp - ws_echo_client.cpp - ws_chat.cpp - ws_connect.cpp - ws_transfer.cpp - ws_send.cpp - ws_receive.cpp - ws_redis_cli.cpp - ws_redis_publish.cpp - ws_redis_subscribe.cpp - ws_redis_server.cpp - ws_snake.cpp - ws_cobra_metrics_publish.cpp - ws_cobra_publish.cpp - ws_httpd.cpp - ws_autobahn.cpp - ws_sentry_minidump_upload.cpp - ws_dns_lookup.cpp ws.cpp) # library with the most dependencies come first diff --git a/ws/ws.cpp b/ws/ws.cpp index bf28ecf6..66829a1d 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -7,9 +7,14 @@ // // Main driver for websocket utilities // -#include "ws.h" +#include "IXBench.h" +#include "linenoise.hpp" +#include "nlohmann/json.hpp" +#include +#include #include +#include #include #include #include @@ -17,18 +22,37 @@ #include #include #include +#include #include +#include +#include +#include #include +#include #include +#include +#include +#include +#include #include +#include #include +#include #include +#include +#include #include +#include +#include +#include #include +#include #include #include #include #include +#include +#include #ifndef _WIN32 #include @@ -37,6 +61,10 @@ #define getpid _getpid #endif +// for convenience +using json = nlohmann::json; +using msgpack11::MsgPack; + namespace { std::pair> load(const std::string& path) @@ -62,8 +90,2630 @@ namespace auto vec = res.second; return std::make_pair(res.first, std::string(vec.begin(), vec.end())); } + + // Assume the file exists + std::string readBytes(const std::string& path) + { + std::vector memblock; + std::ifstream file(path); + + file.seekg(0, file.end); + std::streamoff size = file.tellg(); + file.seekg(0, file.beg); + + memblock.resize(size); + + file.read((char*) &memblock.front(), static_cast(size)); + + std::string bytes(memblock.begin(), memblock.end()); + return bytes; + } + + std::string truncate(const std::string& str, size_t n) + { + if (str.size() < n) + { + return str; + } + else + { + return str.substr(0, n) + "..."; + } + } } // namespace +namespace ix +{ + // + // Autobahn test suite + // + // 1. First you need to generate a config file in a config folder, + // which can use a white list of test to execute (with globbing), + // or a black list of tests to ignore + // + // config/fuzzingserver.json + // { + // "url": "ws://127.0.0.1:9001", + // "outdir": "./reports/clients", + // "cases": ["2.*"], + // "exclude-cases": [ + // ], + // "exclude-agent-cases": {} + // } + // + // + // 2 Run the test server (using docker) + // docker run -it --rm -v "${PWD}/config:/config" -v "${PWD}/reports:/reports" -p 9001:9001 + // --name fuzzingserver crossbario/autobahn-testsuite + // + // 3. Run this command + // ws autobahn -q --url ws://localhost:9001 + // + // 4. A HTML report will be generated, you can inspect it to see if you are compliant or not + // + + class AutobahnTestCase + { + public: + AutobahnTestCase(const std::string& _url, bool quiet); + void run(); + + private: + void log(const std::string& msg); + + std::string _url; + ix::WebSocket _webSocket; + + bool _quiet; + + std::mutex _mutex; + std::condition_variable _condition; + }; + + AutobahnTestCase::AutobahnTestCase(const std::string& url, bool quiet) + : _url(url) + , _quiet(quiet) + { + _webSocket.disableAutomaticReconnection(); + + // FIXME: this should be on by default + ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( + true, false, false, 15, 15); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); + } + + void AutobahnTestCase::log(const std::string& msg) + { + if (!_quiet) + { + spdlog::info(msg); + } + } + + void AutobahnTestCase::run() + { + _webSocket.setUrl(_url); + + std::stringstream ss; + log(std::string("Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + log("autobahn: connected"); + ss << "Uri: " << msg->openInfo.uri << std::endl; + ss << "Handshake Headers:" << std::endl; + for (auto it : msg->openInfo.headers) + { + ss << it.first << ": " << it.second << std::endl; + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "autobahn: connection closed:"; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; + + _condition.notify_one(); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + ss << "Received " << msg->wireSize << " bytes" << std::endl; + + ss << "autobahn: received message: " << truncate(msg->str, 40) << std::endl; + + _webSocket.send(msg->str, msg->binary); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + + // And error can happen, in which case the test-case is marked done + _condition.notify_one(); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + ss << "Received message fragment" << std::endl; + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + ss << "Received ping" << std::endl; + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + ss << "Received pong" << std::endl; + } + else + { + ss << "Invalid ix::WebSocketMessageType" << std::endl; + } + + log(ss.str()); + }); + + _webSocket.start(); + + log("Waiting for test completion ..."); + std::unique_lock lock(_mutex); + _condition.wait(lock); + + _webSocket.stop(); + } + + bool generateReport(const std::string& url) + { + ix::WebSocket webSocket; + std::string reportUrl(url); + reportUrl += "/updateReports?agent=ixwebsocket"; + webSocket.setUrl(reportUrl); + webSocket.disableAutomaticReconnection(); + + std::atomic success(true); + std::condition_variable condition; + + webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("Report generated"); + condition.notify_one(); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + std::stringstream ss; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + spdlog::info(ss.str()); + + success = false; + } + }); + + webSocket.start(); + std::mutex mutex; + std::unique_lock lock(mutex); + condition.wait(lock); + webSocket.stop(); + + if (!success) + { + spdlog::error("Cannot generate report at url {}", reportUrl); + } + + return success; + } + + int getTestCaseCount(const std::string& url) + { + ix::WebSocket webSocket; + std::string caseCountUrl(url); + caseCountUrl += "/getCaseCount"; + webSocket.setUrl(caseCountUrl); + webSocket.disableAutomaticReconnection(); + + int count = -1; + std::condition_variable condition; + + webSocket.setOnMessageCallback([&condition, &count](const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Close) + { + condition.notify_one(); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + std::stringstream ss; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + spdlog::info(ss.str()); + + condition.notify_one(); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + // response is a string + std::stringstream ss; + ss << msg->str; + ss >> count; + } + }); + + webSocket.start(); + std::mutex mutex; + std::unique_lock lock(mutex); + condition.wait(lock); + webSocket.stop(); + + if (count == -1) + { + spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl); + } + + return count; + } + + // + // make && bench ws autobahn --url ws://localhost:9001 + // + int ws_autobahn_main(const std::string& url, bool quiet) + { + int testCasesCount = getTestCaseCount(url); + spdlog::info("Test cases count: {}", testCasesCount); + + if (testCasesCount == -1) + { + spdlog::error("Cannot retrieve test case count at url {}", url); + return 1; + } + + testCasesCount++; + + for (int i = 1; i < testCasesCount; ++i) + { + spdlog::info("Execute test case {}", i); + + int caseNumber = i; + + std::stringstream ss; + ss << url << "/runCase?case=" << caseNumber << "&agent=ixwebsocket"; + + std::string url(ss.str()); + + AutobahnTestCase testCase(url, quiet); + testCase.run(); + } + + return generateReport(url) ? 0 : 1; + } + + // + // broadcast server + // + int ws_broadcast_server_main(int port, + const std::string& hostname, + const ix::SocketTLSOptions& tlsOptions) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::WebSocketServer server(port, hostname); + server.setTLSOptions(tlsOptions); + + server.setOnClientMessageCallback( + [&server](std::shared_ptr connectionState, + ConnectionInfo& connectionInfo, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { + auto remoteIp = connectionInfo.remoteIp; + if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("New connection"); + spdlog::info("remote ip: {}", remoteIp); + spdlog::info("id: {}", connectionState->getId()); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("Closed connection: code {} reason {}", + msg->closeInfo.code, + msg->closeInfo.reason); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + std::stringstream ss; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + spdlog::info(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + spdlog::info("Received message fragment"); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + spdlog::info("Received {} bytes", msg->wireSize); + + for (auto&& client : server.getClients()) + { + if (client.get() != &webSocket) + { + client->send(msg->str, msg->binary, [](int current, int total) -> bool { + spdlog::info("Step {} out of {}", current, total); + return true; + }); + + do + { + size_t bufferedAmount = client->bufferedAmount(); + spdlog::info("{} bytes left to be sent", bufferedAmount); + + std::chrono::duration duration(500); + std::this_thread::sleep_for(duration); + } while (client->bufferedAmount() != 0); + } + } + } + }); + + auto res = server.listen(); + if (!res.first) + { + spdlog::info(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } + + /* + * ws_chat.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. + */ + + // + // Simple chat program that talks to a broadcast server + // Broadcast server can be ran with `ws broadcast_server` + // + + class WebSocketChat + { + public: + WebSocketChat(const std::string& url, const std::string& user); + + void subscribe(const std::string& channel); + void start(); + void stop(); + bool isReady() const; + + void sendMessage(const std::string& text); + size_t getReceivedMessagesCount() const; + + std::string encodeMessage(const std::string& text); + std::pair decodeMessage(const std::string& str); + + private: + std::string _url; + std::string _user; + ix::WebSocket _webSocket; + std::queue _receivedQueue; + + void log(const std::string& msg); + }; + + WebSocketChat::WebSocketChat(const std::string& url, const std::string& user) + : _url(url) + , _user(user) + { + ; + } + + void WebSocketChat::log(const std::string& msg) + { + spdlog::info(msg); + } + + size_t WebSocketChat::getReceivedMessagesCount() const + { + return _receivedQueue.size(); + } + + bool WebSocketChat::isReady() const + { + return _webSocket.getReadyState() == ix::ReadyState::Open; + } + + void WebSocketChat::stop() + { + _webSocket.stop(); + } + + void WebSocketChat::start() + { + _webSocket.setUrl(_url); + + std::stringstream ss; + log(std::string("Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + log("ws chat: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + spdlog::info("ws chat: user {} connected !", _user); + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "ws chat user disconnected: " << _user; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + auto result = decodeMessage(msg->str); + + // Our "chat" / "broacast" node.js server does not send us + // the messages we send, so we don't have to filter it out. + + // store text + _receivedQueue.push(result.second); + + ss << std::endl + << result.first << "(" << msg->wireSize << " bytes)" + << " > " << result.second << std::endl + << _user << " > "; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + log(ss.str()); + } + else + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + std::pair WebSocketChat::decodeMessage(const std::string& str) + { + auto j = json::parse(str); + + std::string msg_user = j["user"]; + std::string msg_text = j["text"]; + + return std::pair(msg_user, msg_text); + } + + std::string WebSocketChat::encodeMessage(const std::string& text) + { + json j; + j["user"] = _user; + j["text"] = text; + + std::string output = j.dump(); + return output; + } + + void WebSocketChat::sendMessage(const std::string& text) + { + _webSocket.sendText(encodeMessage(text)); + } + + int ws_chat_main(const std::string& url, const std::string& user) + { + spdlog::info("Type Ctrl-D to exit prompt..."); + WebSocketChat webSocketChat(url, user); + webSocketChat.start(); + + while (true) + { + // Read line + std::string line; + std::cout << user << " > " << std::flush; + std::getline(std::cin, line); + + if (!std::cin) + { + break; + } + + webSocketChat.sendMessage(line); + } + + spdlog::info(""); + webSocketChat.stop(); + + return 0; + } + + int ws_cobra_metrics_publish_main(const ix::CobraConfig& config, + const std::string& channel, + const std::string& path, + bool stress) + { + std::atomic sentMessages(0); + std::atomic ackedMessages(0); + CobraConnection::setPublishTrackerCallback( + [&sentMessages, &ackedMessages](bool sent, bool acked) { + if (sent) sentMessages++; + if (acked) ackedMessages++; + }); + + CobraMetricsPublisher cobraMetricsPublisher; + cobraMetricsPublisher.enable(true); + cobraMetricsPublisher.configure(config, channel); + + while (!cobraMetricsPublisher.isAuthenticated()) + ; + + std::ifstream f(path); + std::string str((std::istreambuf_iterator(f)), std::istreambuf_iterator()); + + Json::Value data; + Json::Reader reader; + if (!reader.parse(str, data)) return 1; + + if (!stress) + { + auto msgId = cobraMetricsPublisher.push(channel, data); + spdlog::info("Sent message: {}", msgId); + } + else + { + // Stress mode to try to trigger server and client bugs + while (true) + { + for (int i = 0; i < 1000; ++i) + { + cobraMetricsPublisher.push(channel, data); + } + + cobraMetricsPublisher.suspend(); + cobraMetricsPublisher.resume(); + + // FIXME: investigate why without this check we trigger a lock + while (!cobraMetricsPublisher.isAuthenticated()) + ; + } + } + + // Wait a bit for the message to get a chance to be sent + // there isn't any ack on publish right now so it's the best we can do + // FIXME: this comment is a lie now + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages); + + return 0; + } + + int ws_cobra_publish_main(const ix::CobraConfig& config, + const std::string& channel, + const std::string& path) + { + std::ifstream f(path); + std::string str((std::istreambuf_iterator(f)), std::istreambuf_iterator()); + + Json::Value data; + Json::Reader reader; + if (!reader.parse(str, data)) + { + spdlog::info("Input file is not a JSON file"); + return 1; + } + + ix::CobraConnection conn; + conn.configure(config); + + // Display incoming messages + std::atomic authenticated(false); + std::atomic messageAcked(false); + + conn.setEventCallback( + [&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event) { + if (event->type == ix::CobraEventType::Open) + { + spdlog::info("Publisher connected"); + + for (auto&& it : event->headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (event->type == ix::CobraEventType::Closed) + { + spdlog::info("Subscriber closed: {}", event->errMsg); + } + else if (event->type == ix::CobraEventType::Authenticated) + { + spdlog::info("Publisher authenticated"); + authenticated = true; + + Json::Value channels; + channels[0] = channel; + auto msgId = conn.publish(channels, data); + + spdlog::info("Published msg {}", msgId); + } + else if (event->type == ix::CobraEventType::Subscribed) + { + spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId); + } + else if (event->type == ix::CobraEventType::UnSubscribed) + { + spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId); + } + else if (event->type == ix::CobraEventType::Error) + { + spdlog::error("Publisher: error {}", event->errMsg); + } + else if (event->type == ix::CobraEventType::Published) + { + spdlog::info("Published message id {} acked", event->msgId); + messageAcked = true; + } + else if (event->type == ix::CobraEventType::Pong) + { + spdlog::info("Received websocket pong"); + } + else if (event->type == ix::CobraEventType::HandshakeError) + { + spdlog::error("Subscriber: Handshake error: {}", event->errMsg); + } + else if (event->type == ix::CobraEventType::AuthenticationError) + { + spdlog::error("Subscriber: Authentication error: {}", event->errMsg); + } + }); + + conn.connect(); + + while (!authenticated) + ; + while (!messageAcked) + ; + + return 0; + } + + class WebSocketConnect + { + public: + WebSocketConnect(const std::string& _url, + const std::string& headers, + bool disableAutomaticReconnection, + bool disablePerMessageDeflate, + bool binaryMode, + uint32_t maxWaitBetweenReconnectionRetries, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs); + + void subscribe(const std::string& channel); + void start(); + void stop(); + + int getSentBytes() + { + return _sentBytes; + } + int getReceivedBytes() + { + return _receivedBytes; + } + + void sendMessage(const std::string& text); + + private: + std::string _url; + WebSocketHttpHeaders _headers; + ix::WebSocket _webSocket; + bool _disablePerMessageDeflate; + bool _binaryMode; + std::atomic _receivedBytes; + std::atomic _sentBytes; + + void log(const std::string& msg); + WebSocketHttpHeaders parseHeaders(const std::string& data); + }; + + WebSocketConnect::WebSocketConnect(const std::string& url, + const std::string& headers, + bool disableAutomaticReconnection, + bool disablePerMessageDeflate, + bool binaryMode, + uint32_t maxWaitBetweenReconnectionRetries, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs) + : _url(url) + , _disablePerMessageDeflate(disablePerMessageDeflate) + , _binaryMode(binaryMode) + , _receivedBytes(0) + , _sentBytes(0) + { + if (disableAutomaticReconnection) + { + _webSocket.disableAutomaticReconnection(); + } + _webSocket.setMaxWaitBetweenReconnectionRetries(maxWaitBetweenReconnectionRetries); + _webSocket.setTLSOptions(tlsOptions); + _webSocket.setPingInterval(pingIntervalSecs); + + _headers = parseHeaders(headers); + + if (!subprotocol.empty()) + { + _webSocket.addSubProtocol(subprotocol); + } + + WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) { + if (incoming) + { + _receivedBytes += size; + } + else + { + _sentBytes += size; + } + }); + } + + void WebSocketConnect::log(const std::string& msg) + { + std::cout << msg << std::endl; + } + + WebSocketHttpHeaders WebSocketConnect::parseHeaders(const std::string& data) + { + WebSocketHttpHeaders headers; + + // Split by \n + std::string token; + std::stringstream tokenStream(data); + + while (std::getline(tokenStream, token)) + { + std::size_t pos = token.rfind(':'); + + // Bail out if last '.' is found + if (pos == std::string::npos) continue; + + auto key = token.substr(0, pos); + auto val = token.substr(pos + 1); + + spdlog::info("{}: {}", key, val); + headers[key] = val; + } + + return headers; + } + + void WebSocketConnect::stop() + { + { + Bench bench("ws_connect: stop connection"); + _webSocket.stop(); + } + } + + void WebSocketConnect::start() + { + _webSocket.setUrl(_url); + _webSocket.setExtraHeaders(_headers); + + if (_disablePerMessageDeflate) + { + _webSocket.disablePerMessageDeflate(); + } + else + { + ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( + true, false, false, 15, 15); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); + } + + std::stringstream ss; + log(std::string("Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("ws_connect: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "ws_connect: connection closed:"; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + spdlog::info("Received {} bytes", msg->wireSize); + + ss << "ws_connect: received message: " << msg->str; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + spdlog::info("Received message fragment"); + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + spdlog::info("Received ping"); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + spdlog::info("Received pong {}", msg->str); + } + else + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + void WebSocketConnect::sendMessage(const std::string& text) + { + if (_binaryMode) + { + _webSocket.sendBinary(text); + } + else + { + _webSocket.sendText(text); + } + } + + int ws_connect_main(const std::string& url, + const std::string& headers, + bool disableAutomaticReconnection, + bool disablePerMessageDeflate, + bool binaryMode, + uint32_t maxWaitBetweenReconnectionRetries, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs) + { + std::cout << "Type Ctrl-D to exit prompt..." << std::endl; + WebSocketConnect webSocketChat(url, + headers, + disableAutomaticReconnection, + disablePerMessageDeflate, + binaryMode, + maxWaitBetweenReconnectionRetries, + tlsOptions, + subprotocol, + pingIntervalSecs); + webSocketChat.start(); + + while (true) + { + // Read line + std::string line; + auto quit = linenoise::Readline("> ", line); + + if (quit) + { + break; + } + + if (line == "/stop") + { + spdlog::info("Stopping connection..."); + webSocketChat.stop(); + continue; + } + + if (line == "/start") + { + spdlog::info("Starting connection..."); + webSocketChat.start(); + continue; + } + + webSocketChat.sendMessage(line); + + // Add text to history + linenoise::AddHistory(line.c_str()); + } + + spdlog::info(""); + webSocketChat.stop(); + + spdlog::info("Received {} bytes", webSocketChat.getReceivedBytes()); + spdlog::info("Sent {} bytes", webSocketChat.getSentBytes()); + + return 0; + } + + int ws_dns_lookup(const std::string& hostname) + { + auto dnsLookup = std::make_shared(hostname, 80); + + std::string errMsg; + struct addrinfo* res; + + res = dnsLookup->resolve(errMsg, [] { return false; }); + + auto addr = res->ai_addr; + + char str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &addr, str, INET_ADDRSTRLEN); + + spdlog::info("host: {} ip: {}", hostname, str); + + return 0; + } + + int ws_echo_client(const std::string& url, + bool disablePerMessageDeflate, + bool binaryMode, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs, + const std::string& sendMsg, + bool noSend) + { + // Our websocket object + ix::WebSocket webSocket; + + webSocket.setUrl(url); + webSocket.setTLSOptions(tlsOptions); + webSocket.setPingInterval(pingIntervalSecs); + + if (disablePerMessageDeflate) + { + webSocket.disablePerMessageDeflate(); + } + + if (!subprotocol.empty()) + { + webSocket.addSubProtocol(subprotocol); + } + + std::atomic receivedCount(0); + uint64_t receivedCountTotal(0); + uint64_t receivedCountPerSecs(0); + + // Setup a callback to be fired (in a background thread, watch out for race conditions !) + // when a message or an event (open, close, error) is received + webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode]( + const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Message) + { + if (!noSend) + { + webSocket.send(msg->str, msg->binary); + } + receivedCount++; + } + else if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("ws_echo_client: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + webSocket.send(sendMsg, binaryMode); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + spdlog::info("Received pong {}", msg->str); + } + }); + + auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] { + setThreadName("Timer"); + while (true) + { + // + // We cannot write to sentCount and receivedCount + // as those are used externally, so we need to introduce + // our own counters + // + std::stringstream ss; + ss << "messages received: " << receivedCountPerSecs << " per second " + << receivedCountTotal << " total"; + + CoreLogger::info(ss.str()); + + receivedCountPerSecs = receivedCount - receivedCountTotal; + receivedCountTotal += receivedCountPerSecs; + + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t1(timer); + + // Now that our callback is setup, we can start our background thread and receive messages + std::cout << "Connecting to " << url << "..." << std::endl; + webSocket.start(); + + // Send a message to the server (default to TEXT mode) + webSocket.send("hello world"); + + while (true) + { + std::string text; + std::cout << "> " << std::flush; + std::getline(std::cin, text); + + webSocket.send(text); + } + + return 0; + } + + int ws_echo_server_main(int port, + bool greetings, + const std::string& hostname, + const ix::SocketTLSOptions& tlsOptions, + bool ipv6, + bool disablePerMessageDeflate, + bool disablePong) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::WebSocketServer server(port, + hostname, + SocketServer::kDefaultTcpBacklog, + SocketServer::kDefaultMaxConnections, + WebSocketServer::kDefaultHandShakeTimeoutSecs, + (ipv6) ? AF_INET6 : AF_INET); + + server.setTLSOptions(tlsOptions); + + if (disablePerMessageDeflate) + { + spdlog::info("Disable per message deflate"); + server.disablePerMessageDeflate(); + } + + if (disablePong) + { + spdlog::info("Disable responding to PING messages with PONG"); + server.disablePong(); + } + + server.setOnClientMessageCallback( + [greetings](std::shared_ptr connectionState, + ConnectionInfo& connectionInfo, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { + auto remoteIp = connectionInfo.remoteIp; + if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("New connection"); + spdlog::info("remote ip: {}", remoteIp); + spdlog::info("id: {}", connectionState->getId()); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + if (greetings) + { + webSocket.sendText("Welcome !"); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("Closed connection: client id {} code {} reason {}", + connectionState->getId(), + msg->closeInfo.code, + msg->closeInfo.reason); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + spdlog::error("Connection error: {}", msg->errorInfo.reason); + spdlog::error("#retries: {}", msg->errorInfo.retries); + spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); + spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + spdlog::info("Received {} bytes", msg->wireSize); + webSocket.send(msg->str, msg->binary); + } + }); + + auto res = server.listen(); + if (!res.first) + { + spdlog::error(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } + + std::string extractFilename(const std::string& path) + { + std::string::size_type idx; + + idx = path.rfind('/'); + if (idx != std::string::npos) + { + std::string filename = path.substr(idx + 1); + return filename; + } + else + { + return path; + } + } + + WebSocketHttpHeaders parseHeaders(const std::string& data) + { + WebSocketHttpHeaders headers; + + // Split by \n + std::string token; + std::stringstream tokenStream(data); + + while (std::getline(tokenStream, token)) + { + std::size_t pos = token.rfind(':'); + + // Bail out if last '.' is found + if (pos == std::string::npos) continue; + + auto key = token.substr(0, pos); + auto val = token.substr(pos + 1); + + spdlog::info("{}: {}", key, val); + headers[key] = val; + } + + return headers; + } + + // + // Useful endpoint to test HTTP post + // https://postman-echo.com/post + // + HttpParameters parsePostParameters(const std::string& data) + { + HttpParameters httpParameters; + + // Split by \n + std::string token; + std::stringstream tokenStream(data); + + while (std::getline(tokenStream, token)) + { + std::size_t pos = token.rfind('='); + + // Bail out if last '.' is found + if (pos == std::string::npos) continue; + + auto key = token.substr(0, pos); + auto val = token.substr(pos + 1); + + spdlog::info("{}: {}", key, val); + httpParameters[key] = val; + } + + return httpParameters; + } + + int ws_http_client_main(const std::string& url, + const std::string& headersData, + const std::string& data, + bool headersOnly, + int connectTimeout, + int transferTimeout, + bool followRedirects, + int maxRedirects, + bool verbose, + bool save, + const std::string& output, + bool compress, + const ix::SocketTLSOptions& tlsOptions) + { + HttpClient httpClient; + httpClient.setTLSOptions(tlsOptions); + + auto args = httpClient.createRequest(); + args->extraHeaders = parseHeaders(headersData); + args->connectTimeout = connectTimeout; + args->transferTimeout = transferTimeout; + args->followRedirects = followRedirects; + args->maxRedirects = maxRedirects; + args->verbose = verbose; + args->compress = compress; + args->logger = [](const std::string& msg) { spdlog::info(msg); }; + args->onProgressCallback = [verbose](int current, int total) -> bool { + if (verbose) + { + spdlog::info("Downloaded {} bytes out of {}", current, total); + } + return true; + }; + + HttpParameters httpParameters = parsePostParameters(data); + + HttpResponsePtr response; + if (headersOnly) + { + response = httpClient.head(url, args); + } + else if (data.empty()) + { + response = httpClient.get(url, args); + } + else + { + response = httpClient.post(url, httpParameters, args); + } + + spdlog::info(""); + + for (auto it : response->headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + spdlog::info("Upload size: {}", response->uploadSize); + spdlog::info("Download size: {}", response->downloadSize); + + spdlog::info("Status: {}", response->statusCode); + if (response->errorCode != HttpErrorCode::Ok) + { + spdlog::info("error message: ", response->errorMsg); + } + + if (!headersOnly && response->errorCode == HttpErrorCode::Ok) + { + if (save || !output.empty()) + { + // FIMXE we should decode the url first + std::string filename = extractFilename(url); + if (!output.empty()) + { + filename = output; + } + + spdlog::info("Writing to disk: {}", filename); + std::ofstream out(filename); + out.write((char*) &response->payload.front(), response->payload.size()); + out.close(); + } + else + { + if (response->headers["Content-Type"] != "application/octet-stream") + { + spdlog::info("payload: {}", response->payload); + } + else + { + spdlog::info("Binary output can mess up your terminal."); + spdlog::info("Use the -O flag to save the file to disk."); + spdlog::info("You can also use the --output option to specify a filename."); + } + } + } + + return 0; + } + + int ws_httpd_main(int port, + const std::string& hostname, + bool redirect, + const std::string& redirectUrl, + const ix::SocketTLSOptions& tlsOptions) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::HttpServer server(port, hostname); + server.setTLSOptions(tlsOptions); + + if (redirect) + { + server.makeRedirectServer(redirectUrl); + } + + auto res = server.listen(); + if (!res.first) + { + spdlog::error(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } + + class WebSocketPingPong + { + public: + WebSocketPingPong(const std::string& _url, const ix::SocketTLSOptions& tlsOptions); + + void subscribe(const std::string& channel); + void start(); + void stop(); + + void ping(const std::string& text); + void send(const std::string& text); + + private: + std::string _url; + ix::WebSocket _webSocket; + + void log(const std::string& msg); + }; + + WebSocketPingPong::WebSocketPingPong(const std::string& url, + const ix::SocketTLSOptions& tlsOptions) + : _url(url) + { + _webSocket.setTLSOptions(tlsOptions); + } + + void WebSocketPingPong::log(const std::string& msg) + { + spdlog::info(msg); + } + + void WebSocketPingPong::stop() + { + _webSocket.stop(); + } + + void WebSocketPingPong::start() + { + _webSocket.setUrl(_url); + + std::stringstream ss; + log(std::string("Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { + spdlog::info("Received {} bytes", msg->wireSize); + + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + log("ping_pong: connected"); + + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "ping_pong: disconnected:" + << " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason + << msg->str; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + ss << "ping_pong: received message: " << msg->str; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + ss << "ping_pong: received ping message: " << msg->str; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + ss << "ping_pong: received pong message: " << msg->str; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + log(ss.str()); + } + else + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + void WebSocketPingPong::ping(const std::string& text) + { + if (!_webSocket.ping(text).success) + { + std::cerr << "Failed to send ping message. Message too long (> 125 bytes) or endpoint " + "is disconnected" + << std::endl; + } + } + + void WebSocketPingPong::send(const std::string& text) + { + _webSocket.send(text); + } + + int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions) + { + spdlog::info("Type Ctrl-D to exit prompt..."); + WebSocketPingPong webSocketPingPong(url, tlsOptions); + webSocketPingPong.start(); + + while (true) + { + std::string text; + std::cout << "> " << std::flush; + std::getline(std::cin, text); + + if (!std::cin) + { + break; + } + + if (text == "/close") + { + webSocketPingPong.send(text); + } + else + { + webSocketPingPong.ping(text); + } + } + + std::cout << std::endl; + webSocketPingPong.stop(); + + return 0; + } + + int ws_push_server(int port, + bool greetings, + const std::string& hostname, + const ix::SocketTLSOptions& tlsOptions, + bool ipv6, + bool disablePerMessageDeflate, + bool disablePong, + const std::string& sendMsg) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::WebSocketServer server(port, + hostname, + SocketServer::kDefaultTcpBacklog, + SocketServer::kDefaultMaxConnections, + WebSocketServer::kDefaultHandShakeTimeoutSecs, + (ipv6) ? AF_INET6 : AF_INET); + + server.setTLSOptions(tlsOptions); + + if (disablePerMessageDeflate) + { + spdlog::info("Disable per message deflate"); + server.disablePerMessageDeflate(); + } + + if (disablePong) + { + spdlog::info("Disable responding to PING messages with PONG"); + server.disablePong(); + } + + server.setOnClientMessageCallback( + [greetings, &sendMsg](std::shared_ptr connectionState, + ConnectionInfo& connectionInfo, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { + auto remoteIp = connectionInfo.remoteIp; + if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("New connection"); + spdlog::info("remote ip: {}", remoteIp); + spdlog::info("id: {}", connectionState->getId()); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + if (greetings) + { + webSocket.sendText("Welcome !"); + } + + bool binary = false; + while (true) + { + webSocket.send(sendMsg, binary); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("Closed connection: client id {} code {} reason {}", + connectionState->getId(), + msg->closeInfo.code, + msg->closeInfo.reason); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + spdlog::error("Connection error: {}", msg->errorInfo.reason); + spdlog::error("#retries: {}", msg->errorInfo.retries); + spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); + spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + spdlog::info("Received {} bytes", msg->wireSize); + webSocket.send(msg->str, msg->binary); + } + }); + + auto res = server.listen(); + if (!res.first) + { + spdlog::error(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } + + class WebSocketReceiver + { + public: + WebSocketReceiver(const std::string& _url, + bool enablePerMessageDeflate, + int delayMs, + const ix::SocketTLSOptions& tlsOptions); + + void subscribe(const std::string& channel); + void start(); + void stop(); + + void waitForConnection(); + void waitForMessage(); + void handleMessage(const std::string& str); + + private: + std::string _url; + std::string _id; + ix::WebSocket _webSocket; + bool _enablePerMessageDeflate; + int _delayMs; + int _receivedFragmentCounter; + + std::mutex _conditionVariableMutex; + std::condition_variable _condition; + + std::string extractFilename(const std::string& path); + void handleError(const std::string& errMsg, const std::string& id); + void log(const std::string& msg); + }; + + WebSocketReceiver::WebSocketReceiver(const std::string& url, + bool enablePerMessageDeflate, + int delayMs, + const ix::SocketTLSOptions& tlsOptions) + : _url(url) + , _enablePerMessageDeflate(enablePerMessageDeflate) + , _delayMs(delayMs) + , _receivedFragmentCounter(0) + { + _webSocket.disableAutomaticReconnection(); + _webSocket.setTLSOptions(tlsOptions); + } + + void WebSocketReceiver::stop() + { + _webSocket.stop(); + } + + void WebSocketReceiver::log(const std::string& msg) + { + spdlog::info(msg); + } + + void WebSocketReceiver::waitForConnection() + { + spdlog::info("{}: Connecting...", "ws_receive"); + + std::unique_lock lock(_conditionVariableMutex); + _condition.wait(lock); + } + + void WebSocketReceiver::waitForMessage() + { + spdlog::info("{}: Waiting for message...", "ws_receive"); + + std::unique_lock lock(_conditionVariableMutex); + _condition.wait(lock); + } + + // We should cleanup the file name and full path further to remove .. as well + std::string WebSocketReceiver::extractFilename(const std::string& path) + { + std::string::size_type idx; + + idx = path.rfind('/'); + if (idx != std::string::npos) + { + std::string filename = path.substr(idx + 1); + return filename; + } + else + { + return path; + } + } + + void WebSocketReceiver::handleError(const std::string& errMsg, const std::string& id) + { + std::map pdu; + pdu["kind"] = "error"; + pdu["id"] = id; + pdu["message"] = errMsg; + + MsgPack msg(pdu); + _webSocket.sendBinary(msg.dump()); + } + + void WebSocketReceiver::handleMessage(const std::string& str) + { + spdlog::info("ws_receive: Received message: {}", str.size()); + + std::string errMsg; + MsgPack data = MsgPack::parse(str, errMsg); + if (!errMsg.empty()) + { + handleError("ws_receive: Invalid MsgPack", std::string()); + return; + } + + spdlog::info("id: {}", data["id"].string_value()); + + std::vector content = data["content"].binary_items(); + spdlog::info("ws_receive: Content size: {}", content.size()); + + // Validate checksum + uint64_t cksum = ix::djb2Hash(content); + auto cksumRef = data["djb2_hash"].string_value(); + + spdlog::info("ws_receive: Computed hash: {}", cksum); + spdlog::info("ws_receive: Reference hash: {}", cksumRef); + + if (std::to_string(cksum) != cksumRef) + { + handleError("Hash mismatch.", std::string()); + return; + } + + std::string filename = data["filename"].string_value(); + filename = extractFilename(filename); + + std::string filenameTmp = filename + ".tmp"; + + spdlog::info("ws_receive: Writing to disk: {}", filenameTmp); + std::ofstream out(filenameTmp); + out.write((char*) &content.front(), content.size()); + out.close(); + + spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename); + rename(filenameTmp.c_str(), filename.c_str()); + + std::map pdu; + pdu["ack"] = true; + pdu["id"] = data["id"]; + pdu["filename"] = data["filename"]; + + spdlog::info("Sending ack to sender"); + MsgPack msg(pdu); + _webSocket.sendBinary(msg.dump()); + } + + void WebSocketReceiver::start() + { + _webSocket.setUrl(_url); + ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( + _enablePerMessageDeflate, false, false, 15, 15); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); + + std::stringstream ss; + log(std::string("ws_receive: Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + _condition.notify_one(); + + log("ws_receive: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "ws_receive: connection closed:"; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + ss << "ws_receive: transfered " << msg->wireSize << " bytes"; + log(ss.str()); + handleMessage(msg->str); + _condition.notify_one(); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + ss << "ws_receive: received fragment " << _receivedFragmentCounter++; + log(ss.str()); + + if (_delayMs > 0) + { + // Introduce an arbitrary delay, to simulate a slow connection + std::chrono::duration duration(_delayMs); + std::this_thread::sleep_for(duration); + } + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "ws_receive "; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + log("ws_receive: received ping"); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + log("ws_receive: received pong"); + } + else + { + ss << "ws_receive: Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + void wsReceive(const std::string& url, + bool enablePerMessageDeflate, + int delayMs, + const ix::SocketTLSOptions& tlsOptions) + { + WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs, tlsOptions); + webSocketReceiver.start(); + + webSocketReceiver.waitForConnection(); + + webSocketReceiver.waitForMessage(); + + std::chrono::duration duration(1000); + std::this_thread::sleep_for(duration); + + spdlog::info("ws_receive: Done !"); + webSocketReceiver.stop(); + } + + int ws_receive_main(const std::string& url, + bool enablePerMessageDeflate, + int delayMs, + const ix::SocketTLSOptions& tlsOptions) + { + wsReceive(url, enablePerMessageDeflate, delayMs, tlsOptions); + return 0; + } + + int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password) + { + RedisClient redisClient; + if (!redisClient.connect(hostname, port)) + { + spdlog::info("Cannot connect to redis host"); + return 1; + } + + if (!password.empty()) + { + std::string authResponse; + if (!redisClient.auth(password, authResponse)) + { + std::stringstream ss; + spdlog::info("Cannot authenticated to redis"); + return 1; + } + spdlog::info("Auth response: {}", authResponse); + } + + while (true) + { + // Read line + std::string line; + std::string prompt; + prompt += hostname; + prompt += ":"; + prompt += std::to_string(port); + prompt += "> "; + auto quit = linenoise::Readline(prompt.c_str(), line); + + if (quit) + { + break; + } + + std::stringstream ss(line); + std::vector args; + std::string arg; + + while (ss.good()) + { + ss >> arg; + args.push_back(arg); + } + + std::string errMsg; + auto response = redisClient.send(args, errMsg); + if (!errMsg.empty()) + { + spdlog::error("(error) {}", errMsg); + } + else + { + if (response.first != RespType::String) + { + std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")" + << " "; + } + + std::cout << response.second << std::endl; + } + + linenoise::AddHistory(line.c_str()); + } + + return 0; + } + + int ws_redis_publish_main(const std::string& hostname, + int port, + const std::string& password, + const std::string& channel, + const std::string& message, + int count) + { + RedisClient redisClient; + if (!redisClient.connect(hostname, port)) + { + spdlog::info("Cannot connect to redis host"); + return 1; + } + + if (!password.empty()) + { + std::string authResponse; + if (!redisClient.auth(password, authResponse)) + { + std::stringstream ss; + spdlog::info("Cannot authenticated to redis"); + return 1; + } + spdlog::info("Auth response: {}", authResponse); + } + + std::string errMsg; + for (int i = 0; i < count; i++) + { + if (!redisClient.publish(channel, message, errMsg)) + { + spdlog::error("Error publishing to channel {} error {}", channel, errMsg); + return 1; + } + } + + return 0; + } + + int ws_redis_server_main(int port, const std::string& hostname) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::RedisServer server(port, hostname); + + auto res = server.listen(); + if (!res.first) + { + spdlog::info(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } + + int ws_redis_subscribe_main(const std::string& hostname, + int port, + const std::string& password, + const std::string& channel, + bool verbose) + { + RedisClient redisClient; + if (!redisClient.connect(hostname, port)) + { + spdlog::info("Cannot connect to redis host"); + return 1; + } + + if (!password.empty()) + { + std::string authResponse; + if (!redisClient.auth(password, authResponse)) + { + std::stringstream ss; + spdlog::info("Cannot authenticated to redis"); + return 1; + } + spdlog::info("Auth response: {}", authResponse); + } + + std::atomic msgPerSeconds(0); + std::atomic msgCount(0); + + auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) { + if (verbose) + { + spdlog::info("recived: {}", message); + } + + msgPerSeconds++; + msgCount++; + }; + + auto responseCallback = [](const std::string& redisResponse) { + spdlog::info("Redis subscribe response: {}", redisResponse); + }; + + auto timer = [&msgPerSeconds, &msgCount] { + while (true) + { + spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); + + msgPerSeconds = 0; + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t(timer); + + spdlog::info("Subscribing to {} ...", channel); + if (!redisClient.subscribe(channel, responseCallback, callback)) + { + spdlog::info("Error subscribing to channel {}", channel); + return 1; + } + + return 0; + } + + class WebSocketSender + { + public: + WebSocketSender(const std::string& _url, + bool enablePerMessageDeflate, + const ix::SocketTLSOptions& tlsOptions); + + void subscribe(const std::string& channel); + void start(); + void stop(); + + void waitForConnection(); + void waitForAck(); + + bool sendMessage(const std::string& filename, bool throttle); + + private: + std::string _url; + std::string _id; + ix::WebSocket _webSocket; + bool _enablePerMessageDeflate; + + std::atomic _connected; + + std::mutex _conditionVariableMutex; + std::condition_variable _condition; + + void log(const std::string& msg); + }; + + WebSocketSender::WebSocketSender(const std::string& url, + bool enablePerMessageDeflate, + const ix::SocketTLSOptions& tlsOptions) + : _url(url) + , _enablePerMessageDeflate(enablePerMessageDeflate) + , _connected(false) + { + _webSocket.disableAutomaticReconnection(); + _webSocket.setTLSOptions(tlsOptions); + } + + void WebSocketSender::stop() + { + _webSocket.stop(); + } + + void WebSocketSender::log(const std::string& msg) + { + spdlog::info(msg); + } + + void WebSocketSender::waitForConnection() + { + spdlog::info("{}: Connecting...", "ws_send"); + + std::unique_lock lock(_conditionVariableMutex); + _condition.wait(lock); + } + + void WebSocketSender::waitForAck() + { + spdlog::info("{}: Waiting for ack...", "ws_send"); + + std::unique_lock lock(_conditionVariableMutex); + _condition.wait(lock); + } + + std::vector load(const std::string& path) + { + std::vector memblock; + + std::ifstream file(path); + if (!file.is_open()) return memblock; + + file.seekg(0, file.end); + std::streamoff size = file.tellg(); + file.seekg(0, file.beg); + + memblock.resize((size_t) size); + file.read((char*) &memblock.front(), static_cast(size)); + + return memblock; + } + + void WebSocketSender::start() + { + _webSocket.setUrl(_url); + + ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( + _enablePerMessageDeflate, false, false, 15, 15); + _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); + + std::stringstream ss; + log(std::string("ws_send: Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + _connected = true; + + _condition.notify_one(); + + log("ws_send: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + _connected = false; + + ss << "ws_send: connection closed:"; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + _condition.notify_one(); + + ss << "ws_send: received message (" << msg->wireSize << " bytes)"; + log(ss.str()); + + std::string errMsg; + MsgPack data = MsgPack::parse(msg->str, errMsg); + if (!errMsg.empty()) + { + spdlog::info("Invalid MsgPack response"); + return; + } + + std::string id = data["id"].string_value(); + if (_id != id) + { + spdlog::info("Invalid id"); + } + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "ws_send "; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + spdlog::info("ws_send: received ping"); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + spdlog::info("ws_send: received pong"); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + spdlog::info("ws_send: received fragment"); + } + else + { + ss << "ws_send: Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + bool WebSocketSender::sendMessage(const std::string& filename, bool throttle) + { + std::vector content; + { + Bench bench("ws_send: load file from disk"); + content = load(filename); + } + + _id = uuid4(); + + std::map pdu; + pdu["kind"] = "send"; + pdu["id"] = _id; + pdu["content"] = content; + auto hash = djb2Hash(content); + pdu["djb2_hash"] = std::to_string(hash); + pdu["filename"] = filename; + + MsgPack msg(pdu); + + auto serializedMsg = msg.dump(); + spdlog::info("ws_send: sending {} bytes", serializedMsg.size()); + + Bench bench("ws_send: Sending file through websocket"); + auto result = + _webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool { + spdlog::info("ws_send: Step {} out of {}", current + 1, total); + + if (throttle) + { + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } + + return _connected; + }); + + if (!result.success) + { + spdlog::error("ws_send: Error sending file."); + return false; + } + + if (!_connected) + { + spdlog::error("ws_send: Got disconnected from the server"); + return false; + } + + spdlog::info("ws_send: sent {} bytes", serializedMsg.size()); + + do + { + size_t bufferedAmount = _webSocket.bufferedAmount(); + spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount); + + std::chrono::duration duration(500); + std::this_thread::sleep_for(duration); + } while (_webSocket.bufferedAmount() != 0 && _connected); + + if (_connected) + { + bench.report(); + auto duration = bench.getDuration(); + auto transferRate = 1000 * content.size() / duration; + transferRate /= (1024 * 1024); + spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate); + } + else + { + spdlog::error("ws_send: Got disconnected from the server"); + } + + return _connected; + } + + void wsSend(const std::string& url, + const std::string& path, + bool enablePerMessageDeflate, + bool throttle, + const ix::SocketTLSOptions& tlsOptions) + { + WebSocketSender webSocketSender(url, enablePerMessageDeflate, tlsOptions); + webSocketSender.start(); + + webSocketSender.waitForConnection(); + + spdlog::info("ws_send: Sending..."); + if (webSocketSender.sendMessage(path, throttle)) + { + webSocketSender.waitForAck(); + spdlog::info("ws_send: Done !"); + } + else + { + spdlog::error("ws_send: Error sending file."); + } + + webSocketSender.stop(); + } + + int ws_send_main(const std::string& url, + const std::string& path, + bool disablePerMessageDeflate, + const ix::SocketTLSOptions& tlsOptions) + { + bool throttle = false; + bool enablePerMessageDeflate = !disablePerMessageDeflate; + + wsSend(url, path, enablePerMessageDeflate, throttle, tlsOptions); + return 0; + } + + int ws_sentry_minidump_upload(const std::string& metadataPath, + const std::string& minidump, + const std::string& project, + const std::string& key, + bool verbose) + { + SentryClient sentryClient((std::string())); + + // Read minidump file from disk + std::string minidumpBytes = readBytes(minidump); + + // Read json data + std::string sentryMetadata = readBytes(metadataPath); + + std::atomic done(false); + + sentryClient.uploadMinidump( + sentryMetadata, + minidumpBytes, + project, + key, + verbose, + [verbose, &done](const HttpResponsePtr& response) { + if (verbose) + { + for (auto it : response->headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + spdlog::info("Upload size: {}", response->uploadSize); + spdlog::info("Download size: {}", response->downloadSize); + + spdlog::info("Status: {}", response->statusCode); + if (response->errorCode != HttpErrorCode::Ok) + { + spdlog::info("error message: {}", response->errorMsg); + } + + if (response->headers["Content-Type"] != "application/octet-stream") + { + spdlog::info("payload: {}", response->payload); + } + } + + if (response->statusCode != 200) + { + spdlog::error("Error sending data to sentry: {}", response->statusCode); + spdlog::error("Status: {}", response->statusCode); + spdlog::error("Response: {}", response->payload); + } + else + { + spdlog::info("Event sent to sentry"); + } + + done = true; + }); + + int i = 0; + + while (!done) + { + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + + if (i++ > 5000) break; // wait 5 seconds max + } + + if (!done) + { + spdlog::error("Error: timing out trying to sent a crash to sentry"); + } + + return 0; + } + + int ws_snake_main(int port, + const std::string& hostname, + const std::string& redisHosts, + int redisPort, + const std::string& redisPassword, + bool verbose, + const std::string& appsConfigPath, + const SocketTLSOptions& socketTLSOptions, + bool disablePong, + const std::string& republishChannel) + { + snake::AppConfig appConfig; + appConfig.port = port; + appConfig.hostname = hostname; + appConfig.verbose = verbose; + appConfig.redisPort = redisPort; + appConfig.redisPassword = redisPassword; + appConfig.socketTLSOptions = socketTLSOptions; + appConfig.disablePong = disablePong; + appConfig.republishChannel = republishChannel; + + // Parse config file + auto res = readAsString(appsConfigPath); + bool found = res.first; + if (!found) + { + spdlog::error("Cannot read content of {}", appsConfigPath); + return 1; + } + + auto apps = nlohmann::json::parse(res.second); + appConfig.apps = apps["apps"]; + + std::string token; + std::stringstream tokenStream(redisHosts); + while (std::getline(tokenStream, token, ';')) + { + appConfig.redisHosts.push_back(token); + } + + // Display config on the terminal for debugging + dumpConfig(appConfig); + + snake::SnakeServer snakeServer(appConfig); + snakeServer.runForever(); + + return 0; // should never reach this + } + + int ws_transfer_main(int port, + const std::string& hostname, + const ix::SocketTLSOptions& tlsOptions) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::WebSocketServer server(port, hostname); + server.setTLSOptions(tlsOptions); + + server.setOnClientMessageCallback( + [&server](std::shared_ptr connectionState, + ConnectionInfo& connectionInfo, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { + auto remoteIp = connectionInfo.remoteIp; + if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("ws_transfer: New connection"); + spdlog::info("remote ip: {}", remoteIp); + spdlog::info("id: {}", connectionState->getId()); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}", + connectionState->getId(), + msg->closeInfo.code, + msg->closeInfo.reason); + auto remaining = server.getClients().size() - 1; + spdlog::info("ws_transfer: {} remaining clients", remaining); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + std::stringstream ss; + ss << "ws_transfer: Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + spdlog::info(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + spdlog::info("ws_transfer: Received message fragment "); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + spdlog::info("ws_transfer: Received {} bytes", msg->wireSize); + size_t receivers = 0; + for (auto&& client : server.getClients()) + { + if (client.get() != &webSocket) + { + auto readyState = client->getReadyState(); + auto id = connectionState->getId(); + + if (readyState == ReadyState::Open) + { + ++receivers; + client->send( + msg->str, msg->binary, [&id](int current, int total) -> bool { + spdlog::info("{}: [client {}]: Step {} out of {}", + "ws_transfer", + id, + current, + total); + return true; + }); + do + { + size_t bufferedAmount = client->bufferedAmount(); + + spdlog::info("{}: [client {}]: {} bytes left to send", + "ws_transfer", + id, + bufferedAmount); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + } while (client->bufferedAmount() != 0 && + client->getReadyState() == ReadyState::Open); + } + else + { + std::string readyStateString = + readyState == ReadyState::Connecting + ? "Connecting" + : readyState == ReadyState::Closing ? "Closing" : "Closed"; + size_t bufferedAmount = client->bufferedAmount(); + + spdlog::info( + "{}: [client {}]: has readystate {} bytes left to be sent {}", + "ws_transfer", + id, + readyStateString, + bufferedAmount); + } + } + } + if (!receivers) + { + spdlog::info("ws_transfer: no remaining receivers"); + } + } + }); + + auto res = server.listen(); + if (!res.first) + { + spdlog::info(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } +} // namespace ix + int main(int argc, char** argv) { ix::initNetSystem(); diff --git a/ws/ws.h b/ws/ws.h deleted file mode 100644 index 89783ccf..00000000 --- a/ws/ws.h +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ws.h - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ -#pragma once - -#include -#include -#include - -namespace ix -{ - int ws_http_client_main(const std::string& url, - const std::string& headers, - const std::string& data, - bool headersOnly, - int connectTimeout, - int transferTimeout, - bool followRedirects, - int maxRedirects, - bool verbose, - bool save, - const std::string& output, - bool compress, - const ix::SocketTLSOptions& tlsOptions); - - int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions); - - int ws_echo_server_main(int port, - bool greetings, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions, - bool ipv6, - bool disablePerMessageDeflate, - bool disablePong); - - int ws_push_server(int port, - bool greetings, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions, - bool ipv6, - bool disablePerMessageDeflate, - bool disablePong, - const std::string& sendMsg); - - int ws_broadcast_server_main(int port, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions); - int ws_transfer_main(int port, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions); - - int ws_chat_main(const std::string& url, const std::string& user); - - int ws_connect_main(const std::string& url, - const std::string& headers, - bool disableAutomaticReconnection, - bool disablePerMessageDeflate, - bool binaryMode, - uint32_t maxWaitBetweenReconnectionRetries, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs); - - int ws_echo_client(const std::string& url, - bool disablePerMessageDeflate, - bool binaryMode, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs, - const std::string& sendMsg, - bool noSend); - - int ws_receive_main(const std::string& url, - bool enablePerMessageDeflate, - int delayMs, - const ix::SocketTLSOptions& tlsOptions); - - int ws_send_main(const std::string& url, - const std::string& path, - bool disablePerMessageDeflate, - const ix::SocketTLSOptions& tlsOptions); - - int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password); - - int ws_redis_publish_main(const std::string& hostname, - int port, - const std::string& password, - const std::string& channel, - const std::string& message, - int count); - - int ws_redis_subscribe_main(const std::string& hostname, - int port, - const std::string& password, - const std::string& channel, - bool verbose); - - int ws_cobra_publish_main(const ix::CobraConfig& appkey, - const std::string& channel, - const std::string& path); - - int ws_cobra_metrics_publish_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& path, - bool stress); - - int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - const std::string& host, - int port); - - int ws_snake_main(int port, - const std::string& hostname, - const std::string& redisHosts, - int redisPort, - const std::string& redisPassword, - bool verbose, - const std::string& appsConfigPath, - const ix::SocketTLSOptions& tlsOptions, - bool disablePong, - const std::string& republishChannel); - - int ws_httpd_main(int port, - const std::string& hostname, - bool redirect, - const std::string& redirectUrl, - const ix::SocketTLSOptions& tlsOptions); - - int ws_autobahn_main(const std::string& url, bool quiet); - - int ws_redis_server_main(int port, const std::string& hostname); - - int ws_sentry_minidump_upload(const std::string& metadataPath, - const std::string& minidump, - const std::string& project, - const std::string& key, - bool verbose); - - int ws_dns_lookup(const std::string& hostname); -} // namespace ix diff --git a/ws/ws_autobahn.cpp b/ws/ws_autobahn.cpp deleted file mode 100644 index 19ffc056..00000000 --- a/ws/ws_autobahn.cpp +++ /dev/null @@ -1,298 +0,0 @@ -/* - * ws_autobahn.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -// -// 1. First you need to generate a config file in a config folder, -// which can use a white list of test to execute (with globbing), -// or a black list of tests to ignore -// -// config/fuzzingserver.json -// { -// "url": "ws://127.0.0.1:9001", -// "outdir": "./reports/clients", -// "cases": ["2.*"], -// "exclude-cases": [ -// ], -// "exclude-agent-cases": {} -// } -// -// -// 2 Run the test server (using docker) -// docker run -it --rm -v "${PWD}/config:/config" -v "${PWD}/reports:/reports" -p 9001:9001 --name -// fuzzingserver crossbario/autobahn-testsuite -// -// 3. Run this command -// ws autobahn -q --url ws://localhost:9001 -// -// 4. A HTML report will be generated, you can inspect it to see if you are compliant or not -// - -#include -#include -#include -#include -#include -#include -#include - - -namespace -{ - std::string truncate(const std::string& str, size_t n) - { - if (str.size() < n) - { - return str; - } - else - { - return str.substr(0, n) + "..."; - } - } -} // namespace - -namespace ix -{ - class AutobahnTestCase - { - public: - AutobahnTestCase(const std::string& _url, bool quiet); - void run(); - - private: - void log(const std::string& msg); - - std::string _url; - ix::WebSocket _webSocket; - - bool _quiet; - - std::mutex _mutex; - std::condition_variable _condition; - }; - - AutobahnTestCase::AutobahnTestCase(const std::string& url, bool quiet) - : _url(url) - , _quiet(quiet) - { - _webSocket.disableAutomaticReconnection(); - - // FIXME: this should be on by default - ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( - true, false, false, 15, 15); - _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); - } - - void AutobahnTestCase::log(const std::string& msg) - { - if (!_quiet) - { - spdlog::info(msg); - } - } - - void AutobahnTestCase::run() - { - _webSocket.setUrl(_url); - - std::stringstream ss; - log(std::string("Connecting to url: ") + _url); - - _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { - std::stringstream ss; - if (msg->type == ix::WebSocketMessageType::Open) - { - log("autobahn: connected"); - ss << "Uri: " << msg->openInfo.uri << std::endl; - ss << "Handshake Headers:" << std::endl; - for (auto it : msg->openInfo.headers) - { - ss << it.first << ": " << it.second << std::endl; - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - ss << "autobahn: connection closed:"; - ss << " code " << msg->closeInfo.code; - ss << " reason " << msg->closeInfo.reason << std::endl; - - _condition.notify_one(); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - ss << "Received " << msg->wireSize << " bytes" << std::endl; - - ss << "autobahn: received message: " << truncate(msg->str, 40) << std::endl; - - _webSocket.send(msg->str, msg->binary); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - - // And error can happen, in which case the test-case is marked done - _condition.notify_one(); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - ss << "Received message fragment" << std::endl; - } - else if (msg->type == ix::WebSocketMessageType::Ping) - { - ss << "Received ping" << std::endl; - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - ss << "Received pong" << std::endl; - } - else - { - ss << "Invalid ix::WebSocketMessageType" << std::endl; - } - - log(ss.str()); - }); - - _webSocket.start(); - - log("Waiting for test completion ..."); - std::unique_lock lock(_mutex); - _condition.wait(lock); - - _webSocket.stop(); - } - - bool generateReport(const std::string& url) - { - ix::WebSocket webSocket; - std::string reportUrl(url); - reportUrl += "/updateReports?agent=ixwebsocket"; - webSocket.setUrl(reportUrl); - webSocket.disableAutomaticReconnection(); - - std::atomic success(true); - std::condition_variable condition; - - webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) { - if (msg->type == ix::WebSocketMessageType::Close) - { - spdlog::info("Report generated"); - condition.notify_one(); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - std::stringstream ss; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - spdlog::info(ss.str()); - - success = false; - } - }); - - webSocket.start(); - std::mutex mutex; - std::unique_lock lock(mutex); - condition.wait(lock); - webSocket.stop(); - - if (!success) - { - spdlog::error("Cannot generate report at url {}", reportUrl); - } - - return success; - } - - int getTestCaseCount(const std::string& url) - { - ix::WebSocket webSocket; - std::string caseCountUrl(url); - caseCountUrl += "/getCaseCount"; - webSocket.setUrl(caseCountUrl); - webSocket.disableAutomaticReconnection(); - - int count = -1; - std::condition_variable condition; - - webSocket.setOnMessageCallback([&condition, &count](const ix::WebSocketMessagePtr& msg) { - if (msg->type == ix::WebSocketMessageType::Close) - { - condition.notify_one(); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - std::stringstream ss; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - spdlog::info(ss.str()); - - condition.notify_one(); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - // response is a string - std::stringstream ss; - ss << msg->str; - ss >> count; - } - }); - - webSocket.start(); - std::mutex mutex; - std::unique_lock lock(mutex); - condition.wait(lock); - webSocket.stop(); - - if (count == -1) - { - spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl); - } - - return count; - } - - // - // make && bench ws autobahn --url ws://localhost:9001 - // - int ws_autobahn_main(const std::string& url, bool quiet) - { - int testCasesCount = getTestCaseCount(url); - spdlog::info("Test cases count: {}", testCasesCount); - - if (testCasesCount == -1) - { - spdlog::error("Cannot retrieve test case count at url {}", url); - return 1; - } - - testCasesCount++; - - for (int i = 1; i < testCasesCount; ++i) - { - spdlog::info("Execute test case {}", i); - - int caseNumber = i; - - std::stringstream ss; - ss << url << "/runCase?case=" << caseNumber << "&agent=ixwebsocket"; - - std::string url(ss.str()); - - AutobahnTestCase testCase(url, quiet); - testCase.run(); - } - - return generateReport(url) ? 0 : 1; - } -} // namespace ix diff --git a/ws/ws_broadcast_server.cpp b/ws/ws_broadcast_server.cpp deleted file mode 100644 index b154ef94..00000000 --- a/ws/ws_broadcast_server.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - * ws_broadcast_server.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include - - -namespace ix -{ - int ws_broadcast_server_main(int port, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::WebSocketServer server(port, hostname); - server.setTLSOptions(tlsOptions); - - server.setOnClientMessageCallback( - [&server](std::shared_ptr connectionState, - ConnectionInfo& connectionInfo, - WebSocket& webSocket, - const WebSocketMessagePtr& msg) { - auto remoteIp = connectionInfo.remoteIp; - if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("New connection"); - spdlog::info("remote ip: {}", remoteIp); - spdlog::info("id: {}", connectionState->getId()); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - spdlog::info("Closed connection: code {} reason {}", - msg->closeInfo.code, - msg->closeInfo.reason); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - std::stringstream ss; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - spdlog::info(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - spdlog::info("Received message fragment"); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - spdlog::info("Received {} bytes", msg->wireSize); - - for (auto&& client : server.getClients()) - { - if (client.get() != &webSocket) - { - client->send(msg->str, msg->binary, [](int current, int total) -> bool { - spdlog::info("Step {} out of {}", current, total); - return true; - }); - - do - { - size_t bufferedAmount = client->bufferedAmount(); - spdlog::info("{} bytes left to be sent", bufferedAmount); - - std::chrono::duration duration(500); - std::this_thread::sleep_for(duration); - } while (client->bufferedAmount() != 0); - } - } - } - }); - - auto res = server.listen(); - if (!res.first) - { - spdlog::info(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_chat.cpp b/ws/ws_chat.cpp deleted file mode 100644 index 690f5b9c..00000000 --- a/ws/ws_chat.cpp +++ /dev/null @@ -1,191 +0,0 @@ -/* - * ws_chat.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. - */ - -// -// Simple chat program that talks to a broadcast server -// Broadcast server can be ran with `ws broadcast_server` -// - -#include "nlohmann/json.hpp" -#include -#include -#include -#include -#include -#include - -// for convenience -using json = nlohmann::json; - -namespace ix -{ - class WebSocketChat - { - public: - WebSocketChat(const std::string& url, const std::string& user); - - void subscribe(const std::string& channel); - void start(); - void stop(); - bool isReady() const; - - void sendMessage(const std::string& text); - size_t getReceivedMessagesCount() const; - - std::string encodeMessage(const std::string& text); - std::pair decodeMessage(const std::string& str); - - private: - std::string _url; - std::string _user; - ix::WebSocket _webSocket; - std::queue _receivedQueue; - - void log(const std::string& msg); - }; - - WebSocketChat::WebSocketChat(const std::string& url, const std::string& user) - : _url(url) - , _user(user) - { - ; - } - - void WebSocketChat::log(const std::string& msg) - { - spdlog::info(msg); - } - - size_t WebSocketChat::getReceivedMessagesCount() const - { - return _receivedQueue.size(); - } - - bool WebSocketChat::isReady() const - { - return _webSocket.getReadyState() == ix::ReadyState::Open; - } - - void WebSocketChat::stop() - { - _webSocket.stop(); - } - - void WebSocketChat::start() - { - _webSocket.setUrl(_url); - - std::stringstream ss; - log(std::string("Connecting to url: ") + _url); - - _webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { - std::stringstream ss; - if (msg->type == ix::WebSocketMessageType::Open) - { - log("ws chat: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - spdlog::info("ws chat: user {} connected !", _user); - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - ss << "ws chat user disconnected: " << _user; - ss << " code " << msg->closeInfo.code; - ss << " reason " << msg->closeInfo.reason << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - auto result = decodeMessage(msg->str); - - // Our "chat" / "broacast" node.js server does not send us - // the messages we send, so we don't have to filter it out. - - // store text - _receivedQueue.push(result.second); - - ss << std::endl - << result.first << "(" << msg->wireSize << " bytes)" - << " > " << result.second << std::endl - << _user << " > "; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - log(ss.str()); - } - else - { - ss << "Invalid ix::WebSocketMessageType"; - log(ss.str()); - } - }); - - _webSocket.start(); - } - - std::pair WebSocketChat::decodeMessage(const std::string& str) - { - auto j = json::parse(str); - - std::string msg_user = j["user"]; - std::string msg_text = j["text"]; - - return std::pair(msg_user, msg_text); - } - - std::string WebSocketChat::encodeMessage(const std::string& text) - { - json j; - j["user"] = _user; - j["text"] = text; - - std::string output = j.dump(); - return output; - } - - void WebSocketChat::sendMessage(const std::string& text) - { - _webSocket.sendText(encodeMessage(text)); - } - - int ws_chat_main(const std::string& url, const std::string& user) - { - spdlog::info("Type Ctrl-D to exit prompt..."); - WebSocketChat webSocketChat(url, user); - webSocketChat.start(); - - while (true) - { - // Read line - std::string line; - std::cout << user << " > " << std::flush; - std::getline(std::cin, line); - - if (!std::cin) - { - break; - } - - webSocketChat.sendMessage(line); - } - - spdlog::info(""); - webSocketChat.stop(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_cobra_metrics_publish.cpp b/ws/ws_cobra_metrics_publish.cpp deleted file mode 100644 index 477f3d55..00000000 --- a/ws/ws_cobra_metrics_publish.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - * ws_cobra_metrics_publish.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include - -namespace ix -{ - int ws_cobra_metrics_publish_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& path, - bool stress) - { - std::atomic sentMessages(0); - std::atomic ackedMessages(0); - CobraConnection::setPublishTrackerCallback( - [&sentMessages, &ackedMessages](bool sent, bool acked) { - if (sent) sentMessages++; - if (acked) ackedMessages++; - }); - - CobraMetricsPublisher cobraMetricsPublisher; - cobraMetricsPublisher.enable(true); - cobraMetricsPublisher.configure(config, channel); - - while (!cobraMetricsPublisher.isAuthenticated()) - ; - - std::ifstream f(path); - std::string str((std::istreambuf_iterator(f)), std::istreambuf_iterator()); - - Json::Value data; - Json::Reader reader; - if (!reader.parse(str, data)) return 1; - - if (!stress) - { - auto msgId = cobraMetricsPublisher.push(channel, data); - spdlog::info("Sent message: {}", msgId); - } - else - { - // Stress mode to try to trigger server and client bugs - while (true) - { - for (int i = 0; i < 1000; ++i) - { - cobraMetricsPublisher.push(channel, data); - } - - cobraMetricsPublisher.suspend(); - cobraMetricsPublisher.resume(); - - // FIXME: investigate why without this check we trigger a lock - while (!cobraMetricsPublisher.isAuthenticated()) - ; - } - } - - // Wait a bit for the message to get a chance to be sent - // there isn't any ack on publish right now so it's the best we can do - // FIXME: this comment is a lie now - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages); - - return 0; - } -} // namespace ix diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp deleted file mode 100644 index e26a1c7b..00000000 --- a/ws/ws_cobra_publish.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/* - * ws_cobra_publish.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ix -{ - int ws_cobra_publish_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& path) - { - std::ifstream f(path); - std::string str((std::istreambuf_iterator(f)), std::istreambuf_iterator()); - - Json::Value data; - Json::Reader reader; - if (!reader.parse(str, data)) - { - spdlog::info("Input file is not a JSON file"); - return 1; - } - - ix::CobraConnection conn; - conn.configure(config); - - // Display incoming messages - std::atomic authenticated(false); - std::atomic messageAcked(false); - - conn.setEventCallback( - [&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event) { - if (event->type == ix::CobraEventType::Open) - { - spdlog::info("Publisher connected"); - - for (auto&& it : event->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (event->type == ix::CobraEventType::Closed) - { - spdlog::info("Subscriber closed: {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Authenticated) - { - spdlog::info("Publisher authenticated"); - authenticated = true; - - Json::Value channels; - channels[0] = channel; - auto msgId = conn.publish(channels, data); - - spdlog::info("Published msg {}", msgId); - } - else if (event->type == ix::CobraEventType::Subscribed) - { - spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::UnSubscribed) - { - spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::Error) - { - spdlog::error("Publisher: error {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Published) - { - spdlog::info("Published message id {} acked", event->msgId); - messageAcked = true; - } - else if (event->type == ix::CobraEventType::Pong) - { - spdlog::info("Received websocket pong"); - } - else if (event->type == ix::CobraEventType::HandshakeError) - { - spdlog::error("Subscriber: Handshake error: {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::AuthenticationError) - { - spdlog::error("Subscriber: Authentication error: {}", event->errMsg); - } - }); - - conn.connect(); - - while (!authenticated) - ; - while (!messageAcked) - ; - - return 0; - } -} // namespace ix diff --git a/ws/ws_connect.cpp b/ws/ws_connect.cpp deleted file mode 100644 index b6827322..00000000 --- a/ws/ws_connect.cpp +++ /dev/null @@ -1,288 +0,0 @@ -/* - * ws_connect.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. - */ - -#include "IXBench.h" -#include "linenoise.hpp" -#include -#include -#include -#include -#include -#include - - -namespace ix -{ - class WebSocketConnect - { - public: - WebSocketConnect(const std::string& _url, - const std::string& headers, - bool disableAutomaticReconnection, - bool disablePerMessageDeflate, - bool binaryMode, - uint32_t maxWaitBetweenReconnectionRetries, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs); - - void subscribe(const std::string& channel); - void start(); - void stop(); - - int getSentBytes() - { - return _sentBytes; - } - int getReceivedBytes() - { - return _receivedBytes; - } - - void sendMessage(const std::string& text); - - private: - std::string _url; - WebSocketHttpHeaders _headers; - ix::WebSocket _webSocket; - bool _disablePerMessageDeflate; - bool _binaryMode; - std::atomic _receivedBytes; - std::atomic _sentBytes; - - void log(const std::string& msg); - WebSocketHttpHeaders parseHeaders(const std::string& data); - }; - - WebSocketConnect::WebSocketConnect(const std::string& url, - const std::string& headers, - bool disableAutomaticReconnection, - bool disablePerMessageDeflate, - bool binaryMode, - uint32_t maxWaitBetweenReconnectionRetries, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs) - : _url(url) - , _disablePerMessageDeflate(disablePerMessageDeflate) - , _binaryMode(binaryMode) - , _receivedBytes(0) - , _sentBytes(0) - { - if (disableAutomaticReconnection) - { - _webSocket.disableAutomaticReconnection(); - } - _webSocket.setMaxWaitBetweenReconnectionRetries(maxWaitBetweenReconnectionRetries); - _webSocket.setTLSOptions(tlsOptions); - _webSocket.setPingInterval(pingIntervalSecs); - - _headers = parseHeaders(headers); - - if (!subprotocol.empty()) - { - _webSocket.addSubProtocol(subprotocol); - } - - WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) { - if (incoming) - { - _receivedBytes += size; - } - else - { - _sentBytes += size; - } - }); - } - - void WebSocketConnect::log(const std::string& msg) - { - std::cout << msg << std::endl; - } - - WebSocketHttpHeaders WebSocketConnect::parseHeaders(const std::string& data) - { - WebSocketHttpHeaders headers; - - // Split by \n - std::string token; - std::stringstream tokenStream(data); - - while (std::getline(tokenStream, token)) - { - std::size_t pos = token.rfind(':'); - - // Bail out if last '.' is found - if (pos == std::string::npos) continue; - - auto key = token.substr(0, pos); - auto val = token.substr(pos + 1); - - spdlog::info("{}: {}", key, val); - headers[key] = val; - } - - return headers; - } - - void WebSocketConnect::stop() - { - { - Bench bench("ws_connect: stop connection"); - _webSocket.stop(); - } - } - - void WebSocketConnect::start() - { - _webSocket.setUrl(_url); - _webSocket.setExtraHeaders(_headers); - - if (_disablePerMessageDeflate) - { - _webSocket.disablePerMessageDeflate(); - } - else - { - ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( - true, false, false, 15, 15); - _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); - } - - std::stringstream ss; - log(std::string("Connecting to url: ") + _url); - - _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { - std::stringstream ss; - if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("ws_connect: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - ss << "ws_connect: connection closed:"; - ss << " code " << msg->closeInfo.code; - ss << " reason " << msg->closeInfo.reason << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - spdlog::info("Received {} bytes", msg->wireSize); - - ss << "ws_connect: received message: " << msg->str; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - spdlog::info("Received message fragment"); - } - else if (msg->type == ix::WebSocketMessageType::Ping) - { - spdlog::info("Received ping"); - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - spdlog::info("Received pong {}", msg->str); - } - else - { - ss << "Invalid ix::WebSocketMessageType"; - log(ss.str()); - } - }); - - _webSocket.start(); - } - - void WebSocketConnect::sendMessage(const std::string& text) - { - if (_binaryMode) - { - _webSocket.sendBinary(text); - } - else - { - _webSocket.sendText(text); - } - } - - int ws_connect_main(const std::string& url, - const std::string& headers, - bool disableAutomaticReconnection, - bool disablePerMessageDeflate, - bool binaryMode, - uint32_t maxWaitBetweenReconnectionRetries, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs) - { - std::cout << "Type Ctrl-D to exit prompt..." << std::endl; - WebSocketConnect webSocketChat(url, - headers, - disableAutomaticReconnection, - disablePerMessageDeflate, - binaryMode, - maxWaitBetweenReconnectionRetries, - tlsOptions, - subprotocol, - pingIntervalSecs); - webSocketChat.start(); - - while (true) - { - // Read line - std::string line; - auto quit = linenoise::Readline("> ", line); - - if (quit) - { - break; - } - - if (line == "/stop") - { - spdlog::info("Stopping connection..."); - webSocketChat.stop(); - continue; - } - - if (line == "/start") - { - spdlog::info("Starting connection..."); - webSocketChat.start(); - continue; - } - - webSocketChat.sendMessage(line); - - // Add text to history - linenoise::AddHistory(line.c_str()); - } - - spdlog::info(""); - webSocketChat.stop(); - - spdlog::info("Received {} bytes", webSocketChat.getReceivedBytes()); - spdlog::info("Sent {} bytes", webSocketChat.getSentBytes()); - - return 0; - } -} // namespace ix diff --git a/ws/ws_dns_lookup.cpp b/ws/ws_dns_lookup.cpp deleted file mode 100644 index e0bb3d5c..00000000 --- a/ws/ws_dns_lookup.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ws_dns_lookup.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include - - -namespace ix -{ - int ws_dns_lookup(const std::string& hostname) - { - auto dnsLookup = std::make_shared(hostname, 80); - - std::string errMsg; - struct addrinfo* res; - - res = dnsLookup->resolve(errMsg, [] { return false; }); - - auto addr = res->ai_addr; - - char str[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &addr, str, INET_ADDRSTRLEN); - - spdlog::info("host: {} ip: {}", hostname, str); - - return 0; - } -} // namespace ix diff --git a/ws/ws_echo_client.cpp b/ws/ws_echo_client.cpp deleted file mode 100644 index e481045d..00000000 --- a/ws/ws_echo_client.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ws_echo_client.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ix -{ - int ws_echo_client(const std::string& url, - bool disablePerMessageDeflate, - bool binaryMode, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs, - const std::string& sendMsg, - bool noSend) - { - // Our websocket object - ix::WebSocket webSocket; - - webSocket.setUrl(url); - webSocket.setTLSOptions(tlsOptions); - webSocket.setPingInterval(pingIntervalSecs); - - if (disablePerMessageDeflate) - { - webSocket.disablePerMessageDeflate(); - } - - if (!subprotocol.empty()) - { - webSocket.addSubProtocol(subprotocol); - } - - std::atomic receivedCount(0); - uint64_t receivedCountTotal(0); - uint64_t receivedCountPerSecs(0); - - // Setup a callback to be fired (in a background thread, watch out for race conditions !) - // when a message or an event (open, close, error) is received - webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode]( - const ix::WebSocketMessagePtr& msg) { - if (msg->type == ix::WebSocketMessageType::Message) - { - if (!noSend) - { - webSocket.send(msg->str, msg->binary); - } - receivedCount++; - } - else if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("ws_echo_client: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - webSocket.send(sendMsg, binaryMode); - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - spdlog::info("Received pong {}", msg->str); - } - }); - - auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] { - setThreadName("Timer"); - while (true) - { - // - // We cannot write to sentCount and receivedCount - // as those are used externally, so we need to introduce - // our own counters - // - std::stringstream ss; - ss << "messages received: " << receivedCountPerSecs << " per second " - << receivedCountTotal << " total"; - - CoreLogger::info(ss.str()); - - receivedCountPerSecs = receivedCount - receivedCountTotal; - receivedCountTotal += receivedCountPerSecs; - - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t1(timer); - - // Now that our callback is setup, we can start our background thread and receive messages - std::cout << "Connecting to " << url << "..." << std::endl; - webSocket.start(); - - // Send a message to the server (default to TEXT mode) - webSocket.send("hello world"); - - while (true) - { - std::string text; - std::cout << "> " << std::flush; - std::getline(std::cin, text); - - webSocket.send(text); - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_echo_server.cpp b/ws/ws_echo_server.cpp deleted file mode 100644 index 8c336435..00000000 --- a/ws/ws_echo_server.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ws_echo_server.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include - -namespace ix -{ - int ws_echo_server_main(int port, - bool greetings, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions, - bool ipv6, - bool disablePerMessageDeflate, - bool disablePong) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::WebSocketServer server(port, - hostname, - SocketServer::kDefaultTcpBacklog, - SocketServer::kDefaultMaxConnections, - WebSocketServer::kDefaultHandShakeTimeoutSecs, - (ipv6) ? AF_INET6 : AF_INET); - - server.setTLSOptions(tlsOptions); - - if (disablePerMessageDeflate) - { - spdlog::info("Disable per message deflate"); - server.disablePerMessageDeflate(); - } - - if (disablePong) - { - spdlog::info("Disable responding to PING messages with PONG"); - server.disablePong(); - } - - server.setOnClientMessageCallback( - [greetings](std::shared_ptr connectionState, - ConnectionInfo& connectionInfo, - WebSocket& webSocket, - const WebSocketMessagePtr& msg) { - auto remoteIp = connectionInfo.remoteIp; - if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("New connection"); - spdlog::info("remote ip: {}", remoteIp); - spdlog::info("id: {}", connectionState->getId()); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - if (greetings) - { - webSocket.sendText("Welcome !"); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - spdlog::info("Closed connection: client id {} code {} reason {}", - connectionState->getId(), - msg->closeInfo.code, - msg->closeInfo.reason); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - spdlog::error("Connection error: {}", msg->errorInfo.reason); - spdlog::error("#retries: {}", msg->errorInfo.retries); - spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); - spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - spdlog::info("Received {} bytes", msg->wireSize); - webSocket.send(msg->str, msg->binary); - } - }); - - auto res = server.listen(); - if (!res.first) - { - spdlog::error(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_http_client.cpp b/ws/ws_http_client.cpp deleted file mode 100644 index c5e57dba..00000000 --- a/ws/ws_http_client.cpp +++ /dev/null @@ -1,185 +0,0 @@ -/* - * http_client.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include - -namespace ix -{ - std::string extractFilename(const std::string& path) - { - std::string::size_type idx; - - idx = path.rfind('/'); - if (idx != std::string::npos) - { - std::string filename = path.substr(idx + 1); - return filename; - } - else - { - return path; - } - } - - WebSocketHttpHeaders parseHeaders(const std::string& data) - { - WebSocketHttpHeaders headers; - - // Split by \n - std::string token; - std::stringstream tokenStream(data); - - while (std::getline(tokenStream, token)) - { - std::size_t pos = token.rfind(':'); - - // Bail out if last '.' is found - if (pos == std::string::npos) continue; - - auto key = token.substr(0, pos); - auto val = token.substr(pos + 1); - - spdlog::info("{}: {}", key, val); - headers[key] = val; - } - - return headers; - } - - // - // Useful endpoint to test HTTP post - // https://postman-echo.com/post - // - HttpParameters parsePostParameters(const std::string& data) - { - HttpParameters httpParameters; - - // Split by \n - std::string token; - std::stringstream tokenStream(data); - - while (std::getline(tokenStream, token)) - { - std::size_t pos = token.rfind('='); - - // Bail out if last '.' is found - if (pos == std::string::npos) continue; - - auto key = token.substr(0, pos); - auto val = token.substr(pos + 1); - - spdlog::info("{}: {}", key, val); - httpParameters[key] = val; - } - - return httpParameters; - } - - int ws_http_client_main(const std::string& url, - const std::string& headersData, - const std::string& data, - bool headersOnly, - int connectTimeout, - int transferTimeout, - bool followRedirects, - int maxRedirects, - bool verbose, - bool save, - const std::string& output, - bool compress, - const ix::SocketTLSOptions& tlsOptions) - { - HttpClient httpClient; - httpClient.setTLSOptions(tlsOptions); - - auto args = httpClient.createRequest(); - args->extraHeaders = parseHeaders(headersData); - args->connectTimeout = connectTimeout; - args->transferTimeout = transferTimeout; - args->followRedirects = followRedirects; - args->maxRedirects = maxRedirects; - args->verbose = verbose; - args->compress = compress; - args->logger = [](const std::string& msg) { spdlog::info(msg); }; - args->onProgressCallback = [verbose](int current, int total) -> bool { - if (verbose) - { - spdlog::info("Downloaded {} bytes out of {}", current, total); - } - return true; - }; - - HttpParameters httpParameters = parsePostParameters(data); - - HttpResponsePtr response; - if (headersOnly) - { - response = httpClient.head(url, args); - } - else if (data.empty()) - { - response = httpClient.get(url, args); - } - else - { - response = httpClient.post(url, httpParameters, args); - } - - spdlog::info(""); - - for (auto it : response->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - spdlog::info("Upload size: {}", response->uploadSize); - spdlog::info("Download size: {}", response->downloadSize); - - spdlog::info("Status: {}", response->statusCode); - if (response->errorCode != HttpErrorCode::Ok) - { - spdlog::info("error message: ", response->errorMsg); - } - - if (!headersOnly && response->errorCode == HttpErrorCode::Ok) - { - if (save || !output.empty()) - { - // FIMXE we should decode the url first - std::string filename = extractFilename(url); - if (!output.empty()) - { - filename = output; - } - - spdlog::info("Writing to disk: {}", filename); - std::ofstream out(filename); - out.write((char*) &response->payload.front(), response->payload.size()); - out.close(); - } - else - { - if (response->headers["Content-Type"] != "application/octet-stream") - { - spdlog::info("payload: {}", response->payload); - } - else - { - spdlog::info("Binary output can mess up your terminal."); - spdlog::info("Use the -O flag to save the file to disk."); - spdlog::info("You can also use the --output option to specify a filename."); - } - } - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_httpd.cpp b/ws/ws_httpd.cpp deleted file mode 100644 index f99abeba..00000000 --- a/ws/ws_httpd.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ws_httpd.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include - -namespace ix -{ - int ws_httpd_main(int port, - const std::string& hostname, - bool redirect, - const std::string& redirectUrl, - const ix::SocketTLSOptions& tlsOptions) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::HttpServer server(port, hostname); - server.setTLSOptions(tlsOptions); - - if (redirect) - { - server.makeRedirectServer(redirectUrl); - } - - auto res = server.listen(); - if (!res.first) - { - spdlog::error(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_ping_pong.cpp b/ws/ws_ping_pong.cpp deleted file mode 100644 index 44812441..00000000 --- a/ws/ws_ping_pong.cpp +++ /dev/null @@ -1,161 +0,0 @@ -/* - * ws_ping_pong.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include - -namespace ix -{ - class WebSocketPingPong - { - public: - WebSocketPingPong(const std::string& _url, const ix::SocketTLSOptions& tlsOptions); - - void subscribe(const std::string& channel); - void start(); - void stop(); - - void ping(const std::string& text); - void send(const std::string& text); - - private: - std::string _url; - ix::WebSocket _webSocket; - - void log(const std::string& msg); - }; - - WebSocketPingPong::WebSocketPingPong(const std::string& url, - const ix::SocketTLSOptions& tlsOptions) - : _url(url) - { - _webSocket.setTLSOptions(tlsOptions); - } - - void WebSocketPingPong::log(const std::string& msg) - { - spdlog::info(msg); - } - - void WebSocketPingPong::stop() - { - _webSocket.stop(); - } - - void WebSocketPingPong::start() - { - _webSocket.setUrl(_url); - - std::stringstream ss; - log(std::string("Connecting to url: ") + _url); - - _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { - spdlog::info("Received {} bytes", msg->wireSize); - - std::stringstream ss; - if (msg->type == ix::WebSocketMessageType::Open) - { - log("ping_pong: connected"); - - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - ss << "ping_pong: disconnected:" - << " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason - << msg->str; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - ss << "ping_pong: received message: " << msg->str; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Ping) - { - ss << "ping_pong: received ping message: " << msg->str; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - ss << "ping_pong: received pong message: " << msg->str; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - log(ss.str()); - } - else - { - ss << "Invalid ix::WebSocketMessageType"; - log(ss.str()); - } - }); - - _webSocket.start(); - } - - void WebSocketPingPong::ping(const std::string& text) - { - if (!_webSocket.ping(text).success) - { - std::cerr << "Failed to send ping message. Message too long (> 125 bytes) or endpoint " - "is disconnected" - << std::endl; - } - } - - void WebSocketPingPong::send(const std::string& text) - { - _webSocket.send(text); - } - - int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions) - { - spdlog::info("Type Ctrl-D to exit prompt..."); - WebSocketPingPong webSocketPingPong(url, tlsOptions); - webSocketPingPong.start(); - - while (true) - { - std::string text; - std::cout << "> " << std::flush; - std::getline(std::cin, text); - - if (!std::cin) - { - break; - } - - if (text == "/close") - { - webSocketPingPong.send(text); - } - else - { - webSocketPingPong.ping(text); - } - } - - std::cout << std::endl; - webSocketPingPong.stop(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_push_server.cpp b/ws/ws_push_server.cpp deleted file mode 100644 index 7b015b52..00000000 --- a/ws/ws_push_server.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * ws_push_server.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include - -namespace ix -{ - int ws_push_server(int port, - bool greetings, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions, - bool ipv6, - bool disablePerMessageDeflate, - bool disablePong, - const std::string& sendMsg) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::WebSocketServer server(port, - hostname, - SocketServer::kDefaultTcpBacklog, - SocketServer::kDefaultMaxConnections, - WebSocketServer::kDefaultHandShakeTimeoutSecs, - (ipv6) ? AF_INET6 : AF_INET); - - server.setTLSOptions(tlsOptions); - - if (disablePerMessageDeflate) - { - spdlog::info("Disable per message deflate"); - server.disablePerMessageDeflate(); - } - - if (disablePong) - { - spdlog::info("Disable responding to PING messages with PONG"); - server.disablePong(); - } - - server.setOnClientMessageCallback( - [greetings, &sendMsg](std::shared_ptr connectionState, - ConnectionInfo& connectionInfo, - WebSocket& webSocket, - const WebSocketMessagePtr& msg) { - auto remoteIp = connectionInfo.remoteIp; - if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("New connection"); - spdlog::info("remote ip: {}", remoteIp); - spdlog::info("id: {}", connectionState->getId()); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - if (greetings) - { - webSocket.sendText("Welcome !"); - } - - bool binary = false; - while (true) - { - webSocket.send(sendMsg, binary); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - spdlog::info("Closed connection: client id {} code {} reason {}", - connectionState->getId(), - msg->closeInfo.code, - msg->closeInfo.reason); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - spdlog::error("Connection error: {}", msg->errorInfo.reason); - spdlog::error("#retries: {}", msg->errorInfo.retries); - spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); - spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - spdlog::info("Received {} bytes", msg->wireSize); - webSocket.send(msg->str, msg->binary); - } - }); - - auto res = server.listen(); - if (!res.first) - { - spdlog::error(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_receive.cpp b/ws/ws_receive.cpp deleted file mode 100644 index fc9c438b..00000000 --- a/ws/ws_receive.cpp +++ /dev/null @@ -1,282 +0,0 @@ -/* - * ws_receiver.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using msgpack11::MsgPack; - -namespace ix -{ - class WebSocketReceiver - { - public: - WebSocketReceiver(const std::string& _url, - bool enablePerMessageDeflate, - int delayMs, - const ix::SocketTLSOptions& tlsOptions); - - void subscribe(const std::string& channel); - void start(); - void stop(); - - void waitForConnection(); - void waitForMessage(); - void handleMessage(const std::string& str); - - private: - std::string _url; - std::string _id; - ix::WebSocket _webSocket; - bool _enablePerMessageDeflate; - int _delayMs; - int _receivedFragmentCounter; - - std::mutex _conditionVariableMutex; - std::condition_variable _condition; - - std::string extractFilename(const std::string& path); - void handleError(const std::string& errMsg, const std::string& id); - void log(const std::string& msg); - }; - - WebSocketReceiver::WebSocketReceiver(const std::string& url, - bool enablePerMessageDeflate, - int delayMs, - const ix::SocketTLSOptions& tlsOptions) - : _url(url) - , _enablePerMessageDeflate(enablePerMessageDeflate) - , _delayMs(delayMs) - , _receivedFragmentCounter(0) - { - _webSocket.disableAutomaticReconnection(); - _webSocket.setTLSOptions(tlsOptions); - } - - void WebSocketReceiver::stop() - { - _webSocket.stop(); - } - - void WebSocketReceiver::log(const std::string& msg) - { - spdlog::info(msg); - } - - void WebSocketReceiver::waitForConnection() - { - spdlog::info("{}: Connecting...", "ws_receive"); - - std::unique_lock lock(_conditionVariableMutex); - _condition.wait(lock); - } - - void WebSocketReceiver::waitForMessage() - { - spdlog::info("{}: Waiting for message...", "ws_receive"); - - std::unique_lock lock(_conditionVariableMutex); - _condition.wait(lock); - } - - // We should cleanup the file name and full path further to remove .. as well - std::string WebSocketReceiver::extractFilename(const std::string& path) - { - std::string::size_type idx; - - idx = path.rfind('/'); - if (idx != std::string::npos) - { - std::string filename = path.substr(idx + 1); - return filename; - } - else - { - return path; - } - } - - void WebSocketReceiver::handleError(const std::string& errMsg, const std::string& id) - { - std::map pdu; - pdu["kind"] = "error"; - pdu["id"] = id; - pdu["message"] = errMsg; - - MsgPack msg(pdu); - _webSocket.sendBinary(msg.dump()); - } - - void WebSocketReceiver::handleMessage(const std::string& str) - { - spdlog::info("ws_receive: Received message: {}", str.size()); - - std::string errMsg; - MsgPack data = MsgPack::parse(str, errMsg); - if (!errMsg.empty()) - { - handleError("ws_receive: Invalid MsgPack", std::string()); - return; - } - - spdlog::info("id: {}", data["id"].string_value()); - - std::vector content = data["content"].binary_items(); - spdlog::info("ws_receive: Content size: {}", content.size()); - - // Validate checksum - uint64_t cksum = ix::djb2Hash(content); - auto cksumRef = data["djb2_hash"].string_value(); - - spdlog::info("ws_receive: Computed hash: {}", cksum); - spdlog::info("ws_receive: Reference hash: {}", cksumRef); - - if (std::to_string(cksum) != cksumRef) - { - handleError("Hash mismatch.", std::string()); - return; - } - - std::string filename = data["filename"].string_value(); - filename = extractFilename(filename); - - std::string filenameTmp = filename + ".tmp"; - - spdlog::info("ws_receive: Writing to disk: {}", filenameTmp); - std::ofstream out(filenameTmp); - out.write((char*) &content.front(), content.size()); - out.close(); - - spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename); - rename(filenameTmp.c_str(), filename.c_str()); - - std::map pdu; - pdu["ack"] = true; - pdu["id"] = data["id"]; - pdu["filename"] = data["filename"]; - - spdlog::info("Sending ack to sender"); - MsgPack msg(pdu); - _webSocket.sendBinary(msg.dump()); - } - - void WebSocketReceiver::start() - { - _webSocket.setUrl(_url); - ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( - _enablePerMessageDeflate, false, false, 15, 15); - _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); - - std::stringstream ss; - log(std::string("ws_receive: Connecting to url: ") + _url); - - _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { - std::stringstream ss; - if (msg->type == ix::WebSocketMessageType::Open) - { - _condition.notify_one(); - - log("ws_receive: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - ss << "ws_receive: connection closed:"; - ss << " code " << msg->closeInfo.code; - ss << " reason " << msg->closeInfo.reason << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - ss << "ws_receive: transfered " << msg->wireSize << " bytes"; - log(ss.str()); - handleMessage(msg->str); - _condition.notify_one(); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - ss << "ws_receive: received fragment " << _receivedFragmentCounter++; - log(ss.str()); - - if (_delayMs > 0) - { - // Introduce an arbitrary delay, to simulate a slow connection - std::chrono::duration duration(_delayMs); - std::this_thread::sleep_for(duration); - } - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - ss << "ws_receive "; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Ping) - { - log("ws_receive: received ping"); - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - log("ws_receive: received pong"); - } - else - { - ss << "ws_receive: Invalid ix::WebSocketMessageType"; - log(ss.str()); - } - }); - - _webSocket.start(); - } - - void wsReceive(const std::string& url, - bool enablePerMessageDeflate, - int delayMs, - const ix::SocketTLSOptions& tlsOptions) - { - WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs, tlsOptions); - webSocketReceiver.start(); - - webSocketReceiver.waitForConnection(); - - webSocketReceiver.waitForMessage(); - - std::chrono::duration duration(1000); - std::this_thread::sleep_for(duration); - - spdlog::info("ws_receive: Done !"); - webSocketReceiver.stop(); - } - - int ws_receive_main(const std::string& url, - bool enablePerMessageDeflate, - int delayMs, - const ix::SocketTLSOptions& tlsOptions) - { - wsReceive(url, enablePerMessageDeflate, delayMs, tlsOptions); - return 0; - } -} // namespace ix diff --git a/ws/ws_redis_cli.cpp b/ws/ws_redis_cli.cpp deleted file mode 100644 index efb206c9..00000000 --- a/ws/ws_redis_cli.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * ws_redis_cli.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include "linenoise.hpp" -#include -#include -#include -#include - -namespace ix -{ - int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password) - { - RedisClient redisClient; - if (!redisClient.connect(hostname, port)) - { - spdlog::info("Cannot connect to redis host"); - return 1; - } - - if (!password.empty()) - { - std::string authResponse; - if (!redisClient.auth(password, authResponse)) - { - std::stringstream ss; - spdlog::info("Cannot authenticated to redis"); - return 1; - } - spdlog::info("Auth response: {}", authResponse); - } - - while (true) - { - // Read line - std::string line; - std::string prompt; - prompt += hostname; - prompt += ":"; - prompt += std::to_string(port); - prompt += "> "; - auto quit = linenoise::Readline(prompt.c_str(), line); - - if (quit) - { - break; - } - - std::stringstream ss(line); - std::vector args; - std::string arg; - - while (ss.good()) - { - ss >> arg; - args.push_back(arg); - } - - std::string errMsg; - auto response = redisClient.send(args, errMsg); - if (!errMsg.empty()) - { - spdlog::error("(error) {}", errMsg); - } - else - { - if (response.first != RespType::String) - { - std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")" - << " "; - } - - std::cout << response.second << std::endl; - } - - linenoise::AddHistory(line.c_str()); - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_redis_publish.cpp b/ws/ws_redis_publish.cpp deleted file mode 100644 index ae0fc808..00000000 --- a/ws/ws_redis_publish.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * ws_redis_publish.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include - -namespace ix -{ - int ws_redis_publish_main(const std::string& hostname, - int port, - const std::string& password, - const std::string& channel, - const std::string& message, - int count) - { - RedisClient redisClient; - if (!redisClient.connect(hostname, port)) - { - spdlog::info("Cannot connect to redis host"); - return 1; - } - - if (!password.empty()) - { - std::string authResponse; - if (!redisClient.auth(password, authResponse)) - { - std::stringstream ss; - spdlog::info("Cannot authenticated to redis"); - return 1; - } - spdlog::info("Auth response: {}", authResponse); - } - - std::string errMsg; - for (int i = 0; i < count; i++) - { - if (!redisClient.publish(channel, message, errMsg)) - { - spdlog::error("Error publishing to channel {} error {}", channel, errMsg); - return 1; - } - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_redis_server.cpp b/ws/ws_redis_server.cpp deleted file mode 100644 index 09f02c8c..00000000 --- a/ws/ws_redis_server.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ws_redis_publish.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include - -namespace ix -{ - int ws_redis_server_main(int port, const std::string& hostname) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::RedisServer server(port, hostname); - - auto res = server.listen(); - if (!res.first) - { - spdlog::info(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } -} // namespace ix diff --git a/ws/ws_redis_subscribe.cpp b/ws/ws_redis_subscribe.cpp deleted file mode 100644 index 52311a9e..00000000 --- a/ws/ws_redis_subscribe.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ws_redis_subscribe.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include - -namespace ix -{ - int ws_redis_subscribe_main(const std::string& hostname, - int port, - const std::string& password, - const std::string& channel, - bool verbose) - { - RedisClient redisClient; - if (!redisClient.connect(hostname, port)) - { - spdlog::info("Cannot connect to redis host"); - return 1; - } - - if (!password.empty()) - { - std::string authResponse; - if (!redisClient.auth(password, authResponse)) - { - std::stringstream ss; - spdlog::info("Cannot authenticated to redis"); - return 1; - } - spdlog::info("Auth response: {}", authResponse); - } - - std::atomic msgPerSeconds(0); - std::atomic msgCount(0); - - auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) { - if (verbose) - { - spdlog::info("recived: {}", message); - } - - msgPerSeconds++; - msgCount++; - }; - - auto responseCallback = [](const std::string& redisResponse) { - spdlog::info("Redis subscribe response: {}", redisResponse); - }; - - auto timer = [&msgPerSeconds, &msgCount] { - while (true) - { - spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); - - msgPerSeconds = 0; - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t(timer); - - spdlog::info("Subscribing to {} ...", channel); - if (!redisClient.subscribe(channel, responseCallback, callback)) - { - spdlog::info("Error subscribing to channel {}", channel); - return 1; - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_send.cpp b/ws/ws_send.cpp deleted file mode 100644 index 25bb3ab2..00000000 --- a/ws/ws_send.cpp +++ /dev/null @@ -1,311 +0,0 @@ -/* - * ws_send.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. - */ - -#include "IXBench.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using msgpack11::MsgPack; - -namespace ix -{ - class WebSocketSender - { - public: - WebSocketSender(const std::string& _url, - bool enablePerMessageDeflate, - const ix::SocketTLSOptions& tlsOptions); - - void subscribe(const std::string& channel); - void start(); - void stop(); - - void waitForConnection(); - void waitForAck(); - - bool sendMessage(const std::string& filename, bool throttle); - - private: - std::string _url; - std::string _id; - ix::WebSocket _webSocket; - bool _enablePerMessageDeflate; - - std::atomic _connected; - - std::mutex _conditionVariableMutex; - std::condition_variable _condition; - - void log(const std::string& msg); - }; - - WebSocketSender::WebSocketSender(const std::string& url, - bool enablePerMessageDeflate, - const ix::SocketTLSOptions& tlsOptions) - : _url(url) - , _enablePerMessageDeflate(enablePerMessageDeflate) - , _connected(false) - { - _webSocket.disableAutomaticReconnection(); - _webSocket.setTLSOptions(tlsOptions); - } - - void WebSocketSender::stop() - { - _webSocket.stop(); - } - - void WebSocketSender::log(const std::string& msg) - { - spdlog::info(msg); - } - - void WebSocketSender::waitForConnection() - { - spdlog::info("{}: Connecting...", "ws_send"); - - std::unique_lock lock(_conditionVariableMutex); - _condition.wait(lock); - } - - void WebSocketSender::waitForAck() - { - spdlog::info("{}: Waiting for ack...", "ws_send"); - - std::unique_lock lock(_conditionVariableMutex); - _condition.wait(lock); - } - - std::vector load(const std::string& path) - { - std::vector memblock; - - std::ifstream file(path); - if (!file.is_open()) return memblock; - - file.seekg(0, file.end); - std::streamoff size = file.tellg(); - file.seekg(0, file.beg); - - memblock.resize((size_t) size); - file.read((char*) &memblock.front(), static_cast(size)); - - return memblock; - } - - void WebSocketSender::start() - { - _webSocket.setUrl(_url); - - ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( - _enablePerMessageDeflate, false, false, 15, 15); - _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); - - std::stringstream ss; - log(std::string("ws_send: Connecting to url: ") + _url); - - _webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { - std::stringstream ss; - if (msg->type == ix::WebSocketMessageType::Open) - { - _connected = true; - - _condition.notify_one(); - - log("ws_send: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - _connected = false; - - ss << "ws_send: connection closed:"; - ss << " code " << msg->closeInfo.code; - ss << " reason " << msg->closeInfo.reason << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - _condition.notify_one(); - - ss << "ws_send: received message (" << msg->wireSize << " bytes)"; - log(ss.str()); - - std::string errMsg; - MsgPack data = MsgPack::parse(msg->str, errMsg); - if (!errMsg.empty()) - { - spdlog::info("Invalid MsgPack response"); - return; - } - - std::string id = data["id"].string_value(); - if (_id != id) - { - spdlog::info("Invalid id"); - } - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - ss << "ws_send "; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - log(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Ping) - { - spdlog::info("ws_send: received ping"); - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - spdlog::info("ws_send: received pong"); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - spdlog::info("ws_send: received fragment"); - } - else - { - ss << "ws_send: Invalid ix::WebSocketMessageType"; - log(ss.str()); - } - }); - - _webSocket.start(); - } - - bool WebSocketSender::sendMessage(const std::string& filename, bool throttle) - { - std::vector content; - { - Bench bench("ws_send: load file from disk"); - content = load(filename); - } - - _id = uuid4(); - - std::map pdu; - pdu["kind"] = "send"; - pdu["id"] = _id; - pdu["content"] = content; - auto hash = djb2Hash(content); - pdu["djb2_hash"] = std::to_string(hash); - pdu["filename"] = filename; - - MsgPack msg(pdu); - - auto serializedMsg = msg.dump(); - spdlog::info("ws_send: sending {} bytes", serializedMsg.size()); - - Bench bench("ws_send: Sending file through websocket"); - auto result = - _webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool { - spdlog::info("ws_send: Step {} out of {}", current + 1, total); - - if (throttle) - { - std::chrono::duration duration(10); - std::this_thread::sleep_for(duration); - } - - return _connected; - }); - - if (!result.success) - { - spdlog::error("ws_send: Error sending file."); - return false; - } - - if (!_connected) - { - spdlog::error("ws_send: Got disconnected from the server"); - return false; - } - - spdlog::info("ws_send: sent {} bytes", serializedMsg.size()); - - do - { - size_t bufferedAmount = _webSocket.bufferedAmount(); - spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount); - - std::chrono::duration duration(500); - std::this_thread::sleep_for(duration); - } while (_webSocket.bufferedAmount() != 0 && _connected); - - if (_connected) - { - bench.report(); - auto duration = bench.getDuration(); - auto transferRate = 1000 * content.size() / duration; - transferRate /= (1024 * 1024); - spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate); - } - else - { - spdlog::error("ws_send: Got disconnected from the server"); - } - - return _connected; - } - - void wsSend(const std::string& url, - const std::string& path, - bool enablePerMessageDeflate, - bool throttle, - const ix::SocketTLSOptions& tlsOptions) - { - WebSocketSender webSocketSender(url, enablePerMessageDeflate, tlsOptions); - webSocketSender.start(); - - webSocketSender.waitForConnection(); - - spdlog::info("ws_send: Sending..."); - if (webSocketSender.sendMessage(path, throttle)) - { - webSocketSender.waitForAck(); - spdlog::info("ws_send: Done !"); - } - else - { - spdlog::error("ws_send: Error sending file."); - } - - webSocketSender.stop(); - } - - int ws_send_main(const std::string& url, - const std::string& path, - bool disablePerMessageDeflate, - const ix::SocketTLSOptions& tlsOptions) - { - bool throttle = false; - bool enablePerMessageDeflate = !disablePerMessageDeflate; - - wsSend(url, path, enablePerMessageDeflate, throttle, tlsOptions); - return 0; - } -} // namespace ix diff --git a/ws/ws_sentry_minidump_upload.cpp b/ws/ws_sentry_minidump_upload.cpp deleted file mode 100644 index a5de4ed7..00000000 --- a/ws/ws_sentry_minidump_upload.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/* - * ws_sentry_minidump_upload.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include - - -namespace -{ - // Assume the file exists - std::string readBytes(const std::string& path) - { - std::vector memblock; - std::ifstream file(path); - - file.seekg(0, file.end); - std::streamoff size = file.tellg(); - file.seekg(0, file.beg); - - memblock.resize(size); - - file.read((char*) &memblock.front(), static_cast(size)); - - std::string bytes(memblock.begin(), memblock.end()); - return bytes; - } -} // namespace - -namespace ix -{ - int ws_sentry_minidump_upload(const std::string& metadataPath, - const std::string& minidump, - const std::string& project, - const std::string& key, - bool verbose) - { - SentryClient sentryClient((std::string())); - - // Read minidump file from disk - std::string minidumpBytes = readBytes(minidump); - - // Read json data - std::string sentryMetadata = readBytes(metadataPath); - - std::atomic done(false); - - sentryClient.uploadMinidump( - sentryMetadata, - minidumpBytes, - project, - key, - verbose, - [verbose, &done](const HttpResponsePtr& response) { - if (verbose) - { - for (auto it : response->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - spdlog::info("Upload size: {}", response->uploadSize); - spdlog::info("Download size: {}", response->downloadSize); - - spdlog::info("Status: {}", response->statusCode); - if (response->errorCode != HttpErrorCode::Ok) - { - spdlog::info("error message: {}", response->errorMsg); - } - - if (response->headers["Content-Type"] != "application/octet-stream") - { - spdlog::info("payload: {}", response->payload); - } - } - - if (response->statusCode != 200) - { - spdlog::error("Error sending data to sentry: {}", response->statusCode); - spdlog::error("Status: {}", response->statusCode); - spdlog::error("Response: {}", response->payload); - } - else - { - spdlog::info("Event sent to sentry"); - } - - done = true; - }); - - int i = 0; - - while (!done) - { - std::chrono::duration duration(10); - std::this_thread::sleep_for(duration); - - if (i++ > 5000) break; // wait 5 seconds max - } - - if (!done) - { - spdlog::error("Error: timing out trying to sent a crash to sentry"); - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_snake.cpp b/ws/ws_snake.cpp deleted file mode 100644 index 2f761453..00000000 --- a/ws/ws_snake.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * snake_run.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include - -namespace -{ - std::vector load(const std::string& path) - { - std::vector memblock; - - std::ifstream file(path); - if (!file.is_open()) return memblock; - - file.seekg(0, file.end); - std::streamoff size = file.tellg(); - file.seekg(0, file.beg); - - memblock.resize((size_t) size); - file.read((char*) &memblock.front(), static_cast(size)); - - return memblock; - } - - std::string readAsString(const std::string& path) - { - auto vec = load(path); - return std::string(vec.begin(), vec.end()); - } -} // namespace - -namespace ix -{ - int ws_snake_main(int port, - const std::string& hostname, - const std::string& redisHosts, - int redisPort, - const std::string& redisPassword, - bool verbose, - const std::string& appsConfigPath, - const SocketTLSOptions& socketTLSOptions, - bool disablePong, - const std::string& republishChannel) - { - snake::AppConfig appConfig; - appConfig.port = port; - appConfig.hostname = hostname; - appConfig.verbose = verbose; - appConfig.redisPort = redisPort; - appConfig.redisPassword = redisPassword; - appConfig.socketTLSOptions = socketTLSOptions; - appConfig.disablePong = disablePong; - appConfig.republishChannel = republishChannel; - - // Parse config file - auto str = readAsString(appsConfigPath); - if (str.empty()) - { - spdlog::error("Cannot read content of {}", appsConfigPath); - return 1; - } - - spdlog::error(str); - auto apps = nlohmann::json::parse(str); - appConfig.apps = apps["apps"]; - - std::string token; - std::stringstream tokenStream(redisHosts); - while (std::getline(tokenStream, token, ';')) - { - appConfig.redisHosts.push_back(token); - } - - // Display config on the terminal for debugging - dumpConfig(appConfig); - - snake::SnakeServer snakeServer(appConfig); - snakeServer.runForever(); - - return 0; // should never reach this - } -} // namespace ix diff --git a/ws/ws_transfer.cpp b/ws/ws_transfer.cpp deleted file mode 100644 index 9dd1fa5f..00000000 --- a/ws/ws_transfer.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/* - * ws_transfer.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include - -namespace ix -{ - int ws_transfer_main(int port, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::WebSocketServer server(port, hostname); - server.setTLSOptions(tlsOptions); - - server.setOnClientMessageCallback( - [&server](std::shared_ptr connectionState, - ConnectionInfo& connectionInfo, - WebSocket& webSocket, - const WebSocketMessagePtr& msg) { - auto remoteIp = connectionInfo.remoteIp; - if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("ws_transfer: New connection"); - spdlog::info("remote ip: {}", remoteIp); - spdlog::info("id: {}", connectionState->getId()); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}", - connectionState->getId(), - msg->closeInfo.code, - msg->closeInfo.reason); - auto remaining = server.getClients().size() - 1; - spdlog::info("ws_transfer: {} remaining clients", remaining); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - std::stringstream ss; - ss << "ws_transfer: Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - spdlog::info(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - spdlog::info("ws_transfer: Received message fragment "); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - spdlog::info("ws_transfer: Received {} bytes", msg->wireSize); - size_t receivers = 0; - for (auto&& client : server.getClients()) - { - if (client.get() != &webSocket) - { - auto readyState = client->getReadyState(); - auto id = connectionState->getId(); - - if (readyState == ReadyState::Open) - { - ++receivers; - client->send( - msg->str, msg->binary, [&id](int current, int total) -> bool { - spdlog::info("{}: [client {}]: Step {} out of {}", - "ws_transfer", - id, - current, - total); - return true; - }); - do - { - size_t bufferedAmount = client->bufferedAmount(); - - spdlog::info("{}: [client {}]: {} bytes left to send", - "ws_transfer", - id, - bufferedAmount); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - } while (client->bufferedAmount() != 0 && - client->getReadyState() == ReadyState::Open); - } - else - { - std::string readyStateString = - readyState == ReadyState::Connecting - ? "Connecting" - : readyState == ReadyState::Closing ? "Closing" : "Closed"; - size_t bufferedAmount = client->bufferedAmount(); - - spdlog::info( - "{}: [client {}]: has readystate {} bytes left to be sent {}", - "ws_transfer", - id, - readyStateString, - bufferedAmount); - } - } - } - if (!receivers) - { - spdlog::info("ws_transfer: no remaining receivers"); - } - } - }); - - auto res = server.listen(); - if (!res.first) - { - spdlog::info(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } -} // namespace ix