(ws) autoroute command exit on its own once all messages have been received

This commit is contained in:
Benjamin Sergeant 2020-09-09 18:01:38 -07:00
parent 9f51a54a83
commit a4e5d1b47a
3 changed files with 19 additions and 16 deletions

View File

@ -2,6 +2,11 @@
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [10.3.5] - 2020-09-09
(ws) autoroute command exit on its own once all messages have been received
## [10.3.4] - 2020-09-04 ## [10.3.4] - 2020-09-04
(docker) ws docker file installs strace (docker) ws docker file installs strace

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "10.3.4" #define IX_WEBSOCKET_VERSION "10.3.5"

View File

@ -1134,8 +1134,8 @@ namespace ix
webSocket.addSubProtocol(subprotocol); webSocket.addSubProtocol(subprotocol);
} }
std::atomic<uint64_t> receivedCountTotal(0);
std::atomic<uint64_t> receivedCountPerSecs(0); std::atomic<uint64_t> receivedCountPerSecs(0);
std::atomic<uint64_t> target(msgCount);
std::mutex conditionVariableMutex; std::mutex conditionVariableMutex;
std::condition_variable condition; std::condition_variable condition;
@ -1144,12 +1144,20 @@ namespace ix
// Setup a callback to be fired // Setup a callback to be fired
// when a message or an event (open, close, ping, pong, error) is received // when a message or an event (open, close, ping, pong, error) is received
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[&webSocket, &receivedCountPerSecs, &receivedCountTotal, &stop, &condition, &bench]( [&webSocket, &receivedCountPerSecs, &target, &stop, &condition, &bench](
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message) if (msg->type == ix::WebSocketMessageType::Message)
{ {
receivedCountPerSecs++; receivedCountPerSecs++;
receivedCountTotal++;
target -= 1;
if (target == 0)
{
stop = true;
condition.notify_one();
bench.report();
}
} }
else if (msg->type == ix::WebSocketMessageType::Open) else if (msg->type == ix::WebSocketMessageType::Open)
{ {
@ -1170,14 +1178,10 @@ namespace ix
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
spdlog::info("ws_autoroute: connection closed"); spdlog::info("ws_autoroute: connection closed");
stop = true;
condition.notify_one();
bench.report();
} }
}); });
auto timer = [&receivedCountTotal, &receivedCountPerSecs, &stop] { auto timer = [&receivedCountPerSecs, &stop] {
setThreadName("Timer"); setThreadName("Timer");
while (!stop) while (!stop)
{ {
@ -1187,8 +1191,7 @@ namespace ix
// our own counters // our own counters
// //
std::stringstream ss; std::stringstream ss;
ss << "messages received: " << receivedCountPerSecs << " per second " ss << "messages received per second: " << receivedCountPerSecs;
<< receivedCountTotal << " total";
CoreLogger::info(ss.str()); CoreLogger::info(ss.str());
@ -1212,11 +1215,6 @@ namespace ix
t1.join(); t1.join();
webSocket.stop(); webSocket.stop();
std::stringstream ss;
ss << "messages received: " << receivedCountTotal << " total";
CoreLogger::info(ss.str());
return 0; return 0;
} }