From 6e47c62c066d2b67f364ee7e9b7d7e80110582a4 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sun, 2 Aug 2020 12:41:34 -0700 Subject: [PATCH] (ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). --- docs/CHANGELOG.md | 7 +- docs/performance.md | 37 +++++++++++ ixwebsocket/IXWebSocketVersion.h | 2 +- ws/CMakeLists.txt | 1 + ws/ws.cpp | 28 +++++++- ws/ws.h | 12 +++- ws/ws_echo_client.cpp | 40 +++++++----- ws/ws_push_server.cpp | 108 +++++++++++++++++++++++++++++++ 8 files changed, 215 insertions(+), 20 deletions(-) create mode 100644 docs/performance.md create mode 100644 ws/ws_push_server.cpp diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5cbe5438..c070c2a2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,9 +1,14 @@ # Changelog + All changes to this project will be documented in this file. +## [10.1.5] - 2020-08-02 + +(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). + ## [10.1.4] - 2020-08-02 -(ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) +(ws) Add a new ws sub-command, echo_client. This command sends a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) ## [10.1.3] - 2020-08-02 diff --git a/docs/performance.md b/docs/performance.md new file mode 100644 index 00000000..3720bc59 --- /dev/null +++ b/docs/performance.md @@ -0,0 +1,37 @@ + +## WebSocket Client performance + +We will run a client and a server on the same machine, connecting to localhost. This bench is run on a MacBook Pro from 2015. We can receive over 200,000 (small) messages per second, another way to put it is that it takes 5 micro-second to receive and process one message. This is an indication about the minimal latency to receive messages. + +### Receiving messages + +By using the push_server ws sub-command, the server will send the same message in a loop to any connected client. + +``` +ws push_server -q --send_msg 'yo' +``` + +By using the echo_client ws sub-command, with the -m (mute or no_send), we will display statistics on how many messages we can receive per second. + +``` +$ ws echo_client -m ws://localhost:8008 +[2020-08-02 12:31:17.284] [info] ws_echo_client: connected +[2020-08-02 12:31:17.284] [info] Uri: / +[2020-08-02 12:31:17.284] [info] Headers: +[2020-08-02 12:31:17.284] [info] Connection: Upgrade +[2020-08-02 12:31:17.284] [info] Sec-WebSocket-Accept: byy/pMK2d0PtRwExaaiOnXJTQHo= +[2020-08-02 12:31:17.284] [info] Server: ixwebsocket/10.1.4 macos ssl/SecureTransport zlib 1.2.11 +[2020-08-02 12:31:17.284] [info] Upgrade: websocket +[2020-08-02 12:31:17.663] [info] messages received: 0 per second 2595307 total +[2020-08-02 12:31:18.668] [info] messages received: 79679 per second 2674986 total +[2020-08-02 12:31:19.668] [info] messages received: 207438 per second 2882424 total +[2020-08-02 12:31:20.673] [info] messages received: 209207 per second 3091631 total +[2020-08-02 12:31:21.676] [info] messages received: 216056 per second 3307687 total +[2020-08-02 12:31:22.680] [info] messages received: 214927 per second 3522614 total +[2020-08-02 12:31:23.684] [info] messages received: 216960 per second 3739574 total +[2020-08-02 12:31:24.688] [info] messages received: 215232 per second 3954806 total +[2020-08-02 12:31:25.691] [info] messages received: 212300 per second 4167106 total +[2020-08-02 12:31:26.694] [info] messages received: 212501 per second 4379607 total +[2020-08-02 12:31:27.699] [info] messages received: 212330 per second 4591937 total +[2020-08-02 12:31:28.702] [info] messages received: 216511 per second 4808448 total +``` diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index f6e7dc16..e5c4a1db 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "10.1.4" +#define IX_WEBSOCKET_VERSION "10.1.5" diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 929aeb9c..02c93e55 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -50,6 +50,7 @@ add_executable(ws ws_http_client.cpp ws_ping_pong.cpp ws_broadcast_server.cpp + ws_push_server.cpp ws_echo_server.cpp ws_echo_client.cpp ws_chat.cpp diff --git a/ws/ws.cpp b/ws/ws.cpp index 370ba8c9..da64370f 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -148,6 +148,7 @@ int main(int argc, char** argv) bool version = false; bool verifyNone = false; bool disablePong = false; + bool noSend = false; int port = 8008; int redisPort = 6379; int statsdPort = 8125; @@ -254,6 +255,7 @@ int main(int argc, char** argv) "--ping_interval", pingIntervalSecs, "Interval between sending pings"); echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol"); echoClientApp->add_option("--send_msg", sendMsg, "Send message"); + echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages"); addTLSOptions(echoClientApp); CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); @@ -272,6 +274,18 @@ int main(int argc, char** argv) echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING"); addTLSOptions(echoServerApp); + CLI::App* pushServerApp = app.add_subcommand("push_server", "Push server"); + pushServerApp->fallthrough(); + pushServerApp->add_option("--port", port, "Port"); + pushServerApp->add_option("--host", hostname, "Hostname"); + pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors"); + pushServerApp->add_flag("-g", greetings, "Greet"); + pushServerApp->add_flag("-6", ipv6, "IpV6"); + pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); + pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING"); + pushServerApp->add_option("--send_msg", sendMsg, "Send message"); + addTLSOptions(pushServerApp); + CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); broadcastServerApp->fallthrough(); broadcastServerApp->add_option("--port", port, "Port"); @@ -525,13 +539,25 @@ int main(int argc, char** argv) tlsOptions, subprotocol, pingIntervalSecs, - sendMsg); + sendMsg, + noSend); } else if (app.got_subcommand("echo_server")) { ret = ix::ws_echo_server_main( port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong); } + else if (app.got_subcommand("push_server")) + { + ret = ix::ws_push_server(port, + greetings, + hostname, + tlsOptions, + ipv6, + disablePerMessageDeflate, + disablePong, + sendMsg); + } else if (app.got_subcommand("transfer")) { ret = ix::ws_transfer_main(port, hostname, tlsOptions); diff --git a/ws/ws.h b/ws/ws.h index 70d3b745..89783ccf 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -35,6 +35,15 @@ namespace ix bool disablePerMessageDeflate, bool disablePong); + int ws_push_server(int port, + bool greetings, + const std::string& hostname, + const ix::SocketTLSOptions& tlsOptions, + bool ipv6, + bool disablePerMessageDeflate, + bool disablePong, + const std::string& sendMsg); + int ws_broadcast_server_main(int port, const std::string& hostname, const ix::SocketTLSOptions& tlsOptions); @@ -60,7 +69,8 @@ namespace ix const ix::SocketTLSOptions& tlsOptions, const std::string& subprotocol, int pingIntervalSecs, - const std::string& sendMsg); + const std::string& sendMsg, + bool noSend); int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, diff --git a/ws/ws_echo_client.cpp b/ws/ws_echo_client.cpp index cb169333..e481045d 100644 --- a/ws/ws_echo_client.cpp +++ b/ws/ws_echo_client.cpp @@ -21,7 +21,8 @@ namespace ix const ix::SocketTLSOptions& tlsOptions, const std::string& subprotocol, int pingIntervalSecs, - const std::string& sendMsg) + const std::string& sendMsg, + bool noSend) { // Our websocket object ix::WebSocket webSocket; @@ -46,26 +47,33 @@ namespace ix // Setup a callback to be fired (in a background thread, watch out for race conditions !) // when a message or an event (open, close, error) is received - webSocket.setOnMessageCallback( - [&webSocket, &receivedCount, &sendMsg, binaryMode](const ix::WebSocketMessagePtr& msg) { - if (msg->type == ix::WebSocketMessageType::Message) + webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode]( + const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Message) + { + if (!noSend) { webSocket.send(msg->str, msg->binary); - receivedCount++; } - else if (msg->type == ix::WebSocketMessageType::Open) + receivedCount++; + } + else if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("ws_echo_client: connected"); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) { - spdlog::info("ws_echo_client: connected"); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - webSocket.send(sendMsg, binaryMode); + spdlog::info("{}: {}", it.first, it.second); } - }); + + webSocket.send(sendMsg, binaryMode); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + spdlog::info("Received pong {}", msg->str); + } + }); auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] { setThreadName("Timer"); diff --git a/ws/ws_push_server.cpp b/ws/ws_push_server.cpp new file mode 100644 index 00000000..7b015b52 --- /dev/null +++ b/ws/ws_push_server.cpp @@ -0,0 +1,108 @@ +/* + * ws_push_server.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include + +namespace ix +{ + int ws_push_server(int port, + bool greetings, + const std::string& hostname, + const ix::SocketTLSOptions& tlsOptions, + bool ipv6, + bool disablePerMessageDeflate, + bool disablePong, + const std::string& sendMsg) + { + spdlog::info("Listening on {}:{}", hostname, port); + + ix::WebSocketServer server(port, + hostname, + SocketServer::kDefaultTcpBacklog, + SocketServer::kDefaultMaxConnections, + WebSocketServer::kDefaultHandShakeTimeoutSecs, + (ipv6) ? AF_INET6 : AF_INET); + + server.setTLSOptions(tlsOptions); + + if (disablePerMessageDeflate) + { + spdlog::info("Disable per message deflate"); + server.disablePerMessageDeflate(); + } + + if (disablePong) + { + spdlog::info("Disable responding to PING messages with PONG"); + server.disablePong(); + } + + server.setOnClientMessageCallback( + [greetings, &sendMsg](std::shared_ptr connectionState, + ConnectionInfo& connectionInfo, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { + auto remoteIp = connectionInfo.remoteIp; + if (msg->type == ix::WebSocketMessageType::Open) + { + spdlog::info("New connection"); + spdlog::info("remote ip: {}", remoteIp); + spdlog::info("id: {}", connectionState->getId()); + spdlog::info("Uri: {}", msg->openInfo.uri); + spdlog::info("Headers:"); + for (auto it : msg->openInfo.headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + if (greetings) + { + webSocket.sendText("Welcome !"); + } + + bool binary = false; + while (true) + { + webSocket.send(sendMsg, binary); + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + spdlog::info("Closed connection: client id {} code {} reason {}", + connectionState->getId(), + msg->closeInfo.code, + msg->closeInfo.reason); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + spdlog::error("Connection error: {}", msg->errorInfo.reason); + spdlog::error("#retries: {}", msg->errorInfo.retries); + spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); + spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + spdlog::info("Received {} bytes", msg->wireSize); + webSocket.send(msg->str, msg->binary); + } + }); + + auto res = server.listen(); + if (!res.first) + { + spdlog::error(res.second); + return 1; + } + + server.start(); + server.wait(); + + return 0; + } +} // namespace ix