Compare commits

...

6 Commits

Author SHA1 Message Date
Benjamin Sergeant
ceba8ae620 fix bug with isReadyToWrite 2019-03-18 22:05:04 -07:00
Benjamin Sergeant
fead661ab7 workaround bug in Socket::isReadyToWrite 2019-03-18 20:37:33 -07:00
Benjamin Sergeant
9c8c17f577 use milliseconds 2019-03-18 20:17:44 -07:00
Benjamin Sergeant
a04f83930f ws / log subcommand name 2019-03-18 17:54:06 -07:00
Benjamin Sergeant
c421d19800 disable sigpipe on osx when writing/reading into a dead pipe 2019-03-18 17:52:01 -07:00
Benjamin Sergeant
521f02c90e edit homebrew install steps 2019-03-18 15:45:33 -07:00
11 changed files with 107 additions and 54 deletions

View File

@@ -183,7 +183,7 @@ There is a Dockerfile for running some code on Linux, and a unittest which can b
You can build and install the ws command line tool with Homebrew.
```
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz
brew tap bsergean/IXWebSocket
brew install IXWebSocket
```

View File

@@ -56,7 +56,7 @@ namespace ix
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
@@ -68,7 +68,7 @@ namespace ix
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
@@ -77,13 +77,31 @@ namespace ix
return false;
}
//
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE
// on reads/writes to a closed fd
//
// The generation of the SIGPIPE signal can be suppressed using the
// F_SETNOSIGPIPE fcntl command.
//
#ifdef F_SETNOSIGPIPE
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true;
}
@@ -104,6 +122,7 @@ namespace ix
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}

View File

@@ -49,12 +49,12 @@ namespace ix
return;
}
PollResultType pollResult = isReadyToRead(timeoutSecs, 0);
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
if (onPollCallback) onPollCallback(pollResult);
}
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs)
PollResultType Socket::select(bool readyToRead, int timeoutMs)
{
fd_set rfds;
fd_set wfds;
@@ -62,7 +62,6 @@ namespace ix
FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed
@@ -73,15 +72,15 @@ namespace ix
}
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 1000 * timeoutMs;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0)
@@ -92,7 +91,7 @@ namespace ix
{
pollResult = PollResultType_Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds))
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
uint64_t value = _selectInterrupt->read();
@@ -105,32 +104,28 @@ namespace ix
pollResult = PollResultType_CloseRequest;
}
}
else if (sockfd != -1 && FD_ISSET(sockfd, fds))
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{
if (readyToRead)
{
pollResult = PollResultType_ReadyForRead;
}
else
{
pollResult = PollResultType_ReadyForWrite;
}
pollResult = PollResultType_ReadyForRead;
}
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{
pollResult = PollResultType_ReadyForWrite;
}
return pollResult;
}
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs)
PollResultType Socket::isReadyToRead(int timeoutMs)
{
bool readyToRead = true;
return select(readyToRead, timeoutSecs, timeoutMs);
return select(readyToRead, timeoutMs);
}
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs)
PollResultType Socket::isReadyToWrite(int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutSecs, timeoutMs);
return select(readyToRead, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
@@ -262,7 +257,7 @@ namespace ix
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error)
if (isReadyToRead(1) == PollResultType_Error)
{
return false;
}
@@ -331,7 +326,7 @@ namespace ix
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error)
if (isReadyToRead(1) == PollResultType_Error)
{
return std::make_pair(false, std::string());
}

View File

@@ -50,9 +50,8 @@ namespace ix
int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs);
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs);
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToRead(int timeoutMs);
// Virtual methods
virtual bool connect(const std::string& url,
@@ -92,6 +91,8 @@ namespace ix
std::mutex _socketMutex;
private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;

View File

@@ -204,7 +204,8 @@ namespace ix
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(0, 10);
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType_Error)
{
_socket->close();

1
ws/.gitignore vendored
View File

@@ -1 +1,2 @@
build
node_modules

19
ws/package-lock.json generated Normal file
View File

@@ -0,0 +1,19 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"ws": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@@ -1,11 +1,21 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`
exit 1
}
rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test
# Start a transport server
cd /tmp/ws_test
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile &
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
# Wait until the transfer server is up
while true
@@ -14,21 +24,21 @@ do
echo "Transfer server up and running"
break
}
echo "sleep ..."
echo "sleep ... wait for transfer server"
sleep 0.1
done
# Start a receiver
mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive
ws receive --delay 5 ws://127.0.0.1:8090 &
ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
mkdir /tmp/ws_test/send
cd /tmp/ws_test/send
dd if=/dev/urandom of=20M_file count=20000 bs=1024
# Start the sender job
ws send ws://127.0.0.1:8090 20M_file
ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
# Wait until the file has been written to disk
while true
@@ -37,7 +47,7 @@ do
echo "Received file does exists, exiting loop"
break
fi
echo "sleep ..."
echo "sleep ... wait for output file"
sleep 0.1
done
@@ -48,4 +58,7 @@ cksum /tmp/ws_test/receive/20M_file
sleep 2
# Cleanup
kill `cat /tmp/ws_test/pidfile`
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`

View File

@@ -50,11 +50,13 @@ int main(int argc, char** argv)
sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath);
sendApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
" to artificially slow down the receiver");
receiveApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url");
@@ -96,19 +98,19 @@ int main(int argc, char** argv)
CLI11_PARSE(app, argc, argv);
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
if (app.got_subcommand("transfer"))
{
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
return ix::ws_transfer_main(port, hostname);
}
else if (app.got_subcommand("send"))

View File

@@ -231,6 +231,7 @@ namespace ix
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;

View File

@@ -162,6 +162,7 @@ namespace ix
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;