Socket code refactoring, plus stop polling with a 1s timeout in readBytes while we only want to poll with a 1ms timeout
This commit is contained in:
		| @@ -28,7 +28,7 @@ namespace ix | |||||||
|     Socket::Socket(int fd) : |     Socket::Socket(int fd) : | ||||||
|         _sockfd(fd) |         _sockfd(fd) | ||||||
|     { |     { | ||||||
|         _readBuffer.resize(kChunkSize); |         ; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     Socket::~Socket() |     Socket::~Socket() | ||||||
| @@ -44,22 +44,7 @@ namespace ix | |||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         fd_set rfds; |         int ret = select(timeoutSecs, 0); | ||||||
|         FD_ZERO(&rfds); |  | ||||||
|         FD_SET(_sockfd, &rfds); |  | ||||||
|  |  | ||||||
| #ifdef __linux__ |  | ||||||
|         FD_SET(_eventfd.getFd(), &rfds); |  | ||||||
| #endif |  | ||||||
|  |  | ||||||
|         struct timeval timeout; |  | ||||||
|         timeout.tv_sec = timeoutSecs; |  | ||||||
|         timeout.tv_usec = 0; |  | ||||||
|  |  | ||||||
|         int sockfd = _sockfd; |  | ||||||
|         int nfds = (std::max)(sockfd, _eventfd.getFd()); |  | ||||||
|         int ret = select(nfds + 1, &rfds, nullptr, nullptr, |  | ||||||
|                          (timeoutSecs < 0) ? nullptr : &timeout); |  | ||||||
|  |  | ||||||
|         PollResultType pollResult = PollResultType_ReadyForRead; |         PollResultType pollResult = PollResultType_ReadyForRead; | ||||||
|         if (ret < 0) |         if (ret < 0) | ||||||
| @@ -74,6 +59,27 @@ namespace ix | |||||||
|         if (onPollCallback) onPollCallback(pollResult); |         if (onPollCallback) onPollCallback(pollResult); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     int Socket::select(int timeoutSecs, int timeoutMs) | ||||||
|  |     { | ||||||
|  |         fd_set rfds; | ||||||
|  |         FD_ZERO(&rfds); | ||||||
|  |         FD_SET(_sockfd, &rfds); | ||||||
|  |  | ||||||
|  | #ifdef __linux__ | ||||||
|  |         FD_SET(_eventfd.getFd(), &rfds); | ||||||
|  | #endif | ||||||
|  |  | ||||||
|  |         struct timeval timeout; | ||||||
|  |         timeout.tv_sec = timeoutSecs; | ||||||
|  |         timeout.tv_usec = 1000 * timeoutMs; | ||||||
|  |  | ||||||
|  |         int sockfd = _sockfd; | ||||||
|  |         int nfds = (std::max)(sockfd, _eventfd.getFd()); | ||||||
|  |         int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, | ||||||
|  |                            (timeoutSecs < 0) ? nullptr : &timeout); | ||||||
|  |         return ret; | ||||||
|  |     } | ||||||
|  |  | ||||||
|     void Socket::wakeUpFromPoll() |     void Socket::wakeUpFromPoll() | ||||||
|     { |     { | ||||||
|         // this will wake up the thread blocked on select, only needed on Linux |         // this will wake up the thread blocked on select, only needed on Linux | ||||||
| @@ -216,8 +222,13 @@ namespace ix | |||||||
|             else if (ret < 0 && (getErrno() == EWOULDBLOCK || |             else if (ret < 0 && (getErrno() == EWOULDBLOCK || | ||||||
|                                  getErrno() == EAGAIN)) |                                  getErrno() == EAGAIN)) | ||||||
|             { |             { | ||||||
|                 // wait with 1 ms timeout |                 // Wait with a timeout until something is ready to read. | ||||||
|                 poll(nullptr, 1); |                 // This way we are not busy looping | ||||||
|  |                 int res = select(0, 1); | ||||||
|  |                 if (res < 0 && (errno == EBADF || errno == EINVAL)) | ||||||
|  |                 { | ||||||
|  |                     return false; | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|             // There was an error during the read, abort |             // There was an error during the read, abort | ||||||
|             else |             else | ||||||
| @@ -253,6 +264,11 @@ namespace ix | |||||||
|         const OnProgressCallback& onProgressCallback, |         const OnProgressCallback& onProgressCallback, | ||||||
|         const CancellationRequest& isCancellationRequested) |         const CancellationRequest& isCancellationRequested) | ||||||
|     { |     { | ||||||
|  |         if (_readBuffer.empty()) | ||||||
|  |         { | ||||||
|  |             _readBuffer.resize(kChunkSize); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         std::vector<uint8_t> output; |         std::vector<uint8_t> output; | ||||||
|         while (output.size() != length) |         while (output.size() != length) | ||||||
|         { |         { | ||||||
| @@ -276,8 +292,9 @@ namespace ix | |||||||
|  |  | ||||||
|             if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); |             if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); | ||||||
|  |  | ||||||
|             // Error |             // Wait with a timeout until something is ready to read. | ||||||
|             poll(nullptr, 10); |             // This way we are not busy looping | ||||||
|  |             select(0, 1); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         return std::make_pair(true, std::string(output.begin(), |         return std::make_pair(true, std::string(output.begin(), | ||||||
|   | |||||||
| @@ -39,6 +39,7 @@ namespace ix | |||||||
|  |  | ||||||
|         void configure(); |         void configure(); | ||||||
|  |  | ||||||
|  |         int select(int timeoutSecs, int timeoutMs); | ||||||
|         virtual void poll(const OnPollCallback& onPollCallback, |         virtual void poll(const OnPollCallback& onPollCallback, | ||||||
|                           int timeoutSecs = kDefaultPollTimeout); |                           int timeoutSecs = kDefaultPollTimeout); | ||||||
|         virtual void wakeUpFromPoll(); |         virtual void wakeUpFromPoll(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user