From 4de3ec995eb4a713e3039301d23508a679258c5e Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Tue, 12 Mar 2019 18:32:42 -0700 Subject: [PATCH] try to use a pipe for communication --- ixwebsocket/IXSocket.cpp | 46 ++++++++++++++++++++++++++++++++++++---- ixwebsocket/IXSocket.h | 3 +++ test/run.py | 3 ++- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index d5593655..f721dfd5 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -28,7 +28,8 @@ namespace ix Socket::Socket(int fd) : _sockfd(fd) { - ; + _fildes[0] = -1; + _fildes[1] = -1; } Socket::~Socket() @@ -69,21 +70,49 @@ namespace ix FD_SET(_eventfd.getFd(), &rfds); #endif + if (_fildes[0] != -1) + { + FD_SET(_fildes[0], &rfds); + } + struct timeval timeout; timeout.tv_sec = timeoutSecs; timeout.tv_usec = 1000 * timeoutMs; - int sockfd = _sockfd; - int nfds = (std::max)(sockfd, _eventfd.getFd()); + // int sockfd = _sockfd; + // int nfds = (std::max)(sockfd, _eventfd.getFd()); + + std::vector fds = { _sockfd, _eventfd.getFd(), _fildes[0] }; + int nfds = -1; + for (auto fd : fds) + { + if (fd >= nfds) + { + nfds = fd; + } + } + int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, (timeoutSecs < 0) ? nullptr : &timeout); + + if (_fildes[0] != -1 && FD_ISSET(_fildes[0], &rfds)) + { + fprintf(stderr, "something wrote to the pipe\n"); + + uint64_t value = 0; + read(_fildes[0], &value, sizeof(value)); + } + return ret; } void Socket::wakeUpFromPoll() { // this will wake up the thread blocked on select, only needed on Linux - _eventfd.notify(); + // _eventfd.notify(); + + uint64_t value = 0; + write(_fildes[1], &value, sizeof(value)); } bool Socket::connect(const std::string& host, @@ -94,6 +123,10 @@ namespace ix std::lock_guard lock(_socketMutex); if (!_eventfd.clear()) return false; + if (pipe(_fildes) < 0) return false; + + fcntl(_fildes[0], F_SETFL, O_NONBLOCK); + fcntl(_fildes[1], F_SETFL, O_NONBLOCK); _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); return _sockfd != -1; @@ -107,6 +140,11 @@ namespace ix closeSocket(_sockfd); _sockfd = -1; + + ::close(_fildes[0]); + ::close(_fildes[1]); + _fildes[0] = -1; + _fildes[1] = -1; } ssize_t Socket::send(char* buffer, size_t length) diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 7998a4e7..cfce8bda 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -11,6 +11,7 @@ #include #include #include +#include // pipe #ifdef _WIN32 #include @@ -87,5 +88,7 @@ namespace ix // Buffer for reading from our socket. That buffer is never resized. std::vector _readBuffer; static constexpr size_t kChunkSize = 1 << 15; + + int _fildes[2]; }; } diff --git a/test/run.py b/test/run.py index 664dee4c..5a4cd8e6 100644 --- a/test/run.py +++ b/test/run.py @@ -77,6 +77,7 @@ shutil.copy(os.path.join( 'bin', 'zlib.dll'), '.') -testCommand = '{} {}'.format(testBinary, os.getenv('TEST', '')) +lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'" +testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', '')) ret = os.system(testCommand) assert ret == 0, 'Test command failed'