Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ceba8ae620 | ||
|
|
fead661ab7 | ||
|
|
9c8c17f577 | ||
|
|
a04f83930f | ||
|
|
c421d19800 | ||
|
|
521f02c90e |
@@ -183,7 +183,7 @@ There is a Dockerfile for running some code on Linux, and a unittest which can b
|
||||
You can build and install the ws command line tool with Homebrew.
|
||||
|
||||
```
|
||||
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz
|
||||
brew tap bsergean/IXWebSocket
|
||||
brew install IXWebSocket
|
||||
```
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ namespace ix
|
||||
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "SelectInterruptPipe::init() failed in fcntl() call"
|
||||
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
|
||||
<< " : " << strerror(errno);
|
||||
errorMsg = ss.str();
|
||||
|
||||
@@ -68,7 +68,7 @@ namespace ix
|
||||
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "SelectInterruptPipe::init() failed in fcntl() call"
|
||||
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
|
||||
<< " : " << strerror(errno);
|
||||
errorMsg = ss.str();
|
||||
|
||||
@@ -77,13 +77,31 @@ namespace ix
|
||||
return false;
|
||||
}
|
||||
|
||||
//
|
||||
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE
|
||||
// on reads/writes to a closed fd
|
||||
//
|
||||
// The generation of the SIGPIPE signal can be suppressed using the
|
||||
// F_SETNOSIGPIPE fcntl command.
|
||||
//
|
||||
#ifdef F_SETNOSIGPIPE
|
||||
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
|
||||
<< " : " << strerror(errno);
|
||||
errorMsg = ss.str();
|
||||
|
||||
_fildes[kPipeReadIndex] = -1;
|
||||
_fildes[kPipeWriteIndex] = -1;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
|
||||
<< " : " << strerror(errno);
|
||||
errorMsg = ss.str();
|
||||
|
||||
_fildes[kPipeReadIndex] = -1;
|
||||
_fildes[kPipeWriteIndex] = -1;
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -104,6 +122,7 @@ namespace ix
|
||||
|
||||
uint64_t value = 0;
|
||||
::read(fd, &value, sizeof(value));
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
|
||||
@@ -49,12 +49,12 @@ namespace ix
|
||||
return;
|
||||
}
|
||||
|
||||
PollResultType pollResult = isReadyToRead(timeoutSecs, 0);
|
||||
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
|
||||
|
||||
if (onPollCallback) onPollCallback(pollResult);
|
||||
}
|
||||
|
||||
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs)
|
||||
PollResultType Socket::select(bool readyToRead, int timeoutMs)
|
||||
{
|
||||
fd_set rfds;
|
||||
fd_set wfds;
|
||||
@@ -62,7 +62,6 @@ namespace ix
|
||||
FD_ZERO(&wfds);
|
||||
|
||||
fd_set* fds = (readyToRead) ? &rfds : & wfds;
|
||||
|
||||
FD_SET(_sockfd, fds);
|
||||
|
||||
// File descriptor used to interrupt select when needed
|
||||
@@ -73,15 +72,15 @@ namespace ix
|
||||
}
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = timeoutSecs;
|
||||
timeout.tv_usec = 1000 * timeoutMs;
|
||||
timeout.tv_sec = timeoutMs / 1000;
|
||||
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
|
||||
|
||||
// Compute the highest fd.
|
||||
int sockfd = _sockfd;
|
||||
int nfds = (std::max)(sockfd, interruptFd);
|
||||
|
||||
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
|
||||
(timeoutSecs < 0) ? nullptr : &timeout);
|
||||
(timeoutMs < 0) ? nullptr : &timeout);
|
||||
|
||||
PollResultType pollResult = PollResultType_ReadyForRead;
|
||||
if (ret < 0)
|
||||
@@ -92,7 +91,7 @@ namespace ix
|
||||
{
|
||||
pollResult = PollResultType_Timeout;
|
||||
}
|
||||
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds))
|
||||
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
|
||||
{
|
||||
uint64_t value = _selectInterrupt->read();
|
||||
|
||||
@@ -105,32 +104,28 @@ namespace ix
|
||||
pollResult = PollResultType_CloseRequest;
|
||||
}
|
||||
}
|
||||
else if (sockfd != -1 && FD_ISSET(sockfd, fds))
|
||||
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
|
||||
{
|
||||
if (readyToRead)
|
||||
{
|
||||
pollResult = PollResultType_ReadyForRead;
|
||||
}
|
||||
else
|
||||
{
|
||||
pollResult = PollResultType_ReadyForWrite;
|
||||
}
|
||||
pollResult = PollResultType_ReadyForRead;
|
||||
}
|
||||
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
|
||||
{
|
||||
pollResult = PollResultType_ReadyForWrite;
|
||||
}
|
||||
|
||||
|
||||
return pollResult;
|
||||
}
|
||||
|
||||
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs)
|
||||
PollResultType Socket::isReadyToRead(int timeoutMs)
|
||||
{
|
||||
bool readyToRead = true;
|
||||
return select(readyToRead, timeoutSecs, timeoutMs);
|
||||
return select(readyToRead, timeoutMs);
|
||||
}
|
||||
|
||||
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs)
|
||||
PollResultType Socket::isReadyToWrite(int timeoutMs)
|
||||
{
|
||||
bool readyToRead = false;
|
||||
return select(readyToRead, timeoutSecs, timeoutMs);
|
||||
return select(readyToRead, timeoutMs);
|
||||
}
|
||||
|
||||
// Wake up from poll/select by writing to the pipe which is watched by select
|
||||
@@ -262,7 +257,7 @@ namespace ix
|
||||
{
|
||||
// Wait with a 1ms timeout until the socket is ready to read.
|
||||
// This way we are not busy looping
|
||||
if (isReadyToRead(0, 1) == PollResultType_Error)
|
||||
if (isReadyToRead(1) == PollResultType_Error)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@@ -331,7 +326,7 @@ namespace ix
|
||||
|
||||
// Wait with a 1ms timeout until the socket is ready to read.
|
||||
// This way we are not busy looping
|
||||
if (isReadyToRead(0, 1) == PollResultType_Error)
|
||||
if (isReadyToRead(1) == PollResultType_Error)
|
||||
{
|
||||
return std::make_pair(false, std::string());
|
||||
}
|
||||
|
||||
@@ -50,9 +50,8 @@ namespace ix
|
||||
int timeoutSecs = kDefaultPollTimeout);
|
||||
bool wakeUpFromPoll(uint8_t wakeUpCode);
|
||||
|
||||
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs);
|
||||
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs);
|
||||
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
|
||||
PollResultType isReadyToWrite(int timeoutMs);
|
||||
PollResultType isReadyToRead(int timeoutMs);
|
||||
|
||||
// Virtual methods
|
||||
virtual bool connect(const std::string& url,
|
||||
@@ -92,6 +91,8 @@ namespace ix
|
||||
std::mutex _socketMutex;
|
||||
|
||||
private:
|
||||
PollResultType select(bool readyToRead, int timeoutMs);
|
||||
|
||||
static const int kDefaultPollTimeout;
|
||||
static const int kDefaultPollNoTimeout;
|
||||
|
||||
|
||||
@@ -204,7 +204,8 @@ namespace ix
|
||||
{
|
||||
// Wait with a 10ms timeout until the socket is ready to write.
|
||||
// This way we are not busy looping
|
||||
PollResultType result = _socket->isReadyToWrite(0, 10);
|
||||
PollResultType result = _socket->isReadyToWrite(10);
|
||||
|
||||
if (result == PollResultType_Error)
|
||||
{
|
||||
_socket->close();
|
||||
|
||||
1
ws/.gitignore
vendored
1
ws/.gitignore
vendored
@@ -1 +1,2 @@
|
||||
build
|
||||
node_modules
|
||||
|
||||
19
ws/package-lock.json
generated
Normal file
19
ws/package-lock.json
generated
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"requires": true,
|
||||
"lockfileVersion": 1,
|
||||
"dependencies": {
|
||||
"async-limiter": {
|
||||
"version": "1.0.0",
|
||||
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
|
||||
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
|
||||
},
|
||||
"ws": {
|
||||
"version": "6.2.0",
|
||||
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
|
||||
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
|
||||
"requires": {
|
||||
"async-limiter": "1.0.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,21 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Handle Ctrl-C by killing all sub-processing AND exiting
|
||||
trap cleanup INT
|
||||
|
||||
function cleanup {
|
||||
kill `cat /tmp/ws_test/pidfile.transfer`
|
||||
kill `cat /tmp/ws_test/pidfile.receive`
|
||||
kill `cat /tmp/ws_test/pidfile.send`
|
||||
exit 1
|
||||
}
|
||||
|
||||
rm -rf /tmp/ws_test
|
||||
mkdir -p /tmp/ws_test
|
||||
|
||||
# Start a transport server
|
||||
cd /tmp/ws_test
|
||||
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile &
|
||||
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
|
||||
|
||||
# Wait until the transfer server is up
|
||||
while true
|
||||
@@ -14,21 +24,21 @@ do
|
||||
echo "Transfer server up and running"
|
||||
break
|
||||
}
|
||||
echo "sleep ..."
|
||||
echo "sleep ... wait for transfer server"
|
||||
sleep 0.1
|
||||
done
|
||||
|
||||
# Start a receiver
|
||||
mkdir -p /tmp/ws_test/receive
|
||||
cd /tmp/ws_test/receive
|
||||
ws receive --delay 5 ws://127.0.0.1:8090 &
|
||||
ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
|
||||
|
||||
mkdir /tmp/ws_test/send
|
||||
cd /tmp/ws_test/send
|
||||
dd if=/dev/urandom of=20M_file count=20000 bs=1024
|
||||
|
||||
# Start the sender job
|
||||
ws send ws://127.0.0.1:8090 20M_file
|
||||
ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
|
||||
|
||||
# Wait until the file has been written to disk
|
||||
while true
|
||||
@@ -37,7 +47,7 @@ do
|
||||
echo "Received file does exists, exiting loop"
|
||||
break
|
||||
fi
|
||||
echo "sleep ..."
|
||||
echo "sleep ... wait for output file"
|
||||
sleep 0.1
|
||||
done
|
||||
|
||||
@@ -48,4 +58,7 @@ cksum /tmp/ws_test/receive/20M_file
|
||||
sleep 2
|
||||
|
||||
# Cleanup
|
||||
kill `cat /tmp/ws_test/pidfile`
|
||||
kill `cat /tmp/ws_test/pidfile.transfer`
|
||||
kill `cat /tmp/ws_test/pidfile.receive`
|
||||
kill `cat /tmp/ws_test/pidfile.send`
|
||||
|
||||
|
||||
24
ws/ws.cpp
24
ws/ws.cpp
@@ -50,11 +50,13 @@ int main(int argc, char** argv)
|
||||
sendApp->add_option("url", url, "Connection url")->required();
|
||||
sendApp->add_option("path", path, "Path to the file to send")
|
||||
->required()->check(CLI::ExistingPath);
|
||||
sendApp->add_option("--pidfile", pidfile, "Pid file");
|
||||
|
||||
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
|
||||
receiveApp->add_option("url", url, "Connection url")->required();
|
||||
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
|
||||
" to artificially slow down the receiver");
|
||||
receiveApp->add_option("--pidfile", pidfile, "Pid file");
|
||||
|
||||
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
|
||||
transferApp->add_option("--port", port, "Connection url");
|
||||
@@ -96,19 +98,19 @@ int main(int argc, char** argv)
|
||||
|
||||
CLI11_PARSE(app, argc, argv);
|
||||
|
||||
// pid file handling
|
||||
if (!pidfile.empty())
|
||||
{
|
||||
unlink(pidfile.c_str());
|
||||
|
||||
std::ofstream f;
|
||||
f.open(pidfile);
|
||||
f << getpid();
|
||||
f.close();
|
||||
}
|
||||
|
||||
if (app.got_subcommand("transfer"))
|
||||
{
|
||||
// pid file handling
|
||||
if (!pidfile.empty())
|
||||
{
|
||||
unlink(pidfile.c_str());
|
||||
|
||||
std::ofstream f;
|
||||
f.open(pidfile);
|
||||
f << getpid();
|
||||
f.close();
|
||||
}
|
||||
|
||||
return ix::ws_transfer_main(port, hostname);
|
||||
}
|
||||
else if (app.got_subcommand("send"))
|
||||
|
||||
@@ -231,6 +231,7 @@ namespace ix
|
||||
}
|
||||
else if (messageType == ix::WebSocket_MessageType_Error)
|
||||
{
|
||||
ss << "ws_receive ";
|
||||
ss << "Connection error: " << error.reason << std::endl;
|
||||
ss << "#retries: " << error.retries << std::endl;
|
||||
ss << "Wait time(ms): " << error.wait_time << std::endl;
|
||||
|
||||
@@ -162,6 +162,7 @@ namespace ix
|
||||
}
|
||||
else if (messageType == ix::WebSocket_MessageType_Error)
|
||||
{
|
||||
ss << "ws_send ";
|
||||
ss << "Connection error: " << error.reason << std::endl;
|
||||
ss << "#retries: " << error.retries << std::endl;
|
||||
ss << "Wait time(ms): " << error.wait_time << std::endl;
|
||||
|
||||
Reference in New Issue
Block a user