diff --git a/docker-compose.yml b/docker-compose.yml index ee59c7cb..0caf0cb7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,67 +1,11 @@ -version: "3" +version: "3.3" services: - # snake: - # image: bsergean/ws:build - # entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1 - # ports: - # - "8767:8767" - # networks: - # - ws-net - # depends_on: - # - redis1 + push: + entrypoint: ws push_server --host 0.0.0.0 + image: ${DOCKER_REPO}/ws:build - # proxy: - # image: bsergean/ws:build - # entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v - # ports: - # - "8765:8765" - # networks: - # - ws-net - - #pyproxy: - # image: bsergean/ws_proxy:build - # entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 - # ports: - # - "8765:8765" - # networks: - # - ws-net - - # # ws: - # # security_opt: - # # - seccomp:unconfined - # # cap_add: - # # - SYS_PTRACE - # # stdin_open: true - # # tty: true - # # image: bsergean/ws:build - # # entrypoint: sh - # # networks: - # # - ws-net - # # depends_on: - # # - redis1 - # # - # # redis1: - # # image: redis:alpine - # # networks: - # # - ws-net - # # - # # statsd: - # # image: jaconel/statsd - # # ports: - # # - "8125:8125" - # # environment: - # # - STATSD_DUMP_MSG=true - # # - GRAPHITE_HOST=127.0.0.1 - # # networks: - # # - ws-net - - compile: - image: alpine - entrypoint: sh - stdin_open: true - tty: true - volumes: - - /Users/bsergeant/src/foss:/home/bsergean/src/foss - -networks: - ws-net: + autoroute: + entrypoint: ws autoroute ws://push:8008 + image: ${DOCKER_REPO}/ws:build + depends_on: + - push diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bf873482..f9f4ace2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,10 @@ All changes to this project will be documented in this file. +## [10.3.3] - 2020-09-02 + +(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent. + ## [10.3.2] - 2020-08-31 (ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel diff --git a/ixwebsocket/IXBench.cpp b/ixwebsocket/IXBench.cpp index 64ca2eaf..d29f54f4 100644 --- a/ixwebsocket/IXBench.cpp +++ b/ixwebsocket/IXBench.cpp @@ -12,10 +12,8 @@ namespace ix { Bench::Bench(const std::string& description) : _description(description) - , _start(std::chrono::high_resolution_clock::now()) - , _reported(false) { - ; + reset(); } Bench::~Bench() @@ -26,6 +24,12 @@ namespace ix } } + void Bench::reset() + { + _start = std::chrono::high_resolution_clock::now(); + _reported = false; + } + void Bench::report() { auto now = std::chrono::high_resolution_clock::now(); diff --git a/ixwebsocket/IXBench.h b/ixwebsocket/IXBench.h index 90c5a538..3c46e192 100644 --- a/ixwebsocket/IXBench.h +++ b/ixwebsocket/IXBench.h @@ -17,6 +17,7 @@ namespace ix Bench(const std::string& description); ~Bench(); + void reset(); void report(); uint64_t getDuration() const; diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 35902aaa..d4442159 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "10.3.2" +#define IX_WEBSOCKET_VERSION "10.3.3" diff --git a/ws/ws.cpp b/ws/ws.cpp index 2f676890..47fb5cf3 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -1103,21 +1103,26 @@ namespace ix return 0; } - int ws_echo_client(const std::string& url, - bool disablePerMessageDeflate, - bool binaryMode, - const ix::SocketTLSOptions& tlsOptions, - const std::string& subprotocol, - int pingIntervalSecs, - const std::string& sendMsg, - bool noSend) + int ws_autoroute(const std::string& url, + bool disablePerMessageDeflate, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs, + int msgCount) { + Bench bench("ws_autoroute full test"); + // Our websocket object ix::WebSocket webSocket; - webSocket.setUrl(url); + std::string fullUrl(url); + fullUrl += "/"; + fullUrl += std::to_string(msgCount); + + webSocket.setUrl(fullUrl); webSocket.setTLSOptions(tlsOptions); webSocket.setPingInterval(pingIntervalSecs); + webSocket.disableAutomaticReconnection(); if (disablePerMessageDeflate) { @@ -1129,43 +1134,52 @@ namespace ix webSocket.addSubProtocol(subprotocol); } - std::atomic receivedCount(0); - uint64_t receivedCountTotal(0); - uint64_t receivedCountPerSecs(0); + std::atomic receivedCountTotal(0); + std::atomic receivedCountPerSecs(0); + std::mutex conditionVariableMutex; + std::condition_variable condition; - // Setup a callback to be fired (in a background thread, watch out for race conditions !) - // when a message or an event (open, close, error) is received - webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode]( - const ix::WebSocketMessagePtr& msg) { - if (msg->type == ix::WebSocketMessageType::Message) - { - if (!noSend) + std::atomic stop(false); + + // Setup a callback to be fired + // when a message or an event (open, close, ping, pong, error) is received + webSocket.setOnMessageCallback( + [&webSocket, &receivedCountPerSecs, &receivedCountTotal, &stop, &condition, &bench]( + const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Message) { - webSocket.send(msg->str, msg->binary); + receivedCountPerSecs++; + receivedCountTotal++; } - receivedCount++; - } - else if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("ws_echo_client: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) + else if (msg->type == ix::WebSocketMessageType::Open) { - spdlog::info("{}: {}", it.first, it.second); + bench.reset(); + + spdlog::info("ws_autoroute: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + spdlog::info("Received pong {}", msg->str); + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("ws_autoroute: connection closed"); + stop = true; + condition.notify_one(); - webSocket.send(sendMsg, binaryMode); - } - else if (msg->type == ix::WebSocketMessageType::Pong) - { - spdlog::info("Received pong {}", msg->str); - } - }); + bench.report(); + } + }); - auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] { + auto timer = [&receivedCountTotal, &receivedCountPerSecs, &stop] { setThreadName("Timer"); - while (true) + while (!stop) { // // We cannot write to sentCount and receivedCount @@ -1178,8 +1192,7 @@ namespace ix CoreLogger::info(ss.str()); - receivedCountPerSecs = receivedCount - receivedCountTotal; - receivedCountTotal += receivedCountPerSecs; + receivedCountPerSecs = 0; auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); @@ -1192,17 +1205,17 @@ namespace ix std::cout << "Connecting to " << url << "..." << std::endl; webSocket.start(); - // Send a message to the server (default to TEXT mode) - webSocket.send("hello world"); + // Wait for all the messages to be received + std::unique_lock lock(conditionVariableMutex); + condition.wait(lock); - while (true) - { - std::string text; - std::cout << "> " << std::flush; - std::getline(std::cin, text); + t1.join(); + webSocket.stop(); - webSocket.send(text); - } + std::stringstream ss; + ss << "messages received: " << receivedCountTotal << " total"; + + CoreLogger::info(ss.str()); return 0; } @@ -1640,7 +1653,6 @@ namespace ix } int ws_push_server(int port, - bool greetings, const std::string& hostname, const ix::SocketTLSOptions& tlsOptions, bool ipv6, @@ -1671,10 +1683,13 @@ namespace ix server.disablePong(); } + // push one million messages + std::atomic stop(false); + server.setOnClientMessageCallback( - [greetings, &sendMsg](std::shared_ptr connectionState, - WebSocket& webSocket, - const WebSocketMessagePtr& msg) { + [&sendMsg, &stop](std::shared_ptr connectionState, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { auto remoteIp = connectionState->getRemoteIp(); if (msg->type == ix::WebSocketMessageType::Open) { @@ -1688,20 +1703,30 @@ namespace ix spdlog::info("{}: {}", it.first, it.second); } - if (greetings) - { - webSocket.sendText("Welcome !"); - } + // Parse the msg count from the uri. + int msgCount = -1; + std::stringstream ss; + auto uriSize = msg->openInfo.uri.size(); + ss << msg->openInfo.uri.substr(1, uriSize - 1); + ss >> msgCount; - bool binary = false; - while (true) + if (msgCount == -1) { - auto sendInfo = webSocket.send(sendMsg, binary); - if (!sendInfo.success) + spdlog::info("Error parsing message count, closing connection"); + webSocket.close(); + } + else + { + bool binary = false; + for (int i = 0; i < msgCount; ++i) { - spdlog::info("Error sending message, closing connection"); - webSocket.close(); - break; + auto sendInfo = webSocket.send(sendMsg, binary); + if (!sendInfo.success) + { + spdlog::info("Error sending message, closing connection"); + webSocket.close(); + break; + } } } } @@ -1711,6 +1736,7 @@ namespace ix connectionState->getId(), msg->closeInfo.code, msg->closeInfo.reason); + stop = true; } else if (msg->type == ix::WebSocketMessageType::Error) { @@ -1734,7 +1760,14 @@ namespace ix } server.start(); - server.wait(); + + while (!stop) + { + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + + server.stop(); return 0; } @@ -2848,6 +2881,7 @@ int main(int argc, char** argv) int maxRedirects = 5; int delayMs = -1; int count = 1; + int msgCount = 1000 * 1000; uint32_t maxWaitBetweenReconnectionRetries; int pingIntervalSecs = 30; @@ -2941,17 +2975,14 @@ int main(int argc, char** argv) addGenericOptions(connectApp); addTLSOptions(connectApp); - CLI::App* echoClientApp = - app.add_subcommand("echo_client", "Echo messages sent by a remote server"); + CLI::App* echoClientApp = app.add_subcommand("autoroute", "Test websocket client performance"); echoClientApp->fallthrough(); echoClientApp->add_option("url", url, "Connection url")->required(); echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); - echoClientApp->add_flag("-b", binaryMode, "Send in binary mode"); echoClientApp->add_option( "--ping_interval", pingIntervalSecs, "Interval between sending pings"); echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol"); - echoClientApp->add_option("--send_msg", sendMsg, "Send message"); - echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages"); + echoClientApp->add_option("--msg_count", msgCount, "Total message count to be sent"); addTLSOptions(echoClientApp); CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); @@ -2976,7 +3007,6 @@ int main(int argc, char** argv) pushServerApp->add_option("--port", port, "Port"); pushServerApp->add_option("--host", hostname, "Hostname"); pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors"); - pushServerApp->add_flag("-g", greetings, "Greet"); pushServerApp->add_flag("-6", ipv6, "IpV6"); pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING"); @@ -3267,16 +3297,10 @@ int main(int argc, char** argv) subprotocol, pingIntervalSecs); } - else if (app.got_subcommand("echo_client")) + else if (app.got_subcommand("autoroute")) { - ret = ix::ws_echo_client(url, - disablePerMessageDeflate, - binaryMode, - tlsOptions, - subprotocol, - pingIntervalSecs, - sendMsg, - noSend); + ret = ix::ws_autoroute( + url, disablePerMessageDeflate, tlsOptions, subprotocol, pingIntervalSecs, msgCount); } else if (app.got_subcommand("echo_server")) { @@ -3286,7 +3310,6 @@ int main(int argc, char** argv) else if (app.got_subcommand("push_server")) { ret = ix::ws_push_server(port, - greetings, hostname, tlsOptions, ipv6,