(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.

This commit is contained in:
Benjamin Sergeant 2020-09-03 09:13:23 -07:00
parent 5534a7fdf9
commit a40003e85a
6 changed files with 126 additions and 150 deletions

View File

@ -1,67 +1,11 @@
version: "3"
version: "3.3"
services:
# snake:
# image: bsergean/ws:build
# entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1
# ports:
# - "8767:8767"
# networks:
# - ws-net
# depends_on:
# - redis1
push:
entrypoint: ws push_server --host 0.0.0.0
image: ${DOCKER_REPO}/ws:build
# proxy:
# image: bsergean/ws:build
# entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v
# ports:
# - "8765:8765"
# networks:
# - ws-net
#pyproxy:
# image: bsergean/ws_proxy:build
# entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765
# ports:
# - "8765:8765"
# networks:
# - ws-net
# # ws:
# # security_opt:
# # - seccomp:unconfined
# # cap_add:
# # - SYS_PTRACE
# # stdin_open: true
# # tty: true
# # image: bsergean/ws:build
# # entrypoint: sh
# # networks:
# # - ws-net
# # depends_on:
# # - redis1
# #
# # redis1:
# # image: redis:alpine
# # networks:
# # - ws-net
# #
# # statsd:
# # image: jaconel/statsd
# # ports:
# # - "8125:8125"
# # environment:
# # - STATSD_DUMP_MSG=true
# # - GRAPHITE_HOST=127.0.0.1
# # networks:
# # - ws-net
compile:
image: alpine
entrypoint: sh
stdin_open: true
tty: true
volumes:
- /Users/bsergeant/src/foss:/home/bsergean/src/foss
networks:
ws-net:
autoroute:
entrypoint: ws autoroute ws://push:8008
image: ${DOCKER_REPO}/ws:build
depends_on:
- push

View File

@ -2,6 +2,10 @@
All changes to this project will be documented in this file.
## [10.3.3] - 2020-09-02
(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
## [10.3.2] - 2020-08-31
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel

View File

@ -12,10 +12,8 @@ namespace ix
{
Bench::Bench(const std::string& description)
: _description(description)
, _start(std::chrono::high_resolution_clock::now())
, _reported(false)
{
;
reset();
}
Bench::~Bench()
@ -26,6 +24,12 @@ namespace ix
}
}
void Bench::reset()
{
_start = std::chrono::high_resolution_clock::now();
_reported = false;
}
void Bench::report()
{
auto now = std::chrono::high_resolution_clock::now();

View File

@ -17,6 +17,7 @@ namespace ix
Bench(const std::string& description);
~Bench();
void reset();
void report();
uint64_t getDuration() const;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "10.3.2"
#define IX_WEBSOCKET_VERSION "10.3.3"

185
ws/ws.cpp
View File

@ -1103,21 +1103,26 @@ namespace ix
return 0;
}
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend)
int ws_autoroute(const std::string& url,
bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
int msgCount)
{
Bench bench("ws_autoroute full test");
// Our websocket object
ix::WebSocket webSocket;
webSocket.setUrl(url);
std::string fullUrl(url);
fullUrl += "/";
fullUrl += std::to_string(msgCount);
webSocket.setUrl(fullUrl);
webSocket.setTLSOptions(tlsOptions);
webSocket.setPingInterval(pingIntervalSecs);
webSocket.disableAutomaticReconnection();
if (disablePerMessageDeflate)
{
@ -1129,43 +1134,52 @@ namespace ix
webSocket.addSubProtocol(subprotocol);
}
std::atomic<uint64_t> receivedCount(0);
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
std::atomic<uint64_t> receivedCountTotal(0);
std::atomic<uint64_t> receivedCountPerSecs(0);
std::mutex conditionVariableMutex;
std::condition_variable condition;
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
// when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
if (!noSend)
std::atomic<bool> stop(false);
// Setup a callback to be fired
// when a message or an event (open, close, ping, pong, error) is received
webSocket.setOnMessageCallback(
[&webSocket, &receivedCountPerSecs, &receivedCountTotal, &stop, &condition, &bench](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
webSocket.send(msg->str, msg->binary);
receivedCountPerSecs++;
receivedCountTotal++;
}
receivedCount++;
}
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_echo_client: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("{}: {}", it.first, it.second);
bench.reset();
spdlog::info("ws_autoroute: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("ws_autoroute: connection closed");
stop = true;
condition.notify_one();
webSocket.send(sendMsg, binaryMode);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
});
bench.report();
}
});
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
auto timer = [&receivedCountTotal, &receivedCountPerSecs, &stop] {
setThreadName("Timer");
while (true)
while (!stop)
{
//
// We cannot write to sentCount and receivedCount
@ -1178,8 +1192,7 @@ namespace ix
CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
receivedCountPerSecs = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
@ -1192,17 +1205,17 @@ namespace ix
std::cout << "Connecting to " << url << "..." << std::endl;
webSocket.start();
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
// Wait for all the messages to be received
std::unique_lock<std::mutex> lock(conditionVariableMutex);
condition.wait(lock);
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
t1.join();
webSocket.stop();
webSocket.send(text);
}
std::stringstream ss;
ss << "messages received: " << receivedCountTotal << " total";
CoreLogger::info(ss.str());
return 0;
}
@ -1640,7 +1653,6 @@ namespace ix
}
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
@ -1671,10 +1683,13 @@ namespace ix
server.disablePong();
}
// push one million messages
std::atomic<bool> stop(false);
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
[&sendMsg, &stop](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
@ -1688,20 +1703,30 @@ namespace ix
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
// Parse the msg count from the uri.
int msgCount = -1;
std::stringstream ss;
auto uriSize = msg->openInfo.uri.size();
ss << msg->openInfo.uri.substr(1, uriSize - 1);
ss >> msgCount;
bool binary = false;
while (true)
if (msgCount == -1)
{
auto sendInfo = webSocket.send(sendMsg, binary);
if (!sendInfo.success)
spdlog::info("Error parsing message count, closing connection");
webSocket.close();
}
else
{
bool binary = false;
for (int i = 0; i < msgCount; ++i)
{
spdlog::info("Error sending message, closing connection");
webSocket.close();
break;
auto sendInfo = webSocket.send(sendMsg, binary);
if (!sendInfo.success)
{
spdlog::info("Error sending message, closing connection");
webSocket.close();
break;
}
}
}
}
@ -1711,6 +1736,7 @@ namespace ix
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
stop = true;
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -1734,7 +1760,14 @@ namespace ix
}
server.start();
server.wait();
while (!stop)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
server.stop();
return 0;
}
@ -2848,6 +2881,7 @@ int main(int argc, char** argv)
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
int msgCount = 1000 * 1000;
uint32_t maxWaitBetweenReconnectionRetries;
int pingIntervalSecs = 30;
@ -2941,17 +2975,14 @@ int main(int argc, char** argv)
addGenericOptions(connectApp);
addTLSOptions(connectApp);
CLI::App* echoClientApp =
app.add_subcommand("echo_client", "Echo messages sent by a remote server");
CLI::App* echoClientApp = app.add_subcommand("autoroute", "Test websocket client performance");
echoClientApp->fallthrough();
echoClientApp->add_option("url", url, "Connection url")->required();
echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
echoClientApp->add_option(
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
echoClientApp->add_option("--msg_count", msgCount, "Total message count to be sent");
addTLSOptions(echoClientApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
@ -2976,7 +3007,6 @@ int main(int argc, char** argv)
pushServerApp->add_option("--port", port, "Port");
pushServerApp->add_option("--host", hostname, "Hostname");
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
pushServerApp->add_flag("-g", greetings, "Greet");
pushServerApp->add_flag("-6", ipv6, "IpV6");
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
@ -3267,16 +3297,10 @@ int main(int argc, char** argv)
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("echo_client"))
else if (app.got_subcommand("autoroute"))
{
ret = ix::ws_echo_client(url,
disablePerMessageDeflate,
binaryMode,
tlsOptions,
subprotocol,
pingIntervalSecs,
sendMsg,
noSend);
ret = ix::ws_autoroute(
url, disablePerMessageDeflate, tlsOptions, subprotocol, pingIntervalSecs, msgCount);
}
else if (app.got_subcommand("echo_server"))
{
@ -3286,7 +3310,6 @@ int main(int argc, char** argv)
else if (app.got_subcommand("push_server"))
{
ret = ix::ws_push_server(port,
greetings,
hostname,
tlsOptions,
ipv6,