ws transfer + send + receive / improved logging (contributed by Matt DeBoer)
This commit is contained in:
		| @@ -74,7 +74,7 @@ namespace ix | ||||
|  | ||||
|     void WebSocketReceiver::waitForConnection() | ||||
|     { | ||||
|         std::cout << "Connecting..." << std::endl; | ||||
|         std::cout << "ws_receive: Connecting..." << std::endl; | ||||
|  | ||||
|         std::unique_lock<std::mutex> lock(_conditionVariableMutex); | ||||
|         _condition.wait(lock); | ||||
| @@ -82,7 +82,7 @@ namespace ix | ||||
|  | ||||
|     void WebSocketReceiver::waitForMessage() | ||||
|     { | ||||
|         std::cout << "Waiting for message..." << std::endl; | ||||
|         std::cout << "ws_receive: Waiting for message..." << std::endl; | ||||
|  | ||||
|         std::unique_lock<std::mutex> lock(_conditionVariableMutex); | ||||
|         _condition.wait(lock); | ||||
| @@ -118,27 +118,27 @@ namespace ix | ||||
|  | ||||
|     void WebSocketReceiver::handleMessage(const std::string& str) | ||||
|     { | ||||
|         std::cerr << "Received message: " << str.size() << std::endl; | ||||
|         std::cerr << "ws_receive: Received message: " << str.size() << std::endl; | ||||
|  | ||||
|         std::string errMsg; | ||||
|         MsgPack data = MsgPack::parse(str, errMsg); | ||||
|         if (!errMsg.empty()) | ||||
|         { | ||||
|             handleError("Invalid MsgPack", std::string()); | ||||
|             handleError("ws_receive: Invalid MsgPack", std::string()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         std::cout << "id: " << data["id"].string_value() << std::endl; | ||||
|  | ||||
|         std::vector<uint8_t> content = data["content"].binary_items(); | ||||
|         std::cout << "Content size: " << content.size() << std::endl; | ||||
|         std::cout << "ws_receive: Content size: " << content.size() << std::endl; | ||||
|  | ||||
|         // Validate checksum | ||||
|         uint64_t cksum = ix::djb2Hash(content); | ||||
|         auto cksumRef = data["djb2_hash"].string_value(); | ||||
|  | ||||
|         std::cout << "Computed hash: " << cksum << std::endl; | ||||
|         std::cout << "Reference hash: " << cksumRef << std::endl; | ||||
|         std::cout << "ws_receive: Computed hash: " << cksum << std::endl; | ||||
|         std::cout << "ws_receive: Reference hash: " << cksumRef << std::endl; | ||||
|  | ||||
|         if (std::to_string(cksum) != cksumRef) | ||||
|         { | ||||
| @@ -151,12 +151,12 @@ namespace ix | ||||
|  | ||||
|         std::string filenameTmp = filename + ".tmp"; | ||||
|  | ||||
|         std::cout << "Writing to disk: " << filenameTmp << std::endl; | ||||
|         std::cout << "ws_receive: Writing to disk: " << filenameTmp << std::endl; | ||||
|         std::ofstream out(filenameTmp); | ||||
|         out.write((char*) &content.front(), content.size()); | ||||
|         out.close(); | ||||
|  | ||||
|         std::cout << "Renaming " << filenameTmp << " to " << filename << std::endl; | ||||
|         std::cout << "ws_receive: Renaming " << filenameTmp << " to " << filename << std::endl; | ||||
|         rename(filenameTmp.c_str(), filename.c_str()); | ||||
|  | ||||
|         std::map<MsgPack, MsgPack> pdu; | ||||
| @@ -172,13 +172,12 @@ namespace ix | ||||
|     void WebSocketReceiver::start() | ||||
|     { | ||||
|         _webSocket.setUrl(_url); | ||||
|  | ||||
|         ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( | ||||
|             _enablePerMessageDeflate, false, false, 15, 15); | ||||
|         _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         log(std::string("Connecting to url: ") + _url); | ||||
|         log(std::string("ws_receive: Connecting to url: ") + _url); | ||||
|  | ||||
|         _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { | ||||
|             std::stringstream ss; | ||||
| @@ -231,7 +230,7 @@ namespace ix | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 ss << "Invalid ix::WebSocketMessageType"; | ||||
|                 ss << "ws_receive: Invalid ix::WebSocketMessageType"; | ||||
|                 log(ss.str()); | ||||
|             } | ||||
|         }); | ||||
| @@ -251,7 +250,7 @@ namespace ix | ||||
|         std::chrono::duration<double, std::milli> duration(1000); | ||||
|         std::this_thread::sleep_for(duration); | ||||
|  | ||||
|         std::cout << "Done !" << std::endl; | ||||
|         std::cout << "ws_receive: Done !" << std::endl; | ||||
|         webSocketReceiver.stop(); | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -67,7 +67,7 @@ namespace ix | ||||
|  | ||||
|     void WebSocketSender::waitForConnection() | ||||
|     { | ||||
|         std::cout << "Connecting..." << std::endl; | ||||
|         std::cout << "ws_send: Connecting..." << std::endl; | ||||
|  | ||||
|         std::unique_lock<std::mutex> lock(_conditionVariableMutex); | ||||
|         _condition.wait(lock); | ||||
| @@ -75,7 +75,7 @@ namespace ix | ||||
|  | ||||
|     void WebSocketSender::waitForAck() | ||||
|     { | ||||
|         std::cout << "Waiting for ack..." << std::endl; | ||||
|         std::cout << "ws_send: Waiting for ack..." << std::endl; | ||||
|  | ||||
|         std::unique_lock<std::mutex> lock(_conditionVariableMutex); | ||||
|         _condition.wait(lock); | ||||
| @@ -107,7 +107,7 @@ namespace ix | ||||
|         _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         log(std::string("Connecting to url: ") + _url); | ||||
|         log(std::string("ws_send: Connecting to url: ") + _url); | ||||
|  | ||||
|         _webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { | ||||
|             std::stringstream ss; | ||||
| @@ -162,7 +162,7 @@ namespace ix | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 ss << "Invalid ix::WebSocketMessageType"; | ||||
|                 ss << "ws_send: Invalid ix::WebSocketMessageType"; | ||||
|                 log(ss.str()); | ||||
|             } | ||||
|         }); | ||||
| @@ -258,7 +258,7 @@ namespace ix | ||||
|         auto duration = bench.getDuration(); | ||||
|         auto transferRate = 1000 * content.size() / duration; | ||||
|         transferRate /= (1024 * 1024); | ||||
|         std::cout << "Send transfer rate: " << transferRate << "MB/s" << std::endl; | ||||
|         std::cout << "ws_send: Send transfer rate: " << transferRate << "MB/s" << std::endl; | ||||
|     } | ||||
|  | ||||
|     void wsSend(const std::string& url, | ||||
| @@ -271,12 +271,12 @@ namespace ix | ||||
|  | ||||
|         webSocketSender.waitForConnection(); | ||||
|  | ||||
|         std::cout << "Sending..." << std::endl; | ||||
|         std::cout << "ws_send: Sending..." << std::endl; | ||||
|         webSocketSender.sendMessage(path, throttle); | ||||
|  | ||||
|         webSocketSender.waitForAck(); | ||||
|  | ||||
|         std::cout << "Done !" << std::endl; | ||||
|         std::cout << "ws_send: Done !" << std::endl; | ||||
|         webSocketSender.stop(); | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -12,7 +12,7 @@ namespace ix | ||||
| { | ||||
|     int ws_transfer_main(int port, const std::string& hostname) | ||||
|     { | ||||
|         std::cout << "Listening on " << hostname << ":" << port << std::endl; | ||||
|         std::cout << "ws_transfer: Listening on " << hostname << ":" << port << std::endl; | ||||
|  | ||||
|         ix::WebSocketServer server(port, hostname); | ||||
|  | ||||
| @@ -22,7 +22,7 @@ namespace ix | ||||
|                                                 const WebSocketMessagePtr& msg) { | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     std::cerr << "New connection" << std::endl; | ||||
|                     std::cerr << "ws_transfer: New connection" << std::endl; | ||||
|                     std::cerr << "id: " << connectionState->getId() << std::endl; | ||||
|                     std::cerr << "Uri: " << msg->openInfo.uri << std::endl; | ||||
|                     std::cerr << "Headers:" << std::endl; | ||||
| @@ -33,14 +33,16 @@ namespace ix | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     std::cerr << "Closed connection" | ||||
|                               << " code " << msg->closeInfo.code << " reason " | ||||
|                     std::cerr << "ws_transfer: [client " << connectionState->getId() | ||||
|                               << "]: Closed connection, code " << msg->closeInfo.code << " reason " | ||||
|                               << msg->closeInfo.reason << std::endl; | ||||
|                     auto remaining = server.getClients().erase(webSocket); | ||||
|                     std::cerr << "ws_transfer: " << remaining << " remaining clients " << std::endl; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                 { | ||||
|                     std::stringstream ss; | ||||
|                     ss << "Connection error: " << msg->errorInfo.reason << std::endl; | ||||
|                     ss << "ws_transfer: Connection error: " << msg->errorInfo.reason << std::endl; | ||||
|                     ss << "#retries: " << msg->errorInfo.retries << std::endl; | ||||
|                     ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; | ||||
|                     ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; | ||||
| @@ -48,32 +50,60 @@ namespace ix | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Fragment) | ||||
|                 { | ||||
|                     std::cerr << "Received message fragment " << std::endl; | ||||
|                     std::cerr << "ws_transfer: Received message fragment " << std::endl; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; | ||||
|                     std::cerr << "ws_transfer: Received " << msg->wireSize << " bytes" << std::endl; | ||||
|                     size_t receivers = 0; | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client != webSocket) | ||||
|                         { | ||||
|                             client->send(msg->str, msg->binary, [](int current, int total) -> bool { | ||||
|                                 std::cerr << "ws_transfer: Step " << current << " out of " << total | ||||
|                                           << std::endl; | ||||
|                                 return true; | ||||
|                             }); | ||||
|  | ||||
|                             do | ||||
|                             auto readyState = client->getReadyState(); | ||||
|                             if (readyState == ReadyState::Open) | ||||
|                             { | ||||
|                                 size_t bufferedAmount = client->bufferedAmount(); | ||||
|                                 std::cerr << "ws_transfer: " << bufferedAmount | ||||
|                                           << " bytes left to be sent" << std::endl; | ||||
|                                 ++receivers; | ||||
|                                 client->send(msg->str, | ||||
|                                              msg->binary, | ||||
|                                              [id = connectionState->getId()](int current, | ||||
|                                                                              int total) -> bool { | ||||
|                                                  std::cerr << "ws_transfer: [client " << id | ||||
|                                                            << "]: Step " << current << " out of " | ||||
|                                                            << total << std::endl; | ||||
|                                                  return true; | ||||
|                                              }); | ||||
|  | ||||
|                                 std::chrono::duration<double, std::milli> duration(10); | ||||
|                                 std::this_thread::sleep_for(duration); | ||||
|                             } while (client->bufferedAmount() != 0); | ||||
|                                 do | ||||
|                                 { | ||||
|                                     size_t bufferedAmount = client->bufferedAmount(); | ||||
|                                     std::cerr << "ws_transfer: [client " << connectionState->getId() | ||||
|                                               << "]: " << bufferedAmount | ||||
|                                               << " bytes left to be sent, " << std::endl; | ||||
|  | ||||
|                                     std::this_thread::sleep_for(std::chrono::milliseconds(500)); | ||||
|  | ||||
|                                 } while (client->bufferedAmount() != 0 && | ||||
|                                          client->getReadyState() == ReadyState::Open); | ||||
|                             } | ||||
|                             else | ||||
|                             { | ||||
|                                 std::string readyStateString = | ||||
|                                     readyState == ReadyState::Connecting | ||||
|                                         ? "Connecting" | ||||
|                                         : readyState == ReadyState::Closing ? "Closing" : "Closed"; | ||||
|                                 size_t bufferedAmount = client->bufferedAmount(); | ||||
|                                 std::cerr << "ws_transfer: [client " << connectionState->getId() | ||||
|                                           << "]: has readystate '" << readyStateString << "' and " | ||||
|                                           << bufferedAmount << " bytes left to be sent, " | ||||
|                                           << std::endl; | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     if (!receivers) | ||||
|                     { | ||||
|                         std::cerr << "ws_transfer: no remaining receivers" << std::endl; | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|         }); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user