try to use a pipe for communication

This commit is contained in:
Benjamin Sergeant 2019-03-12 18:32:42 -07:00
parent d6597d9f52
commit 4de3ec995e
3 changed files with 47 additions and 5 deletions

View File

@ -28,7 +28,8 @@ namespace ix
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd) _sockfd(fd)
{ {
; _fildes[0] = -1;
_fildes[1] = -1;
} }
Socket::~Socket() Socket::~Socket()
@ -69,21 +70,49 @@ namespace ix
FD_SET(_eventfd.getFd(), &rfds); FD_SET(_eventfd.getFd(), &rfds);
#endif #endif
if (_fildes[0] != -1)
{
FD_SET(_fildes[0], &rfds);
}
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = timeoutSecs; timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 1000 * timeoutMs; timeout.tv_usec = 1000 * timeoutMs;
int sockfd = _sockfd; // int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd()); // int nfds = (std::max)(sockfd, _eventfd.getFd());
std::vector<int> fds = { _sockfd, _eventfd.getFd(), _fildes[0] };
int nfds = -1;
for (auto fd : fds)
{
if (fd >= nfds)
{
nfds = fd;
}
}
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, int ret = ::select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutSecs < 0) ? nullptr : &timeout);
if (_fildes[0] != -1 && FD_ISSET(_fildes[0], &rfds))
{
fprintf(stderr, "something wrote to the pipe\n");
uint64_t value = 0;
read(_fildes[0], &value, sizeof(value));
}
return ret; return ret;
} }
void Socket::wakeUpFromPoll() void Socket::wakeUpFromPoll()
{ {
// this will wake up the thread blocked on select, only needed on Linux // this will wake up the thread blocked on select, only needed on Linux
_eventfd.notify(); // _eventfd.notify();
uint64_t value = 0;
write(_fildes[1], &value, sizeof(value));
} }
bool Socket::connect(const std::string& host, bool Socket::connect(const std::string& host,
@ -94,6 +123,10 @@ namespace ix
std::lock_guard<std::mutex> lock(_socketMutex); std::lock_guard<std::mutex> lock(_socketMutex);
if (!_eventfd.clear()) return false; if (!_eventfd.clear()) return false;
if (pipe(_fildes) < 0) return false;
fcntl(_fildes[0], F_SETFL, O_NONBLOCK);
fcntl(_fildes[1], F_SETFL, O_NONBLOCK);
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1; return _sockfd != -1;
@ -107,6 +140,11 @@ namespace ix
closeSocket(_sockfd); closeSocket(_sockfd);
_sockfd = -1; _sockfd = -1;
::close(_fildes[0]);
::close(_fildes[1]);
_fildes[0] = -1;
_fildes[1] = -1;
} }
ssize_t Socket::send(char* buffer, size_t length) ssize_t Socket::send(char* buffer, size_t length)

View File

@ -11,6 +11,7 @@
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <unistd.h> // pipe
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
@ -87,5 +88,7 @@ namespace ix
// Buffer for reading from our socket. That buffer is never resized. // Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer; std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15; static constexpr size_t kChunkSize = 1 << 15;
int _fildes[2];
}; };
} }

View File

@ -77,6 +77,7 @@ shutil.copy(os.path.join(
'bin', 'bin',
'zlib.dll'), '.') 'zlib.dll'), '.')
testCommand = '{} {}'.format(testBinary, os.getenv('TEST', '')) lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'"
testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', ''))
ret = os.system(testCommand) ret = os.system(testCommand)
assert ret == 0, 'Test command failed' assert ret == 0, 'Test command failed'