ws send / check that we are connected when waiting for the send buffer to be flushed, and if so, abort
This commit is contained in:
		| @@ -45,6 +45,8 @@ namespace ix | |||||||
|         ix::WebSocket _webSocket; |         ix::WebSocket _webSocket; | ||||||
|         bool _enablePerMessageDeflate; |         bool _enablePerMessageDeflate; | ||||||
|  |  | ||||||
|  |         std::atomic<bool> _connected; | ||||||
|  |  | ||||||
|         std::mutex _conditionVariableMutex; |         std::mutex _conditionVariableMutex; | ||||||
|         std::condition_variable _condition; |         std::condition_variable _condition; | ||||||
|  |  | ||||||
| @@ -56,6 +58,7 @@ namespace ix | |||||||
|                                      const ix::SocketTLSOptions& tlsOptions) |                                      const ix::SocketTLSOptions& tlsOptions) | ||||||
|         : _url(url) |         : _url(url) | ||||||
|         , _enablePerMessageDeflate(enablePerMessageDeflate) |         , _enablePerMessageDeflate(enablePerMessageDeflate) | ||||||
|  |         , _connected(false) | ||||||
|     { |     { | ||||||
|         _webSocket.disableAutomaticReconnection(); |         _webSocket.disableAutomaticReconnection(); | ||||||
|         _webSocket.setTLSOptions(tlsOptions); |         _webSocket.setTLSOptions(tlsOptions); | ||||||
| @@ -119,6 +122,8 @@ namespace ix | |||||||
|             std::stringstream ss; |             std::stringstream ss; | ||||||
|             if (msg->type == ix::WebSocketMessageType::Open) |             if (msg->type == ix::WebSocketMessageType::Open) | ||||||
|             { |             { | ||||||
|  |                 _connected = true; | ||||||
|  |  | ||||||
|                 _condition.notify_one(); |                 _condition.notify_one(); | ||||||
|  |  | ||||||
|                 log("ws_send: connected"); |                 log("ws_send: connected"); | ||||||
| @@ -131,6 +136,8 @@ namespace ix | |||||||
|             } |             } | ||||||
|             else if (msg->type == ix::WebSocketMessageType::Close) |             else if (msg->type == ix::WebSocketMessageType::Close) | ||||||
|             { |             { | ||||||
|  |                 _connected = false; | ||||||
|  |  | ||||||
|                 ss << "ws_send: connection closed:"; |                 ss << "ws_send: connection closed:"; | ||||||
|                 ss << " code " << msg->closeInfo.code; |                 ss << " code " << msg->closeInfo.code; | ||||||
|                 ss << " reason " << msg->closeInfo.reason << std::endl; |                 ss << " reason " << msg->closeInfo.reason << std::endl; | ||||||
| @@ -252,7 +259,7 @@ namespace ix | |||||||
|  |  | ||||||
|         Bench bench("Sending file through websocket"); |         Bench bench("Sending file through websocket"); | ||||||
|         auto result = _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool { |         auto result = _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool { | ||||||
|             spdlog::info("ws_send: Step {} out of {}", current, total); |             spdlog::info("ws_send: Step {} out of {}", current + 1, total); | ||||||
|  |  | ||||||
|             if (throttle) |             if (throttle) | ||||||
|             { |             { | ||||||
| @@ -276,15 +283,22 @@ namespace ix | |||||||
|  |  | ||||||
|             std::chrono::duration<double, std::milli> duration(10); |             std::chrono::duration<double, std::milli> duration(10); | ||||||
|             std::this_thread::sleep_for(duration); |             std::this_thread::sleep_for(duration); | ||||||
|         } while (_webSocket.bufferedAmount() != 0); |         } while (_webSocket.bufferedAmount() != 0 && _connected); | ||||||
|  |  | ||||||
|         bench.report(); |         if (_connected) | ||||||
|         auto duration = bench.getDuration(); |         { | ||||||
|         auto transferRate = 1000 * content.size() / duration; |             bench.report(); | ||||||
|         transferRate /= (1024 * 1024); |             auto duration = bench.getDuration(); | ||||||
|         spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate); |             auto transferRate = 1000 * content.size() / duration; | ||||||
|  |             transferRate /= (1024 * 1024); | ||||||
|  |             spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate); | ||||||
|  |         } | ||||||
|  |         else | ||||||
|  |         { | ||||||
|  |             spdlog::error("ws_send: Got disconnected from the server"); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         return true; |         return _connected; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     void wsSend(const std::string& url, |     void wsSend(const std::string& url, | ||||||
| @@ -304,6 +318,10 @@ namespace ix | |||||||
|             webSocketSender.waitForAck(); |             webSocketSender.waitForAck(); | ||||||
|             spdlog::info("ws_send: Done !"); |             spdlog::info("ws_send: Done !"); | ||||||
|         } |         } | ||||||
|  |         else | ||||||
|  |         { | ||||||
|  |             spdlog::error("ws_send: Error sending file."); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         webSocketSender.stop(); |         webSocketSender.stop(); | ||||||
|     } |     } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user