diff --git a/ixwebsocket/IXSelectInterruptPipe.cpp b/ixwebsocket/IXSelectInterruptPipe.cpp index 952248db..3d6aa7a9 100644 --- a/ixwebsocket/IXSelectInterruptPipe.cpp +++ b/ixwebsocket/IXSelectInterruptPipe.cpp @@ -77,6 +77,14 @@ namespace ix return false; } + // + // FIXME: on macOS we should configure the pipe to not trigger SIGPIPE + // on reads/writes to a closed fd + // + // The generation of the SIGPIPE signal can be suppressed using the + // F_SETNOSIGPIPE fcntl command. + // + return true; } diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index 7e383158..2329d67a 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -49,22 +49,27 @@ namespace ix return; } - PollResultType pollResult = select(timeoutSecs, 0); + PollResultType pollResult = isReadyToRead(timeoutSecs); if (onPollCallback) onPollCallback(pollResult); } - PollResultType Socket::select(int timeoutSecs, int timeoutMs) + PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs) { fd_set rfds; + fd_set wfds; FD_ZERO(&rfds); - FD_SET(_sockfd, &rfds); + FD_ZERO(&wfds); + + fd_set* fds = (readyToRead) ? &rfds : & wfds; + + FD_SET(_sockfd, fds); // File descriptor used to interrupt select when needed int interruptFd = _selectInterrupt->getFd(); if (interruptFd != -1) { - FD_SET(interruptFd, &rfds); + FD_SET(interruptFd, fds); } struct timeval timeout; @@ -75,7 +80,7 @@ namespace ix int sockfd = _sockfd; int nfds = (std::max)(sockfd, interruptFd); - int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, + int ret = ::select(nfds + 1, &rfds, &wfds, nullptr, (timeoutSecs < 0) ? nullptr : &timeout); PollResultType pollResult = PollResultType_ReadyForRead; @@ -87,7 +92,7 @@ namespace ix { pollResult = PollResultType_Timeout; } - else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds)) + else if (interruptFd != -1 && FD_ISSET(interruptFd, fds)) { uint64_t value = _selectInterrupt->read(); @@ -100,10 +105,34 @@ namespace ix pollResult = PollResultType_CloseRequest; } } + else if (sockfd != -1 && FD_ISSET(sockfd, fds)) + { + if (readyToRead) + { + pollResult = PollResultType_ReadyForRead; + } + else + { + pollResult = PollResultType_ReadyForWrite; + } + } + return pollResult; } + PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs) + { + bool readyToRead = true; + return select(readyToRead, timeoutSecs, timeoutMs); + } + + PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs) + { + bool readyToRead = false; + return select(readyToRead, timeoutSecs, timeoutMs); + } + // Wake up from poll/select by writing to the pipe which is watched by select bool Socket::wakeUpFromPoll(uint8_t wakeUpCode) { @@ -231,10 +260,9 @@ namespace ix else if (ret < 0 && (getErrno() == EWOULDBLOCK || getErrno() == EAGAIN)) { - // Wait with a timeout until something is ready to read. + // Wait with a 1ms timeout until the socket is ready to read. // This way we are not busy looping - int res = select(0, 1); - if (res < 0 && (errno == EBADF || errno == EINVAL)) + if (isReadyToRead(0, 1) == PollResultType_Error) { return false; } @@ -301,9 +329,12 @@ namespace ix if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); - // Wait with a timeout until something is ready to read. + // Wait with a 1ms timeout until the socket is ready to read. // This way we are not busy looping - select(0, 1); + if (isReadyToRead(0, 1) == PollResultType_Error) + { + return std::make_pair(false, std::string()); + } } return std::make_pair(true, std::string(output.begin(), diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 68a5ac42..01c486b3 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -28,10 +28,11 @@ namespace ix enum PollResultType { PollResultType_ReadyForRead = 0, - PollResultType_Timeout = 1, - PollResultType_Error = 2, - PollResultType_SendRequest = 3, - PollResultType_CloseRequest = 4 + PollResultType_ReadyForWrite = 1, + PollResultType_Timeout = 2, + PollResultType_Error = 3, + PollResultType_SendRequest = 4, + PollResultType_CloseRequest = 5 }; class Socket { @@ -44,10 +45,14 @@ namespace ix void configure(); - PollResultType select(int timeoutSecs, int timeoutMs); - virtual void poll(const OnPollCallback& onPollCallback, - int timeoutSecs = kDefaultPollTimeout); - virtual bool wakeUpFromPoll(uint8_t wakeUpCode); + // Functions to check whether there is activity on the socket + void poll(const OnPollCallback& onPollCallback, + int timeoutSecs = kDefaultPollTimeout); + bool wakeUpFromPoll(uint8_t wakeUpCode); + + PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs); + PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs = 0); + PollResultType isReadyToRead(int timeoutSecs, int timeoutMs = 0); // Virtual methods virtual bool connect(const std::string& url, diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index a1f994df..4d5e7ab9 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -1,18 +1,18 @@ /* * The MIT License (MIT) - * + * * Copyright (c) 2012, 2013 - * + * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: - * + * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. - * + * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -202,13 +202,20 @@ namespace ix { while (!isSendBufferEmpty() && !_requestInitCancellation) { - sendOnSocket(); - - // Sleep 10ms between each send so that we dont busy loop - // A better strategy would be to select on the socket to - // check whether we can write to it without blocking - std::chrono::duration duration(10); - std::this_thread::sleep_for(duration); + // Wait with a 10ms timeout until the socket is ready to write. + // This way we are not busy looping + int timeoutMs = 10; + PollResultType result = _socket->isReadyToWrite(0, 10); + if (result == PollResultType_Error) + { + _socket->close(); + setReadyState(CLOSED); + break; + } + else if (result == PollResultType_ReadyForWrite) + { + sendOnSocket(); + } } } else if (pollResult == PollResultType_ReadyForRead) diff --git a/makefile b/makefile index 1853a600..76c669ad 100644 --- a/makefile +++ b/makefile @@ -36,7 +36,7 @@ test_server: test: python test/run.py -ws_test: +ws_test: all (cd ws ; sh test_ws.sh) # For the fork that is configured with appveyor diff --git a/ws/test_ws.sh b/ws/test_ws.sh index 8c8296f0..75305520 100644 --- a/ws/test_ws.sh +++ b/ws/test_ws.sh @@ -21,20 +21,19 @@ done # Start a receiver mkdir -p /tmp/ws_test/receive cd /tmp/ws_test/receive -ws receive ws://127.0.0.1:8090 & +ws receive --delay 5 ws://127.0.0.1:8090 & mkdir /tmp/ws_test/send cd /tmp/ws_test/send -# mkfile 10m 10M_file -dd if=/dev/urandom of=10M_file count=10000 bs=1024 +dd if=/dev/urandom of=20M_file count=20000 bs=1024 # Start the sender job -ws send ws://127.0.0.1:8090 10M_file +ws send ws://127.0.0.1:8090 20M_file # Wait until the file has been written to disk while true do - if test -f /tmp/ws_test/receive/10M_file ; then + if test -f /tmp/ws_test/receive/20M_file ; then echo "Received file does exists, exiting loop" break fi @@ -42,8 +41,8 @@ do sleep 0.1 done -cksum /tmp/ws_test/send/10M_file -cksum /tmp/ws_test/receive/10M_file +cksum /tmp/ws_test/send/20M_file +cksum /tmp/ws_test/receive/20M_file # Give some time to ws receive to terminate sleep 2 diff --git a/ws/ws.cpp b/ws/ws.cpp index eff1c115..c6b929bc 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -44,13 +44,17 @@ int main(int argc, char** argv) int connectTimeOut = 60; int transferTimeout = 1800; int maxRedirects = 5; + int delayMs = -1; CLI::App* sendApp = app.add_subcommand("send", "Send a file"); sendApp->add_option("url", url, "Connection url")->required(); sendApp->add_option("path", path, "Path to the file to send") ->required()->check(CLI::ExistingPath); + CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file"); receiveApp->add_option("url", url, "Connection url")->required(); + receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment" + " to artificially slow down the receiver"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); transferApp->add_option("--port", port, "Connection url"); @@ -114,7 +118,7 @@ int main(int argc, char** argv) else if (app.got_subcommand("receive")) { bool enablePerMessageDeflate = false; - return ix::ws_receive_main(url, enablePerMessageDeflate); + return ix::ws_receive_main(url, enablePerMessageDeflate, delayMs); } else if (app.got_subcommand("connect")) { diff --git a/ws/ws.h b/ws/ws.h index 5730c262..45cba245 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -34,7 +34,8 @@ namespace ix int ws_connect_main(const std::string& url); int ws_receive_main(const std::string& url, - bool enablePerMessageDeflate); + bool enablePerMessageDeflate, + int delayMs); int ws_send_main(const std::string& url, const std::string& path); diff --git a/ws/ws_receive.cpp b/ws/ws_receive.cpp index 15d38ee0..e46ea14b 100644 --- a/ws/ws_receive.cpp +++ b/ws/ws_receive.cpp @@ -26,7 +26,8 @@ namespace ix { public: WebSocketReceiver(const std::string& _url, - bool enablePerMessageDeflate); + bool enablePerMessageDeflate, + int delayMs); void subscribe(const std::string& channel); void start(); @@ -41,6 +42,8 @@ namespace ix std::string _id; ix::WebSocket _webSocket; bool _enablePerMessageDeflate; + int _delayMs; + int _receivedFragmentCounter; std::mutex _conditionVariableMutex; std::condition_variable _condition; @@ -51,9 +54,12 @@ namespace ix }; WebSocketReceiver::WebSocketReceiver(const std::string& url, - bool enablePerMessageDeflate) : + bool enablePerMessageDeflate, + int delayMs) : _url(url), - _enablePerMessageDeflate(enablePerMessageDeflate) + _enablePerMessageDeflate(enablePerMessageDeflate), + _delayMs(delayMs), + _receivedFragmentCounter(0) { ; } @@ -213,8 +219,15 @@ namespace ix } else if (messageType == ix::WebSocket_MessageType_Fragment) { - ss << "ws_receive: received fragment"; + ss << "ws_receive: received fragment " << _receivedFragmentCounter++; log(ss.str()); + + if (_delayMs > 0) + { + // Introduce an arbitrary delay, to simulate a slow connection + std::chrono::duration duration(_delayMs); + std::this_thread::sleep_for(duration); + } } else if (messageType == ix::WebSocket_MessageType_Error) { @@ -235,9 +248,10 @@ namespace ix } void wsReceive(const std::string& url, - bool enablePerMessageDeflate) + bool enablePerMessageDeflate, + int delayMs) { - WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate); + WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs); webSocketReceiver.start(); webSocketReceiver.waitForConnection(); @@ -252,9 +266,10 @@ namespace ix } int ws_receive_main(const std::string& url, - bool enablePerMessageDeflate) + bool enablePerMessageDeflate, + int delayMs) { - wsReceive(url, enablePerMessageDeflate); + wsReceive(url, enablePerMessageDeflate, delayMs); return 0; } } diff --git a/ws/ws_send.cpp b/ws/ws_send.cpp index 353a55f9..ed6911e6 100644 --- a/ws/ws_send.cpp +++ b/ws/ws_send.cpp @@ -246,7 +246,7 @@ namespace ix _webSocket.send(msg.dump(), [throttle](int current, int total) -> bool { - std::cout << "Step " << current << " out of " << total << std::endl; + std::cout << "ws_send: Step " << current << " out of " << total << std::endl; if (throttle) { @@ -260,7 +260,8 @@ namespace ix do { size_t bufferedAmount = _webSocket.bufferedAmount(); - std::cout << bufferedAmount << " bytes left to be sent" << std::endl; + std::cout << "ws_send: " << bufferedAmount + << " bytes left to be sent" << std::endl; std::chrono::duration duration(10); std::this_thread::sleep_for(duration); diff --git a/ws/ws_transfer.cpp b/ws/ws_transfer.cpp index 460f5262..e4564d39 100644 --- a/ws/ws_transfer.cpp +++ b/ws/ws_transfer.cpp @@ -54,7 +54,8 @@ namespace ix } else if (messageType == ix::WebSocket_MessageType_Fragment) { - std::cerr << "Received message fragment" << std::endl; + std::cerr << "Received message fragment " + << std::endl; } else if (messageType == ix::WebSocket_MessageType_Message) { @@ -66,7 +67,7 @@ namespace ix client->send(str, [](int current, int total) -> bool { - std::cerr << "Step " << current + std::cerr << "ws_transfer: Step " << current << " out of " << total << std::endl; return true; }); @@ -74,7 +75,8 @@ namespace ix do { size_t bufferedAmount = client->bufferedAmount(); - std::cerr << bufferedAmount << " bytes left to be sent" << std::endl; + std::cerr << "ws_transfer: " << bufferedAmount + << " bytes left to be sent" << std::endl; std::chrono::duration duration(10); std::this_thread::sleep_for(duration);