From bcae7f326dcfa4d2dbb5907b8580ca0d2142cd84 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sun, 2 Aug 2020 12:09:13 -0700 Subject: [PATCH] (ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) --- docs/CHANGELOG.md | 8 ++ ixwebsocket/IXWebSocketVersion.h | 2 +- makefile | 5 +- .../python/websockets/echo_server.py | 2 +- ws/CMakeLists.txt | 1 + ws/ws.cpp | 23 ++++ ws/ws.h | 8 ++ ws/ws_connect.cpp | 2 +- ws/ws_echo_client.cpp | 113 ++++++++++++++++++ 9 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 ws/ws_echo_client.cpp diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a9873fca..5cbe5438 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,14 @@ # Changelog All changes to this project will be documented in this file. +## [10.1.4] - 2020-08-02 + +(ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) + +## [10.1.3] - 2020-08-02 + +(ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235) + ## [10.1.2] - 2020-07-31 (build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index a2b86a11..f6e7dc16 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "10.1.2" +#define IX_WEBSOCKET_VERSION "10.1.4" diff --git a/makefile b/makefile index 44e2e695..139d2fb9 100644 --- a/makefile +++ b/makefile @@ -34,7 +34,10 @@ ws: mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install) ws_install: - mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install) + mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install) + +ws_install_release: + mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install) ws_openssl: mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4) diff --git a/test/compatibility/python/websockets/echo_server.py b/test/compatibility/python/websockets/echo_server.py index 22a44cc6..e609aa8d 100644 --- a/test/compatibility/python/websockets/echo_server.py +++ b/test/compatibility/python/websockets/echo_server.py @@ -10,7 +10,7 @@ import websockets async def echo(websocket, path): while True: msg = await websocket.recv() - print(f'Received {len(msg)} bytes') + # print(f'Received {len(msg)} bytes') await websocket.send(msg) host = os.getenv('BIND_HOST', 'localhost') diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index ee6ad0ab..929aeb9c 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -51,6 +51,7 @@ add_executable(ws ws_ping_pong.cpp ws_broadcast_server.cpp ws_echo_server.cpp + ws_echo_client.cpp ws_chat.cpp ws_connect.cpp ws_transfer.cpp diff --git a/ws/ws.cpp b/ws/ws.cpp index 840d22f4..370ba8c9 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -125,6 +125,7 @@ int main(int argc, char** argv) std::string logfile; std::string scriptPath; std::string republishChannel; + std::string sendMsg("hello world"); ix::SocketTLSOptions tlsOptions; ix::CobraConfig cobraConfig; ix::CobraBotConfig cobraBotConfig; @@ -243,6 +244,18 @@ int main(int argc, char** argv) connectApp->add_option("--subprotocol", subprotocol, "Subprotocol"); addTLSOptions(connectApp); + CLI::App* echoClientApp = + app.add_subcommand("echo_client", "Echo messages sent by a remote server"); + echoClientApp->fallthrough(); + echoClientApp->add_option("url", url, "Connection url")->required(); + echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); + echoClientApp->add_flag("-b", binaryMode, "Send in binary mode"); + echoClientApp->add_option( + "--ping_interval", pingIntervalSecs, "Interval between sending pings"); + echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol"); + echoClientApp->add_option("--send_msg", sendMsg, "Send message"); + addTLSOptions(echoClientApp); + CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); chatApp->fallthrough(); chatApp->add_option("url", url, "Connection url")->required(); @@ -504,6 +517,16 @@ int main(int argc, char** argv) subprotocol, pingIntervalSecs); } + else if (app.got_subcommand("echo_client")) + { + ret = ix::ws_echo_client(url, + disablePerMessageDeflate, + binaryMode, + tlsOptions, + subprotocol, + pingIntervalSecs, + sendMsg); + } else if (app.got_subcommand("echo_server")) { ret = ix::ws_echo_server_main( diff --git a/ws/ws.h b/ws/ws.h index 4ac56f0f..70d3b745 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -54,6 +54,14 @@ namespace ix const std::string& subprotocol, int pingIntervalSecs); + int ws_echo_client(const std::string& url, + bool disablePerMessageDeflate, + bool binaryMode, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs, + const std::string& sendMsg); + int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs, diff --git a/ws/ws_connect.cpp b/ws/ws_connect.cpp index 8d177bbf..b6827322 100644 --- a/ws/ws_connect.cpp +++ b/ws/ws_connect.cpp @@ -160,7 +160,7 @@ namespace ix std::stringstream ss; if (msg->type == ix::WebSocketMessageType::Open) { - log("ws_connect: connected"); + spdlog::info("ws_connect: connected"); spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Headers:"); for (auto it : msg->openInfo.headers) diff --git a/ws/ws_echo_client.cpp b/ws/ws_echo_client.cpp new file mode 100644 index 00000000..cb169333 --- /dev/null +++ b/ws/ws_echo_client.cpp @@ -0,0 +1,113 @@ +/* + * ws_echo_client.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ix +{ + int ws_echo_client(const std::string& url, + bool disablePerMessageDeflate, + bool binaryMode, + const ix::SocketTLSOptions& tlsOptions, + const std::string& subprotocol, + int pingIntervalSecs, + const std::string& sendMsg) + { + // Our websocket object + ix::WebSocket webSocket; + + webSocket.setUrl(url); + webSocket.setTLSOptions(tlsOptions); + webSocket.setPingInterval(pingIntervalSecs); + + if (disablePerMessageDeflate) + { + webSocket.disablePerMessageDeflate(); + } + + if (!subprotocol.empty()) + { + webSocket.addSubProtocol(subprotocol); + } + + std::atomic receivedCount(0); + uint64_t receivedCountTotal(0); + uint64_t receivedCountPerSecs(0); + + // Setup a callback to be fired (in a background thread, watch out for race conditions !) + // when a message or an event (open, close, error) is received + webSocket.setOnMessageCallback( + [&webSocket, &receivedCount, &sendMsg, binaryMode](const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Message) + { + webSocket.send(msg->str, msg->binary); + receivedCount++; + } + else if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("ws_echo_client: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + webSocket.send(sendMsg, binaryMode); + } + }); + + auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] { + setThreadName("Timer"); + while (true) + { + // + // We cannot write to sentCount and receivedCount + // as those are used externally, so we need to introduce + // our own counters + // + std::stringstream ss; + ss << "messages received: " << receivedCountPerSecs << " per second " + << receivedCountTotal << " total"; + + CoreLogger::info(ss.str()); + + receivedCountPerSecs = receivedCount - receivedCountTotal; + receivedCountTotal += receivedCountPerSecs; + + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t1(timer); + + // Now that our callback is setup, we can start our background thread and receive messages + std::cout << "Connecting to " << url << "..." << std::endl; + webSocket.start(); + + // Send a message to the server (default to TEXT mode) + webSocket.send("hello world"); + + while (true) + { + std::string text; + std::cout << "> " << std::flush; + std::getline(std::cin, text); + + webSocket.send(text); + } + + return 0; + } +} // namespace ix