when trying to flush the send buffer, use select to wait until it is possible instead of using sleep to retry at a given frequency

This commit is contained in:
Benjamin Sergeant 2019-03-18 14:25:27 -07:00
parent e98634a277
commit d58798e36c
11 changed files with 126 additions and 53 deletions

View File

@ -77,6 +77,14 @@ namespace ix
return false; return false;
} }
//
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE
// on reads/writes to a closed fd
//
// The generation of the SIGPIPE signal can be suppressed using the
// F_SETNOSIGPIPE fcntl command.
//
return true; return true;
} }

View File

@ -49,22 +49,27 @@ namespace ix
return; return;
} }
PollResultType pollResult = select(timeoutSecs, 0); PollResultType pollResult = isReadyToRead(timeoutSecs);
if (onPollCallback) onPollCallback(pollResult); if (onPollCallback) onPollCallback(pollResult);
} }
PollResultType Socket::select(int timeoutSecs, int timeoutMs) PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs)
{ {
fd_set rfds; fd_set rfds;
fd_set wfds;
FD_ZERO(&rfds); FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds); FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed // File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd(); int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1) if (interruptFd != -1)
{ {
FD_SET(interruptFd, &rfds); FD_SET(interruptFd, fds);
} }
struct timeval timeout; struct timeval timeout;
@ -75,7 +80,7 @@ namespace ix
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd); int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutSecs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead; PollResultType pollResult = PollResultType_ReadyForRead;
@ -87,7 +92,7 @@ namespace ix
{ {
pollResult = PollResultType_Timeout; pollResult = PollResultType_Timeout;
} }
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds)) else if (interruptFd != -1 && FD_ISSET(interruptFd, fds))
{ {
uint64_t value = _selectInterrupt->read(); uint64_t value = _selectInterrupt->read();
@ -100,10 +105,34 @@ namespace ix
pollResult = PollResultType_CloseRequest; pollResult = PollResultType_CloseRequest;
} }
} }
else if (sockfd != -1 && FD_ISSET(sockfd, fds))
{
if (readyToRead)
{
pollResult = PollResultType_ReadyForRead;
}
else
{
pollResult = PollResultType_ReadyForWrite;
}
}
return pollResult; return pollResult;
} }
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs)
{
bool readyToRead = true;
return select(readyToRead, timeoutSecs, timeoutMs);
}
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutSecs, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select // Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode) bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{ {
@ -231,10 +260,9 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
// Wait with a timeout until something is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
int res = select(0, 1); if (isReadyToRead(0, 1) == PollResultType_Error)
if (res < 0 && (errno == EBADF || errno == EINVAL))
{ {
return false; return false;
} }
@ -301,9 +329,12 @@ namespace ix
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a timeout until something is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
select(0, 1); if (isReadyToRead(0, 1) == PollResultType_Error)
{
return std::make_pair(false, std::string());
}
} }
return std::make_pair(true, std::string(output.begin(), return std::make_pair(true, std::string(output.begin(),

View File

@ -28,10 +28,11 @@ namespace ix
enum PollResultType enum PollResultType
{ {
PollResultType_ReadyForRead = 0, PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1, PollResultType_ReadyForWrite = 1,
PollResultType_Error = 2, PollResultType_Timeout = 2,
PollResultType_SendRequest = 3, PollResultType_Error = 3,
PollResultType_CloseRequest = 4 PollResultType_SendRequest = 4,
PollResultType_CloseRequest = 5
}; };
class Socket { class Socket {
@ -44,10 +45,14 @@ namespace ix
void configure(); void configure();
PollResultType select(int timeoutSecs, int timeoutMs); // Functions to check whether there is activity on the socket
virtual void poll(const OnPollCallback& onPollCallback, void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
virtual bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs);
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs = 0);
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs = 0);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,

View File

@ -202,13 +202,20 @@ namespace ix
{ {
while (!isSendBufferEmpty() && !_requestInitCancellation) while (!isSendBufferEmpty() && !_requestInitCancellation)
{ {
sendOnSocket(); // Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
// Sleep 10ms between each send so that we dont busy loop int timeoutMs = 10;
// A better strategy would be to select on the socket to PollResultType result = _socket->isReadyToWrite(0, 10);
// check whether we can write to it without blocking if (result == PollResultType_Error)
std::chrono::duration<double, std::micro> duration(10); {
std::this_thread::sleep_for(duration); _socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType_ReadyForWrite)
{
sendOnSocket();
}
} }
} }
else if (pollResult == PollResultType_ReadyForRead) else if (pollResult == PollResultType_ReadyForRead)

View File

@ -36,7 +36,7 @@ test_server:
test: test:
python test/run.py python test/run.py
ws_test: ws_test: all
(cd ws ; sh test_ws.sh) (cd ws ; sh test_ws.sh)
# For the fork that is configured with appveyor # For the fork that is configured with appveyor

View File

@ -21,20 +21,19 @@ done
# Start a receiver # Start a receiver
mkdir -p /tmp/ws_test/receive mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive cd /tmp/ws_test/receive
ws receive ws://127.0.0.1:8090 & ws receive --delay 5 ws://127.0.0.1:8090 &
mkdir /tmp/ws_test/send mkdir /tmp/ws_test/send
cd /tmp/ws_test/send cd /tmp/ws_test/send
# mkfile 10m 10M_file dd if=/dev/urandom of=20M_file count=20000 bs=1024
dd if=/dev/urandom of=10M_file count=10000 bs=1024
# Start the sender job # Start the sender job
ws send ws://127.0.0.1:8090 10M_file ws send ws://127.0.0.1:8090 20M_file
# Wait until the file has been written to disk # Wait until the file has been written to disk
while true while true
do do
if test -f /tmp/ws_test/receive/10M_file ; then if test -f /tmp/ws_test/receive/20M_file ; then
echo "Received file does exists, exiting loop" echo "Received file does exists, exiting loop"
break break
fi fi
@ -42,8 +41,8 @@ do
sleep 0.1 sleep 0.1
done done
cksum /tmp/ws_test/send/10M_file cksum /tmp/ws_test/send/20M_file
cksum /tmp/ws_test/receive/10M_file cksum /tmp/ws_test/receive/20M_file
# Give some time to ws receive to terminate # Give some time to ws receive to terminate
sleep 2 sleep 2

View File

@ -44,13 +44,17 @@ int main(int argc, char** argv)
int connectTimeOut = 60; int connectTimeOut = 60;
int transferTimeout = 1800; int transferTimeout = 1800;
int maxRedirects = 5; int maxRedirects = 5;
int delayMs = -1;
CLI::App* sendApp = app.add_subcommand("send", "Send a file"); CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required(); sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send") sendApp->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath); ->required()->check(CLI::ExistingPath);
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file"); CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required(); receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
" to artificially slow down the receiver");
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url"); transferApp->add_option("--port", port, "Connection url");
@ -114,7 +118,7 @@ int main(int argc, char** argv)
else if (app.got_subcommand("receive")) else if (app.got_subcommand("receive"))
{ {
bool enablePerMessageDeflate = false; bool enablePerMessageDeflate = false;
return ix::ws_receive_main(url, enablePerMessageDeflate); return ix::ws_receive_main(url, enablePerMessageDeflate, delayMs);
} }
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {

View File

@ -34,7 +34,8 @@ namespace ix
int ws_connect_main(const std::string& url); int ws_connect_main(const std::string& url);
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate); bool enablePerMessageDeflate,
int delayMs);
int ws_send_main(const std::string& url, int ws_send_main(const std::string& url,
const std::string& path); const std::string& path);

View File

@ -26,7 +26,8 @@ namespace ix
{ {
public: public:
WebSocketReceiver(const std::string& _url, WebSocketReceiver(const std::string& _url,
bool enablePerMessageDeflate); bool enablePerMessageDeflate,
int delayMs);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -41,6 +42,8 @@ namespace ix
std::string _id; std::string _id;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
bool _enablePerMessageDeflate; bool _enablePerMessageDeflate;
int _delayMs;
int _receivedFragmentCounter;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
std::condition_variable _condition; std::condition_variable _condition;
@ -51,9 +54,12 @@ namespace ix
}; };
WebSocketReceiver::WebSocketReceiver(const std::string& url, WebSocketReceiver::WebSocketReceiver(const std::string& url,
bool enablePerMessageDeflate) : bool enablePerMessageDeflate,
int delayMs) :
_url(url), _url(url),
_enablePerMessageDeflate(enablePerMessageDeflate) _enablePerMessageDeflate(enablePerMessageDeflate),
_delayMs(delayMs),
_receivedFragmentCounter(0)
{ {
; ;
} }
@ -213,8 +219,15 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Fragment) else if (messageType == ix::WebSocket_MessageType_Fragment)
{ {
ss << "ws_receive: received fragment"; ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str()); log(ss.str());
if (_delayMs > 0)
{
// Introduce an arbitrary delay, to simulate a slow connection
std::chrono::duration<double, std::milli> duration(_delayMs);
std::this_thread::sleep_for(duration);
}
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
@ -235,9 +248,10 @@ namespace ix
} }
void wsReceive(const std::string& url, void wsReceive(const std::string& url,
bool enablePerMessageDeflate) bool enablePerMessageDeflate,
int delayMs)
{ {
WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate); WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs);
webSocketReceiver.start(); webSocketReceiver.start();
webSocketReceiver.waitForConnection(); webSocketReceiver.waitForConnection();
@ -252,9 +266,10 @@ namespace ix
} }
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate) bool enablePerMessageDeflate,
int delayMs)
{ {
wsReceive(url, enablePerMessageDeflate); wsReceive(url, enablePerMessageDeflate, delayMs);
return 0; return 0;
} }
} }

View File

@ -246,7 +246,7 @@ namespace ix
_webSocket.send(msg.dump(), _webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool [throttle](int current, int total) -> bool
{ {
std::cout << "Step " << current << " out of " << total << std::endl; std::cout << "ws_send: Step " << current << " out of " << total << std::endl;
if (throttle) if (throttle)
{ {
@ -260,7 +260,8 @@ namespace ix
do do
{ {
size_t bufferedAmount = _webSocket.bufferedAmount(); size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << bufferedAmount << " bytes left to be sent" << std::endl; std::cout << "ws_send: " << bufferedAmount
<< " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);

View File

@ -54,7 +54,8 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Fragment) else if (messageType == ix::WebSocket_MessageType_Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment "
<< std::endl;
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
@ -66,7 +67,7 @@ namespace ix
client->send(str, client->send(str,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "Step " << current std::cerr << "ws_transfer: Step " << current
<< " out of " << total << std::endl; << " out of " << total << std::endl;
return true; return true;
}); });
@ -74,7 +75,8 @@ namespace ix
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" << std::endl; std::cerr << "ws_transfer: " << bufferedAmount
<< " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);