Socket code refactoring, plus stop polling with a 1s timeout in readBytes while we only want to poll with a 1ms timeout

This commit is contained in:
Benjamin Sergeant 2019-03-04 13:40:00 -08:00
parent fe7d94194c
commit 9c3b0b08ec
2 changed files with 39 additions and 21 deletions

View File

@ -28,7 +28,7 @@ namespace ix
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd) _sockfd(fd)
{ {
_readBuffer.resize(kChunkSize); ;
} }
Socket::~Socket() Socket::~Socket()
@ -44,22 +44,7 @@ namespace ix
return; return;
} }
fd_set rfds; int ret = select(timeoutSecs, 0);
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
#ifdef __linux__
FD_SET(_eventfd.getFd(), &rfds);
#endif
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 0;
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd());
int ret = select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead; PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0) if (ret < 0)
@ -74,6 +59,27 @@ namespace ix
if (onPollCallback) onPollCallback(pollResult); if (onPollCallback) onPollCallback(pollResult);
} }
int Socket::select(int timeoutSecs, int timeoutMs)
{
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
#ifdef __linux__
FD_SET(_eventfd.getFd(), &rfds);
#endif
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 1000 * timeoutMs;
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd());
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
return ret;
}
void Socket::wakeUpFromPoll() void Socket::wakeUpFromPoll()
{ {
// this will wake up the thread blocked on select, only needed on Linux // this will wake up the thread blocked on select, only needed on Linux
@ -216,8 +222,13 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
// wait with 1 ms timeout // Wait with a timeout until something is ready to read.
poll(nullptr, 1); // This way we are not busy looping
int res = select(0, 1);
if (res < 0 && (errno == EBADF || errno == EINVAL))
{
return false;
}
} }
// There was an error during the read, abort // There was an error during the read, abort
else else
@ -253,6 +264,11 @@ namespace ix
const OnProgressCallback& onProgressCallback, const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output; std::vector<uint8_t> output;
while (output.size() != length) while (output.size() != length)
{ {
@ -276,8 +292,9 @@ namespace ix
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Error // Wait with a timeout until something is ready to read.
poll(nullptr, 10); // This way we are not busy looping
select(0, 1);
} }
return std::make_pair(true, std::string(output.begin(), return std::make_pair(true, std::string(output.begin(),

View File

@ -39,6 +39,7 @@ namespace ix
void configure(); void configure();
int select(int timeoutSecs, int timeoutMs);
virtual void poll(const OnPollCallback& onPollCallback, virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll(); virtual void wakeUpFromPoll();