Compare commits

...

6 Commits

Author SHA1 Message Date
Benjamin Sergeant
ceba8ae620 fix bug with isReadyToWrite 2019-03-18 22:05:04 -07:00
Benjamin Sergeant
fead661ab7 workaround bug in Socket::isReadyToWrite 2019-03-18 20:37:33 -07:00
Benjamin Sergeant
9c8c17f577 use milliseconds 2019-03-18 20:17:44 -07:00
Benjamin Sergeant
a04f83930f ws / log subcommand name 2019-03-18 17:54:06 -07:00
Benjamin Sergeant
c421d19800 disable sigpipe on osx when writing/reading into a dead pipe 2019-03-18 17:52:01 -07:00
Benjamin Sergeant
521f02c90e edit homebrew install steps 2019-03-18 15:45:33 -07:00
11 changed files with 107 additions and 54 deletions

View File

@@ -183,7 +183,7 @@ There is a Dockerfile for running some code on Linux, and a unittest which can b
You can build and install the ws command line tool with Homebrew. You can build and install the ws command line tool with Homebrew.
``` ```
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz brew tap bsergean/IXWebSocket
brew install IXWebSocket brew install IXWebSocket
``` ```

View File

@@ -56,7 +56,7 @@ namespace ix
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1) if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{ {
std::stringstream ss; std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call" ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno); << " : " << strerror(errno);
errorMsg = ss.str(); errorMsg = ss.str();
@@ -68,7 +68,7 @@ namespace ix
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1) if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{ {
std::stringstream ss; std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call" ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno); << " : " << strerror(errno);
errorMsg = ss.str(); errorMsg = ss.str();
@@ -77,13 +77,31 @@ namespace ix
return false; return false;
} }
// #ifdef F_SETNOSIGPIPE
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
// on reads/writes to a closed fd {
// std::stringstream ss;
// The generation of the SIGPIPE signal can be suppressed using the ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
// F_SETNOSIGPIPE fcntl command. << " : " << strerror(errno);
// errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true; return true;
} }
@@ -104,6 +122,7 @@ namespace ix
uint64_t value = 0; uint64_t value = 0;
::read(fd, &value, sizeof(value)); ::read(fd, &value, sizeof(value));
return value; return value;
} }

View File

@@ -49,12 +49,12 @@ namespace ix
return; return;
} }
PollResultType pollResult = isReadyToRead(timeoutSecs, 0); PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
if (onPollCallback) onPollCallback(pollResult); if (onPollCallback) onPollCallback(pollResult);
} }
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs) PollResultType Socket::select(bool readyToRead, int timeoutMs)
{ {
fd_set rfds; fd_set rfds;
fd_set wfds; fd_set wfds;
@@ -62,7 +62,6 @@ namespace ix
FD_ZERO(&wfds); FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds; fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds); FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed // File descriptor used to interrupt select when needed
@@ -73,15 +72,15 @@ namespace ix
} }
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = timeoutSecs; timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = 1000 * timeoutMs; timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
// Compute the highest fd. // Compute the highest fd.
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd); int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr, int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead; PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0) if (ret < 0)
@@ -92,7 +91,7 @@ namespace ix
{ {
pollResult = PollResultType_Timeout; pollResult = PollResultType_Timeout;
} }
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds)) else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{ {
uint64_t value = _selectInterrupt->read(); uint64_t value = _selectInterrupt->read();
@@ -105,32 +104,28 @@ namespace ix
pollResult = PollResultType_CloseRequest; pollResult = PollResultType_CloseRequest;
} }
} }
else if (sockfd != -1 && FD_ISSET(sockfd, fds)) else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{ {
if (readyToRead) pollResult = PollResultType_ReadyForRead;
{ }
pollResult = PollResultType_ReadyForRead; else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
} {
else pollResult = PollResultType_ReadyForWrite;
{
pollResult = PollResultType_ReadyForWrite;
}
} }
return pollResult; return pollResult;
} }
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs) PollResultType Socket::isReadyToRead(int timeoutMs)
{ {
bool readyToRead = true; bool readyToRead = true;
return select(readyToRead, timeoutSecs, timeoutMs); return select(readyToRead, timeoutMs);
} }
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs) PollResultType Socket::isReadyToWrite(int timeoutMs)
{ {
bool readyToRead = false; bool readyToRead = false;
return select(readyToRead, timeoutSecs, timeoutMs); return select(readyToRead, timeoutMs);
} }
// Wake up from poll/select by writing to the pipe which is watched by select // Wake up from poll/select by writing to the pipe which is watched by select
@@ -262,7 +257,7 @@ namespace ix
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error) if (isReadyToRead(1) == PollResultType_Error)
{ {
return false; return false;
} }
@@ -331,7 +326,7 @@ namespace ix
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error) if (isReadyToRead(1) == PollResultType_Error)
{ {
return std::make_pair(false, std::string()); return std::make_pair(false, std::string());
} }

