ws send / check that we are connected when waiting for the send buffer to be flushed, and if so, abort
This commit is contained in:
parent
61bcc9d27d
commit
9e24475fd6
@ -45,6 +45,8 @@ namespace ix
|
|||||||
ix::WebSocket _webSocket;
|
ix::WebSocket _webSocket;
|
||||||
bool _enablePerMessageDeflate;
|
bool _enablePerMessageDeflate;
|
||||||
|
|
||||||
|
std::atomic<bool> _connected;
|
||||||
|
|
||||||
std::mutex _conditionVariableMutex;
|
std::mutex _conditionVariableMutex;
|
||||||
std::condition_variable _condition;
|
std::condition_variable _condition;
|
||||||
|
|
||||||
@ -56,6 +58,7 @@ namespace ix
|
|||||||
const ix::SocketTLSOptions& tlsOptions)
|
const ix::SocketTLSOptions& tlsOptions)
|
||||||
: _url(url)
|
: _url(url)
|
||||||
, _enablePerMessageDeflate(enablePerMessageDeflate)
|
, _enablePerMessageDeflate(enablePerMessageDeflate)
|
||||||
|
, _connected(false)
|
||||||
{
|
{
|
||||||
_webSocket.disableAutomaticReconnection();
|
_webSocket.disableAutomaticReconnection();
|
||||||
_webSocket.setTLSOptions(tlsOptions);
|
_webSocket.setTLSOptions(tlsOptions);
|
||||||
@ -119,6 +122,8 @@ namespace ix
|
|||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
|
_connected = true;
|
||||||
|
|
||||||
_condition.notify_one();
|
_condition.notify_one();
|
||||||
|
|
||||||
log("ws_send: connected");
|
log("ws_send: connected");
|
||||||
@ -131,6 +136,8 @@ namespace ix
|
|||||||
}
|
}
|
||||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||||
{
|
{
|
||||||
|
_connected = false;
|
||||||
|
|
||||||
ss << "ws_send: connection closed:";
|
ss << "ws_send: connection closed:";
|
||||||
ss << " code " << msg->closeInfo.code;
|
ss << " code " << msg->closeInfo.code;
|
||||||
ss << " reason " << msg->closeInfo.reason << std::endl;
|
ss << " reason " << msg->closeInfo.reason << std::endl;
|
||||||
@ -252,7 +259,7 @@ namespace ix
|
|||||||
|
|
||||||
Bench bench("Sending file through websocket");
|
Bench bench("Sending file through websocket");
|
||||||
auto result = _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool {
|
auto result = _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool {
|
||||||
spdlog::info("ws_send: Step {} out of {}", current, total);
|
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
|
||||||
|
|
||||||
if (throttle)
|
if (throttle)
|
||||||
{
|
{
|
||||||
@ -276,15 +283,22 @@ namespace ix
|
|||||||
|
|
||||||
std::chrono::duration<double, std::milli> duration(10);
|
std::chrono::duration<double, std::milli> duration(10);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
} while (_webSocket.bufferedAmount() != 0);
|
} while (_webSocket.bufferedAmount() != 0 && _connected);
|
||||||
|
|
||||||
|
if (_connected)
|
||||||
|
{
|
||||||
bench.report();
|
bench.report();
|
||||||
auto duration = bench.getDuration();
|
auto duration = bench.getDuration();
|
||||||
auto transferRate = 1000 * content.size() / duration;
|
auto transferRate = 1000 * content.size() / duration;
|
||||||
transferRate /= (1024 * 1024);
|
transferRate /= (1024 * 1024);
|
||||||
spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
|
spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
spdlog::error("ws_send: Got disconnected from the server");
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return _connected;
|
||||||
}
|
}
|
||||||
|
|
||||||
void wsSend(const std::string& url,
|
void wsSend(const std::string& url,
|
||||||
@ -304,6 +318,10 @@ namespace ix
|
|||||||
webSocketSender.waitForAck();
|
webSocketSender.waitForAck();
|
||||||
spdlog::info("ws_send: Done !");
|
spdlog::info("ws_send: Done !");
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
spdlog::error("ws_send: Error sending file.");
|
||||||
|
}
|
||||||
|
|
||||||
webSocketSender.stop();
|
webSocketSender.stop();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user