readBytes does not read bytes one by one but in chunks

This commit is contained in:
Benjamin Sergeant 2019-03-02 21:11:16 -08:00
parent a9e772f330
commit 0c226c7629
7 changed files with 49 additions and 49 deletions

View File

@ -231,7 +231,9 @@ namespace ix
payload.reserve(contentLength); payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength, isCancellationRequested); auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
@ -274,7 +276,9 @@ namespace ix
payload.reserve(payload.size() + chunkSize); payload.reserve(payload.size() + chunkSize);
// Read a chunk // Read a chunk
auto chunkResult = _socket->readBytes(chunkSize, isCancellationRequested); auto chunkResult = _socket->readBytes(chunkSize,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";

View File

@ -61,6 +61,7 @@ namespace ix
bool verbose; bool verbose;
bool compress; bool compress;
Logger logger; Logger logger;
OnProgressCallback onProgressCallback;
}; };
class HttpClient { class HttpClient {

View File

@ -23,11 +23,12 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd) _sockfd(fd)
{ {
_readBuffer.resize(kChunkSize);
} }
Socket::~Socket() Socket::~Socket()
@ -39,7 +40,7 @@ namespace ix
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
onPollCallback(PollResultType_Error); if (onPollCallback) onPollCallback(PollResultType_Error);
return; return;
} }
@ -70,7 +71,7 @@ namespace ix
pollResult = PollResultType_Timeout; pollResult = PollResultType_Timeout;
} }
onPollCallback(pollResult); if (onPollCallback) onPollCallback(pollResult);
} }
void Socket::wakeUpFromPoll() void Socket::wakeUpFromPoll()
@ -215,23 +216,8 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
// Wait with a timeout until something is written. // wait with 1 ms timeout
// This way we are not busy looping poll(nullptr, 1);
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
return false;
}
continue;
} }
// There was an error during the read, abort // There was an error during the read, abort
else else
@ -264,43 +250,37 @@ namespace ix
std::pair<bool, std::string> Socket::readBytes( std::pair<bool, std::string> Socket::readBytes(
size_t length, size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
std::string buffer; std::vector<uint8_t> output;
buffer.reserve(length); while (output.size() != length)
#if 1
while (buffer.size() != length)
{ {
ssize_t ret; if (isCancellationRequested()) return std::make_pair(false, std::string());
std::string buf;
ret = recv((char*)&buf.front(), std::min((size_t) 1024, length)); int size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK && if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN)) getErrno() != EAGAIN))
{ {
// error case // Error
// Return what we were able to read return std::make_pair(false, std::string());
return std::make_pair(false, buffer);
} }
else else if (ret > 0)
{ {
buffer += buf; output.insert(output.end(),
} _readBuffer.begin(),
} _readBuffer.begin() + ret);
#else
for (size_t i = 0; i < length; ++i)
{
if (!readByte(&c, isCancellationRequested))
{
// Return what we were able to read
return std::make_pair(false, buffer);
} }
buffer += c; if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
}
#endif
return std::make_pair(true, buffer); // Error
poll(nullptr, 10);
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
} }
} }

View File

@ -10,6 +10,7 @@
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <vector>
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
@ -18,6 +19,7 @@ typedef SSIZE_T ssize_t;
#include "IXEventFd.h" #include "IXEventFd.h"
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
namespace ix namespace ix
{ {
@ -63,6 +65,7 @@ namespace ix
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes( std::pair<bool, std::string> readBytes(
size_t length, size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
static int getErrno(); static int getErrno();
@ -79,5 +82,9 @@ namespace ix
private: private:
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
}; };
} }

View File

@ -573,7 +573,7 @@ namespace ix
// Send message // Send message
sendFragment(opcodeType, fin, begin, end, compress); sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback(i, steps)) if (onProgressCallback && !onProgressCallback((int)i, (int) steps))
{ {
break; break;
} }

View File

@ -107,6 +107,12 @@ namespace ix
{ {
std::cout << msg; std::cout << msg;
}; };
args.onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
HttpParameters httpParameters = parsePostParameters(data); HttpParameters httpParameters = parsePostParameters(data);
@ -125,6 +131,8 @@ namespace ix
out = httpClient.post(url, httpParameters, args); out = httpClient.post(url, httpParameters, args);
} }
std::cerr << std::endl;
auto statusCode = std::get<0>(out); auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out); auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out); auto responseHeaders = std::get<2>(out);