From 816c53e3a3ce8f721f5b5ffe051dd8642666cb98 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sun, 29 Sep 2019 17:21:52 -0700 Subject: [PATCH] ws transfer + send + receive / improved logging (contributed by Matt DeBoer) --- ws/ws_receive.cpp | 25 ++++++++--------- ws/ws_send.cpp | 14 +++++----- ws/ws_transfer.cpp | 70 +++++++++++++++++++++++++++++++++------------- 3 files changed, 69 insertions(+), 40 deletions(-) diff --git a/ws/ws_receive.cpp b/ws/ws_receive.cpp index ebb62a0a..e84e4b31 100644 --- a/ws/ws_receive.cpp +++ b/ws/ws_receive.cpp @@ -74,7 +74,7 @@ namespace ix void WebSocketReceiver::waitForConnection() { - std::cout << "Connecting..." << std::endl; + std::cout << "ws_receive: Connecting..." << std::endl; std::unique_lock lock(_conditionVariableMutex); _condition.wait(lock); @@ -82,7 +82,7 @@ namespace ix void WebSocketReceiver::waitForMessage() { - std::cout << "Waiting for message..." << std::endl; + std::cout << "ws_receive: Waiting for message..." << std::endl; std::unique_lock lock(_conditionVariableMutex); _condition.wait(lock); @@ -118,27 +118,27 @@ namespace ix void WebSocketReceiver::handleMessage(const std::string& str) { - std::cerr << "Received message: " << str.size() << std::endl; + std::cerr << "ws_receive: Received message: " << str.size() << std::endl; std::string errMsg; MsgPack data = MsgPack::parse(str, errMsg); if (!errMsg.empty()) { - handleError("Invalid MsgPack", std::string()); + handleError("ws_receive: Invalid MsgPack", std::string()); return; } std::cout << "id: " << data["id"].string_value() << std::endl; std::vector content = data["content"].binary_items(); - std::cout << "Content size: " << content.size() << std::endl; + std::cout << "ws_receive: Content size: " << content.size() << std::endl; // Validate checksum uint64_t cksum = ix::djb2Hash(content); auto cksumRef = data["djb2_hash"].string_value(); - std::cout << "Computed hash: " << cksum << std::endl; - std::cout << "Reference hash: " << cksumRef << std::endl; + std::cout << "ws_receive: Computed hash: " << cksum << std::endl; + std::cout << "ws_receive: Reference hash: " << cksumRef << std::endl; if (std::to_string(cksum) != cksumRef) { @@ -151,12 +151,12 @@ namespace ix std::string filenameTmp = filename + ".tmp"; - std::cout << "Writing to disk: " << filenameTmp << std::endl; + std::cout << "ws_receive: Writing to disk: " << filenameTmp << std::endl; std::ofstream out(filenameTmp); out.write((char*) &content.front(), content.size()); out.close(); - std::cout << "Renaming " << filenameTmp << " to " << filename << std::endl; + std::cout << "ws_receive: Renaming " << filenameTmp << " to " << filename << std::endl; rename(filenameTmp.c_str(), filename.c_str()); std::map pdu; @@ -172,13 +172,12 @@ namespace ix void WebSocketReceiver::start() { _webSocket.setUrl(_url); - ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( _enablePerMessageDeflate, false, false, 15, 15); _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); std::stringstream ss; - log(std::string("Connecting to url: ") + _url); + log(std::string("ws_receive: Connecting to url: ") + _url); _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { std::stringstream ss; @@ -231,7 +230,7 @@ namespace ix } else { - ss << "Invalid ix::WebSocketMessageType"; + ss << "ws_receive: Invalid ix::WebSocketMessageType"; log(ss.str()); } }); @@ -251,7 +250,7 @@ namespace ix std::chrono::duration duration(1000); std::this_thread::sleep_for(duration); - std::cout << "Done !" << std::endl; + std::cout << "ws_receive: Done !" << std::endl; webSocketReceiver.stop(); } diff --git a/ws/ws_send.cpp b/ws/ws_send.cpp index 8f0272fb..9cf9c2ca 100644 --- a/ws/ws_send.cpp +++ b/ws/ws_send.cpp @@ -67,7 +67,7 @@ namespace ix void WebSocketSender::waitForConnection() { - std::cout << "Connecting..." << std::endl; + std::cout << "ws_send: Connecting..." << std::endl; std::unique_lock lock(_conditionVariableMutex); _condition.wait(lock); @@ -75,7 +75,7 @@ namespace ix void WebSocketSender::waitForAck() { - std::cout << "Waiting for ack..." << std::endl; + std::cout << "ws_send: Waiting for ack..." << std::endl; std::unique_lock lock(_conditionVariableMutex); _condition.wait(lock); @@ -107,7 +107,7 @@ namespace ix _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); std::stringstream ss; - log(std::string("Connecting to url: ") + _url); + log(std::string("ws_send: Connecting to url: ") + _url); _webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { std::stringstream ss; @@ -162,7 +162,7 @@ namespace ix } else { - ss << "Invalid ix::WebSocketMessageType"; + ss << "ws_send: Invalid ix::WebSocketMessageType"; log(ss.str()); } }); @@ -258,7 +258,7 @@ namespace ix auto duration = bench.getDuration(); auto transferRate = 1000 * content.size() / duration; transferRate /= (1024 * 1024); - std::cout << "Send transfer rate: " << transferRate << "MB/s" << std::endl; + std::cout << "ws_send: Send transfer rate: " << transferRate << "MB/s" << std::endl; } void wsSend(const std::string& url, @@ -271,12 +271,12 @@ namespace ix webSocketSender.waitForConnection(); - std::cout << "Sending..." << std::endl; + std::cout << "ws_send: Sending..." << std::endl; webSocketSender.sendMessage(path, throttle); webSocketSender.waitForAck(); - std::cout << "Done !" << std::endl; + std::cout << "ws_send: Done !" << std::endl; webSocketSender.stop(); } diff --git a/ws/ws_transfer.cpp b/ws/ws_transfer.cpp index 7d669178..0f154f84 100644 --- a/ws/ws_transfer.cpp +++ b/ws/ws_transfer.cpp @@ -12,7 +12,7 @@ namespace ix { int ws_transfer_main(int port, const std::string& hostname) { - std::cout << "Listening on " << hostname << ":" << port << std::endl; + std::cout << "ws_transfer: Listening on " << hostname << ":" << port << std::endl; ix::WebSocketServer server(port, hostname); @@ -22,7 +22,7 @@ namespace ix const WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open) { - std::cerr << "New connection" << std::endl; + std::cerr << "ws_transfer: New connection" << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl; std::cerr << "Headers:" << std::endl; @@ -33,14 +33,16 @@ namespace ix } else if (msg->type == ix::WebSocketMessageType::Close) { - std::cerr << "Closed connection" - << " code " << msg->closeInfo.code << " reason " + std::cerr << "ws_transfer: [client " << connectionState->getId() + << "]: Closed connection, code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason << std::endl; + auto remaining = server.getClients().erase(webSocket); + std::cerr << "ws_transfer: " << remaining << " remaining clients " << std::endl; } else if (msg->type == ix::WebSocketMessageType::Error) { std::stringstream ss; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "ws_transfer: Connection error: " << msg->errorInfo.reason << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; @@ -48,32 +50,60 @@ namespace ix } else if (msg->type == ix::WebSocketMessageType::Fragment) { - std::cerr << "Received message fragment " << std::endl; + std::cerr << "ws_transfer: Received message fragment " << std::endl; } else if (msg->type == ix::WebSocketMessageType::Message) { - std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; + std::cerr << "ws_transfer: Received " << msg->wireSize << " bytes" << std::endl; + size_t receivers = 0; for (auto&& client : server.getClients()) { if (client != webSocket) { - client->send(msg->str, msg->binary, [](int current, int total) -> bool { - std::cerr << "ws_transfer: Step " << current << " out of " << total - << std::endl; - return true; - }); - - do + auto readyState = client->getReadyState(); + if (readyState == ReadyState::Open) { - size_t bufferedAmount = client->bufferedAmount(); - std::cerr << "ws_transfer: " << bufferedAmount - << " bytes left to be sent" << std::endl; + ++receivers; + client->send(msg->str, + msg->binary, + [id = connectionState->getId()](int current, + int total) -> bool { + std::cerr << "ws_transfer: [client " << id + << "]: Step " << current << " out of " + << total << std::endl; + return true; + }); - std::chrono::duration duration(10); - std::this_thread::sleep_for(duration); - } while (client->bufferedAmount() != 0); + do + { + size_t bufferedAmount = client->bufferedAmount(); + std::cerr << "ws_transfer: [client " << connectionState->getId() + << "]: " << bufferedAmount + << " bytes left to be sent, " << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + } while (client->bufferedAmount() != 0 && + client->getReadyState() == ReadyState::Open); + } + else + { + std::string readyStateString = + readyState == ReadyState::Connecting + ? "Connecting" + : readyState == ReadyState::Closing ? "Closing" : "Closed"; + size_t bufferedAmount = client->bufferedAmount(); + std::cerr << "ws_transfer: [client " << connectionState->getId() + << "]: has readystate '" << readyStateString << "' and " + << bufferedAmount << " bytes left to be sent, " + << std::endl; + } } } + if (!receivers) + { + std::cerr << "ws_transfer: no remaining receivers" << std::endl; + } } }); });