Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ceba8ae620 | ||
|
|
fead661ab7 | ||
|
|
9c8c17f577 | ||
|
|
a04f83930f | ||
|
|
c421d19800 | ||
|
|
521f02c90e |
@@ -183,7 +183,7 @@ There is a Dockerfile for running some code on Linux, and a unittest which can b
|
|||||||
You can build and install the ws command line tool with Homebrew.
|
You can build and install the ws command line tool with Homebrew.
|
||||||
|
|
||||||
```
|
```
|
||||||
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz
|
brew tap bsergean/IXWebSocket
|
||||||
brew install IXWebSocket
|
brew install IXWebSocket
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ namespace ix
|
|||||||
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
|
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
|
||||||
{
|
{
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << "SelectInterruptPipe::init() failed in fcntl() call"
|
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
|
||||||
<< " : " << strerror(errno);
|
<< " : " << strerror(errno);
|
||||||
errorMsg = ss.str();
|
errorMsg = ss.str();
|
||||||
|
|
||||||
@@ -68,7 +68,7 @@ namespace ix
|
|||||||
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
|
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
|
||||||
{
|
{
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << "SelectInterruptPipe::init() failed in fcntl() call"
|
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
|
||||||
<< " : " << strerror(errno);
|
<< " : " << strerror(errno);
|
||||||
errorMsg = ss.str();
|
errorMsg = ss.str();
|
||||||
|
|
||||||
@@ -77,13 +77,31 @@ namespace ix
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
#ifdef F_SETNOSIGPIPE
|
||||||
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE
|
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
|
||||||
// on reads/writes to a closed fd
|
{
|
||||||
//
|
std::stringstream ss;
|
||||||
// The generation of the SIGPIPE signal can be suppressed using the
|
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
|
||||||
// F_SETNOSIGPIPE fcntl command.
|
<< " : " << strerror(errno);
|
||||||
//
|
errorMsg = ss.str();
|
||||||
|
|
||||||
|
_fildes[kPipeReadIndex] = -1;
|
||||||
|
_fildes[kPipeWriteIndex] = -1;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
|
||||||
|
<< " : " << strerror(errno);
|
||||||
|
errorMsg = ss.str();
|
||||||
|
|
||||||
|
_fildes[kPipeReadIndex] = -1;
|
||||||
|
_fildes[kPipeWriteIndex] = -1;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -104,6 +122,7 @@ namespace ix
|
|||||||
|
|
||||||
uint64_t value = 0;
|
uint64_t value = 0;
|
||||||
::read(fd, &value, sizeof(value));
|
::read(fd, &value, sizeof(value));
|
||||||
|
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -49,12 +49,12 @@ namespace ix
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
PollResultType pollResult = isReadyToRead(timeoutSecs, 0);
|
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
|
||||||
|
|
||||||
if (onPollCallback) onPollCallback(pollResult);
|
if (onPollCallback) onPollCallback(pollResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs)
|
PollResultType Socket::select(bool readyToRead, int timeoutMs)
|
||||||
{
|
{
|
||||||
fd_set rfds;
|
fd_set rfds;
|
||||||
fd_set wfds;
|
fd_set wfds;
|
||||||
@@ -62,7 +62,6 @@ namespace ix
|
|||||||
FD_ZERO(&wfds);
|
FD_ZERO(&wfds);
|
||||||
|
|
||||||
fd_set* fds = (readyToRead) ? &rfds : & wfds;
|
fd_set* fds = (readyToRead) ? &rfds : & wfds;
|
||||||
|
|
||||||
FD_SET(_sockfd, fds);
|
FD_SET(_sockfd, fds);
|
||||||
|
|
||||||
// File descriptor used to interrupt select when needed
|
// File descriptor used to interrupt select when needed
|
||||||
@@ -73,15 +72,15 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct timeval timeout;
|
struct timeval timeout;
|
||||||
timeout.tv_sec = timeoutSecs;
|
timeout.tv_sec = timeoutMs / 1000;
|
||||||
timeout.tv_usec = 1000 * timeoutMs;
|
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
|
||||||
|
|
||||||
// Compute the highest fd.
|
// Compute the highest fd.
|
||||||
int sockfd = _sockfd;
|
int sockfd = _sockfd;
|
||||||
int nfds = (std::max)(sockfd, interruptFd);
|
int nfds = (std::max)(sockfd, interruptFd);
|
||||||
|
|
||||||
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
|
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
|
||||||
(timeoutSecs < 0) ? nullptr : &timeout);
|
(timeoutMs < 0) ? nullptr : &timeout);
|
||||||
|
|
||||||
PollResultType pollResult = PollResultType_ReadyForRead;
|
PollResultType pollResult = PollResultType_ReadyForRead;
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
@@ -92,7 +91,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
pollResult = PollResultType_Timeout;
|
pollResult = PollResultType_Timeout;
|
||||||
}
|
}
|
||||||
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds))
|
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
|
||||||
{
|
{
|
||||||
uint64_t value = _selectInterrupt->read();
|
uint64_t value = _selectInterrupt->read();
|
||||||
|
|
||||||
@@ -105,32 +104,28 @@ namespace ix
|
|||||||
pollResult = PollResultType_CloseRequest;
|
pollResult = PollResultType_CloseRequest;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (sockfd != -1 && FD_ISSET(sockfd, fds))
|
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
|
||||||
{
|
{
|
||||||
if (readyToRead)
|
pollResult = PollResultType_ReadyForRead;
|
||||||
{
|
}
|
||||||
pollResult = PollResultType_ReadyForRead;
|
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
|
||||||
}
|
{
|
||||||
else
|
pollResult = PollResultType_ReadyForWrite;
|
||||||
{
|
|
||||||
pollResult = PollResultType_ReadyForWrite;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return pollResult;
|
return pollResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs)
|
PollResultType Socket::isReadyToRead(int timeoutMs)
|
||||||
{
|
{
|
||||||
bool readyToRead = true;
|
bool readyToRead = true;
|
||||||
return select(readyToRead, timeoutSecs, timeoutMs);
|
return select(readyToRead, timeoutMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs)
|
PollResultType Socket::isReadyToWrite(int timeoutMs)
|
||||||
{
|
{
|
||||||
bool readyToRead = false;
|
bool readyToRead = false;
|
||||||
return select(readyToRead, timeoutSecs, timeoutMs);
|
return select(readyToRead, timeoutMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wake up from poll/select by writing to the pipe which is watched by select
|
// Wake up from poll/select by writing to the pipe which is watched by select
|
||||||
@@ -262,7 +257,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
// Wait with a 1ms timeout until the socket is ready to read.
|
// Wait with a 1ms timeout until the socket is ready to read.
|
||||||
// This way we are not busy looping
|
// This way we are not busy looping
|
||||||
if (isReadyToRead(0, 1) == PollResultType_Error)
|
if (isReadyToRead(1) == PollResultType_Error)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -331,7 +326,7 @@ namespace ix
|
|||||||
|
|
||||||
// Wait with a 1ms timeout until the socket is ready to read.
|
// Wait with a 1ms timeout until the socket is ready to read.
|
||||||
// This way we are not busy looping
|
// This way we are not busy looping
|
||||||
if (isReadyToRead(0, 1) == PollResultType_Error)
|
if (isReadyToRead(1) == PollResultType_Error)
|
||||||
{
|
{
|
||||||
return std::make_pair(false, std::string());
|
return std::make_pair(false, std::string());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,9 +50,8 @@ namespace ix
|
|||||||
int timeoutSecs = kDefaultPollTimeout);
|
int timeoutSecs = kDefaultPollTimeout);
|
||||||
bool wakeUpFromPoll(uint8_t wakeUpCode);
|
bool wakeUpFromPoll(uint8_t wakeUpCode);
|
||||||
|
|
||||||
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs);
|
PollResultType isReadyToWrite(int timeoutMs);
|
||||||
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs);
|
PollResultType isReadyToRead(int timeoutMs);
|
||||||
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
|
|
||||||
|
|
||||||
// Virtual methods
|
// Virtual methods
|
||||||
virtual bool connect(const std::string& url,
|
virtual bool connect(const std::string& url,
|
||||||
@@ -92,6 +91,8 @@ namespace ix
|
|||||||
std::mutex _socketMutex;
|
std::mutex _socketMutex;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
PollResultType select(bool readyToRead, int timeoutMs);
|
||||||
|
|
||||||
static const int kDefaultPollTimeout;
|
static const int kDefaultPollTimeout;
|
||||||
static const int kDefaultPollNoTimeout;
|
static const int kDefaultPollNoTimeout;
|
||||||
|
|
||||||
|
|||||||
@@ -204,7 +204,8 @@ namespace ix
|
|||||||
{
|
{
|
||||||
// Wait with a 10ms timeout until the socket is ready to write.
|
// Wait with a 10ms timeout until the socket is ready to write.
|
||||||
// This way we are not busy looping
|
// This way we are not busy looping
|
||||||
PollResultType result = _socket->isReadyToWrite(0, 10);
|
PollResultType result = _socket->isReadyToWrite(10);
|
||||||
|
|
||||||
if (result == PollResultType_Error)
|
if (result == PollResultType_Error)
|
||||||
{
|
{
|
||||||
_socket->close();
|
_socket->close();
|
||||||
|
|||||||
1
ws/.gitignore
vendored
1
ws/.gitignore
vendored
@@ -1 +1,2 @@
|
|||||||
build
|
build
|
||||||
|
node_modules
|
||||||
|
|||||||
19
ws/package-lock.json
generated
Normal file
19
ws/package-lock.json
generated
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
{
|
||||||
|
"requires": true,
|
||||||
|
"lockfileVersion": 1,
|
||||||
|
"dependencies": {
|
||||||
|
"async-limiter": {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
|
||||||
|
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
|
||||||
|
},
|
||||||
|
"ws": {
|
||||||
|
"version": "6.2.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
|
||||||
|
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
|
||||||
|
"requires": {
|
||||||
|
"async-limiter": "1.0.0"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,11 +1,21 @@
|
|||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
|
# Handle Ctrl-C by killing all sub-processing AND exiting
|
||||||
|
trap cleanup INT
|
||||||
|
|
||||||
|
function cleanup {
|
||||||
|
kill `cat /tmp/ws_test/pidfile.transfer`
|
||||||
|
kill `cat /tmp/ws_test/pidfile.receive`
|
||||||
|
kill `cat /tmp/ws_test/pidfile.send`
|
||||||
|
exit 1
|
||||||
|
}
|
||||||
|
|
||||||
rm -rf /tmp/ws_test
|
rm -rf /tmp/ws_test
|
||||||
mkdir -p /tmp/ws_test
|
mkdir -p /tmp/ws_test
|
||||||
|
|
||||||
# Start a transport server
|
# Start a transport server
|
||||||
cd /tmp/ws_test
|
cd /tmp/ws_test
|
||||||
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile &
|
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
|
||||||
|
|
||||||
# Wait until the transfer server is up
|
# Wait until the transfer server is up
|
||||||
while true
|
while true
|
||||||
@@ -14,21 +24,21 @@ do
|
|||||||
echo "Transfer server up and running"
|
echo "Transfer server up and running"
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
echo "sleep ..."
|
echo "sleep ... wait for transfer server"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
done
|
done
|
||||||
|
|
||||||
# Start a receiver
|
# Start a receiver
|
||||||
mkdir -p /tmp/ws_test/receive
|
mkdir -p /tmp/ws_test/receive
|
||||||
cd /tmp/ws_test/receive
|
cd /tmp/ws_test/receive
|
||||||
ws receive --delay 5 ws://127.0.0.1:8090 &
|
ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
|
||||||
|
|
||||||
mkdir /tmp/ws_test/send
|
mkdir /tmp/ws_test/send
|
||||||
cd /tmp/ws_test/send
|
cd /tmp/ws_test/send
|
||||||
dd if=/dev/urandom of=20M_file count=20000 bs=1024
|
dd if=/dev/urandom of=20M_file count=20000 bs=1024
|
||||||
|
|
||||||
# Start the sender job
|
# Start the sender job
|
||||||
ws send ws://127.0.0.1:8090 20M_file
|
ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
|
||||||
|
|
||||||
# Wait until the file has been written to disk
|
# Wait until the file has been written to disk
|
||||||
while true
|
while true
|
||||||
@@ -37,7 +47,7 @@ do
|
|||||||
echo "Received file does exists, exiting loop"
|
echo "Received file does exists, exiting loop"
|
||||||
break
|
break
|
||||||
fi
|
fi
|
||||||
echo "sleep ..."
|
echo "sleep ... wait for output file"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
done
|
done
|
||||||
|
|
||||||
@@ -48,4 +58,7 @@ cksum /tmp/ws_test/receive/20M_file
|
|||||||
sleep 2
|
sleep 2
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
kill `cat /tmp/ws_test/pidfile`
|
kill `cat /tmp/ws_test/pidfile.transfer`
|
||||||
|
kill `cat /tmp/ws_test/pidfile.receive`
|
||||||
|
kill `cat /tmp/ws_test/pidfile.send`
|
||||||
|
|
||||||
|
|||||||
24
ws/ws.cpp
24
ws/ws.cpp
@@ -50,11 +50,13 @@ int main(int argc, char** argv)
|
|||||||
sendApp->add_option("url", url, "Connection url")->required();
|
sendApp->add_option("url", url, "Connection url")->required();
|
||||||
sendApp->add_option("path", path, "Path to the file to send")
|
sendApp->add_option("path", path, "Path to the file to send")
|
||||||
->required()->check(CLI::ExistingPath);
|
->required()->check(CLI::ExistingPath);
|
||||||
|
sendApp->add_option("--pidfile", pidfile, "Pid file");
|
||||||
|
|
||||||
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
|
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
|
||||||
receiveApp->add_option("url", url, "Connection url")->required();
|
receiveApp->add_option("url", url, "Connection url")->required();
|
||||||
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
|
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
|
||||||
" to artificially slow down the receiver");
|
" to artificially slow down the receiver");
|
||||||
|
receiveApp->add_option("--pidfile", pidfile, "Pid file");
|
||||||
|
|
||||||
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
|
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
|
||||||
transferApp->add_option("--port", port, "Connection url");
|
transferApp->add_option("--port", port, "Connection url");
|
||||||
@@ -96,19 +98,19 @@ int main(int argc, char** argv)
|
|||||||
|
|
||||||
CLI11_PARSE(app, argc, argv);
|
CLI11_PARSE(app, argc, argv);
|
||||||
|
|
||||||
|
// pid file handling
|
||||||
|
if (!pidfile.empty())
|
||||||
|
{
|
||||||
|
unlink(pidfile.c_str());
|
||||||
|
|
||||||
|
std::ofstream f;
|
||||||
|
f.open(pidfile);
|
||||||
|
f << getpid();
|
||||||
|
f.close();
|
||||||
|
}
|
||||||
|
|
||||||
if (app.got_subcommand("transfer"))
|
if (app.got_subcommand("transfer"))
|
||||||
{
|
{
|
||||||
// pid file handling
|
|
||||||
if (!pidfile.empty())
|
|
||||||
{
|
|
||||||
unlink(pidfile.c_str());
|
|
||||||
|
|
||||||
std::ofstream f;
|
|
||||||
f.open(pidfile);
|
|
||||||
f << getpid();
|
|
||||||
f.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
return ix::ws_transfer_main(port, hostname);
|
return ix::ws_transfer_main(port, hostname);
|
||||||
}
|
}
|
||||||
else if (app.got_subcommand("send"))
|
else if (app.got_subcommand("send"))
|
||||||
|
|||||||
@@ -231,6 +231,7 @@ namespace ix
|
|||||||
}
|
}
|
||||||
else if (messageType == ix::WebSocket_MessageType_Error)
|
else if (messageType == ix::WebSocket_MessageType_Error)
|
||||||
{
|
{
|
||||||
|
ss << "ws_receive ";
|
||||||
ss << "Connection error: " << error.reason << std::endl;
|
ss << "Connection error: " << error.reason << std::endl;
|
||||||
ss << "#retries: " << error.retries << std::endl;
|
ss << "#retries: " << error.retries << std::endl;
|
||||||
ss << "Wait time(ms): " << error.wait_time << std::endl;
|
ss << "Wait time(ms): " << error.wait_time << std::endl;
|
||||||
|
|||||||
@@ -162,6 +162,7 @@ namespace ix
|
|||||||
}
|
}
|
||||||
else if (messageType == ix::WebSocket_MessageType_Error)
|
else if (messageType == ix::WebSocket_MessageType_Error)
|
||||||
{
|
{
|
||||||
|
ss << "ws_send ";
|
||||||
ss << "Connection error: " << error.reason << std::endl;
|
ss << "Connection error: " << error.reason << std::endl;
|
||||||
ss << "#retries: " << error.retries << std::endl;
|
ss << "#retries: " << error.retries << std::endl;
|
||||||
ss << "Wait time(ms): " << error.wait_time << std::endl;
|
ss << "Wait time(ms): " << error.wait_time << std::endl;
|
||||||
|
|||||||
Reference in New Issue
Block a user