diff --git a/ixwebsocket/IXHttpClient.cpp b/ixwebsocket/IXHttpClient.cpp index d27882c0..a6730c82 100644 --- a/ixwebsocket/IXHttpClient.cpp +++ b/ixwebsocket/IXHttpClient.cpp @@ -231,7 +231,9 @@ namespace ix payload.reserve(contentLength); - auto chunkResult = _socket->readBytes(contentLength, isCancellationRequested); + auto chunkResult = _socket->readBytes(contentLength, + args.onProgressCallback, + isCancellationRequested); if (!chunkResult.first) { errorMsg = "Cannot read chunk"; @@ -274,7 +276,9 @@ namespace ix payload.reserve(payload.size() + chunkSize); // Read a chunk - auto chunkResult = _socket->readBytes(chunkSize, isCancellationRequested); + auto chunkResult = _socket->readBytes(chunkSize, + args.onProgressCallback, + isCancellationRequested); if (!chunkResult.first) { errorMsg = "Cannot read chunk"; diff --git a/ixwebsocket/IXHttpClient.h b/ixwebsocket/IXHttpClient.h index 33d9c25b..191893c1 100644 --- a/ixwebsocket/IXHttpClient.h +++ b/ixwebsocket/IXHttpClient.h @@ -61,6 +61,7 @@ namespace ix bool verbose; bool compress; Logger logger; + OnProgressCallback onProgressCallback; }; class HttpClient { diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index bf8b4c9c..3da75068 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -23,11 +23,12 @@ namespace ix { const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; + constexpr size_t Socket::kChunkSize; Socket::Socket(int fd) : _sockfd(fd) { - + _readBuffer.resize(kChunkSize); } Socket::~Socket() @@ -39,7 +40,7 @@ namespace ix { if (_sockfd == -1) { - onPollCallback(PollResultType_Error); + if (onPollCallback) onPollCallback(PollResultType_Error); return; } @@ -70,7 +71,7 @@ namespace ix pollResult = PollResultType_Timeout; } - onPollCallback(pollResult); + if (onPollCallback) onPollCallback(pollResult); } void Socket::wakeUpFromPoll() @@ -215,23 +216,8 @@ namespace ix else if (ret < 0 && (getErrno() == EWOULDBLOCK || getErrno() == EAGAIN)) { - // Wait with a timeout until something is written. - // This way we are not busy looping - fd_set rfds; - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 1 * 1000; // 1ms timeout - - FD_ZERO(&rfds); - FD_SET(_sockfd, &rfds); - - if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 && - (errno == EBADF || errno == EINVAL)) - { - return false; - } - - continue; + // wait with 1 ms timeout + poll(nullptr, 1); } // There was an error during the read, abort else @@ -264,43 +250,37 @@ namespace ix std::pair Socket::readBytes( size_t length, + const OnProgressCallback& onProgressCallback, const CancellationRequest& isCancellationRequested) { - std::string buffer; - buffer.reserve(length); - -#if 1 - while (buffer.size() != length) + std::vector output; + while (output.size() != length) { - ssize_t ret; - std::string buf; - ret = recv((char*)&buf.front(), std::min((size_t) 1024, length)); + if (isCancellationRequested()) return std::make_pair(false, std::string()); + + int size = std::min(kChunkSize, length - output.size()); + ssize_t ret = recv((char*)&_readBuffer[0], size); if (ret <= 0 && (getErrno() != EWOULDBLOCK && getErrno() != EAGAIN)) { - // error case - // Return what we were able to read - return std::make_pair(false, buffer); + // Error + return std::make_pair(false, std::string()); } - else + else if (ret > 0) { - buffer += buf; - } - } -#else - for (size_t i = 0; i < length; ++i) - { - if (!readByte(&c, isCancellationRequested)) - { - // Return what we were able to read - return std::make_pair(false, buffer); + output.insert(output.end(), + _readBuffer.begin(), + _readBuffer.begin() + ret); } - buffer += c; - } -#endif + if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); - return std::make_pair(true, buffer); + // Error + poll(nullptr, 10); + } + + return std::make_pair(true, std::string(output.begin(), + output.end())); } } diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index fbb7e8dd..ed623751 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -10,6 +10,7 @@ #include #include #include +#include #ifdef _WIN32 #include @@ -18,6 +19,7 @@ typedef SSIZE_T ssize_t; #include "IXEventFd.h" #include "IXCancellationRequest.h" +#include "IXProgressCallback.h" namespace ix { @@ -63,6 +65,7 @@ namespace ix const CancellationRequest& isCancellationRequested); std::pair readBytes( size_t length, + const OnProgressCallback& onProgressCallback, const CancellationRequest& isCancellationRequested); static int getErrno(); @@ -79,5 +82,9 @@ namespace ix private: static const int kDefaultPollTimeout; static const int kDefaultPollNoTimeout; + + // Buffer for reading from our socket. That buffer is never resized. + std::vector _readBuffer; + static constexpr size_t kChunkSize = 1 << 15; }; } diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index e7cfffbf..c27f885d 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -573,7 +573,7 @@ namespace ix // Send message sendFragment(opcodeType, fin, begin, end, compress); - if (onProgressCallback && !onProgressCallback(i, steps)) + if (onProgressCallback && !onProgressCallback((int)i, (int) steps)) { break; } diff --git a/third_party/remove_trailing_whitespaces.sh b/third_party/remote_trailing_whitespaces.sh similarity index 100% rename from third_party/remove_trailing_whitespaces.sh rename to third_party/remote_trailing_whitespaces.sh diff --git a/ws/ws_http_client.cpp b/ws/ws_http_client.cpp index b4a3e512..5ad42a01 100644 --- a/ws/ws_http_client.cpp +++ b/ws/ws_http_client.cpp @@ -107,6 +107,12 @@ namespace ix { std::cout << msg; }; + args.onProgressCallback = [](int current, int total) -> bool + { + std::cerr << "\r" << "Downloaded " + << current << " bytes out of " << total; + return true; + }; HttpParameters httpParameters = parsePostParameters(data); @@ -125,6 +131,8 @@ namespace ix out = httpClient.post(url, httpParameters, args); } + std::cerr << std::endl; + auto statusCode = std::get<0>(out); auto errorCode = std::get<1>(out); auto responseHeaders = std::get<2>(out);