View File

@@ -50,9 +50,8 @@ namespace ix
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs); PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs); PollResultType isReadyToRead(int timeoutMs);
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@@ -92,6 +91,8 @@ namespace ix
std::mutex _socketMutex; std::mutex _socketMutex;
private: private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;

View File

@@ -204,7 +204,8 @@ namespace ix
{ {
// Wait with a 10ms timeout until the socket is ready to write. // Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping // This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(0, 10); PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType_Error) if (result == PollResultType_Error)
{ {
_socket->close(); _socket->close();

1
ws/.gitignore vendored
View File

@@ -1 +1,2 @@
build build
node_modules

19
ws/package-lock.json generated Normal file
View File

@@ -0,0 +1,19 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"ws": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@@ -1,11 +1,21 @@
#!/bin/sh #!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`
exit 1
}
rm -rf /tmp/ws_test rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test mkdir -p /tmp/ws_test
# Start a transport server # Start a transport server
cd /tmp/ws_test cd /tmp/ws_test
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile & ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
# Wait until the transfer server is up # Wait until the transfer server is up
while true while true
@@ -14,21 +24,21 @@ do
echo "Transfer server up and running" echo "Transfer server up and running"
break break
} }
echo "sleep ..." echo "sleep ... wait for transfer server"
sleep 0.1 sleep 0.1
done done
# Start a receiver # Start a receiver
mkdir -p /tmp/ws_test/receive mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive cd /tmp/ws_test/receive
ws receive --delay 5 ws://127.0.0.1:8090 & ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
mkdir /tmp/ws_test/send mkdir /tmp/ws_test/send
cd /tmp/ws_test/send cd /tmp/ws_test/send
dd if=/dev/urandom of=20M_file count=20000 bs=1024 dd if=/dev/urandom of=20M_file count=20000 bs=1024
# Start the sender job # Start the sender job
ws send ws://127.0.0.1:8090 20M_file ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
# Wait until the file has been written to disk # Wait until the file has been written to disk
while true while true
@@ -37,7 +47,7 @@ do
echo "Received file does exists, exiting loop" echo "Received file does exists, exiting loop"
break break
fi fi
echo "sleep ..." echo "sleep ... wait for output file"
sleep 0.1 sleep 0.1
done done
@@ -48,4 +58,7 @@ cksum /tmp/ws_test/receive/20M_file
sleep 2 sleep 2
# Cleanup # Cleanup
kill `cat /tmp/ws_test/pidfile` kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`

View File

@@ -50,11 +50,13 @@ int main(int argc, char** argv)
sendApp->add_option("url", url, "Connection url")->required(); sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send") sendApp->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath); ->required()->check(CLI::ExistingPath);
sendApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file"); CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required(); receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment" receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
" to artificially slow down the receiver"); " to artificially slow down the receiver");
receiveApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url"); transferApp->add_option("--port", port, "Connection url");
@@ -96,19 +98,19 @@ int main(int argc, char** argv)
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
if (app.got_subcommand("transfer")) if (app.got_subcommand("transfer"))
{ {
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
return ix::ws_transfer_main(port, hostname); return ix::ws_transfer_main(port, hostname);
} }
else if (app.got_subcommand("send")) else if (app.got_subcommand("send"))

View File

@@ -231,6 +231,7 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;

View File

@@ -162,6 +162,7 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;