diff --git a/docs/usage.md b/docs/usage.md index fe63d39f..92557273 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -520,11 +520,21 @@ bool async = true; HttpClient httpClient(async); auto args = httpClient.createRequest(url, HttpClient::kGet); +// If you define a chunk callback it will be called repeteadly with the +// incoming data. This allows to process data on the go or write it to disk +// instead of accumulating the data in memory. +args.onChunkCallback = [](const std::string& data) +{ + // process data +}; + // Push the request to a queue, bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response) { // This callback execute in a background thread. Make sure you uses appropriate protection such as mutex auto statusCode = response->statusCode; // acess results + + // response->body is empty if onChunkCallback was used } ); diff --git a/ixwebsocket/IXHttp.cpp b/ixwebsocket/IXHttp.cpp index 56a466c0..46504026 100644 --- a/ixwebsocket/IXHttp.cpp +++ b/ixwebsocket/IXHttp.cpp @@ -149,7 +149,7 @@ namespace ix false, "Error: 'Content-Length' should be a positive integer", httpRequest); } - auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested); + auto res = socket->readBytes(contentLength, nullptr, nullptr, isCancellationRequested); if (!res.first) { return std::make_tuple( diff --git a/ixwebsocket/IXHttp.h b/ixwebsocket/IXHttp.h index 78293c7f..2cf4f29d 100644 --- a/ixwebsocket/IXHttp.h +++ b/ixwebsocket/IXHttp.h @@ -89,6 +89,7 @@ namespace ix bool compressRequest = false; Logger logger; OnProgressCallback onProgressCallback; + OnChunkCallback onChunkCallback; std::atomic cancel; }; diff --git a/ixwebsocket/IXHttpClient.cpp b/ixwebsocket/IXHttpClient.cpp index bb876db7..00cd9527 100644 --- a/ixwebsocket/IXHttpClient.cpp +++ b/ixwebsocket/IXHttpClient.cpp @@ -176,7 +176,7 @@ namespace ix ss << "Host: " << host << "\r\n"; #ifdef IXWEBSOCKET_USE_ZLIB - if (args->compress) + if (args->compress && !args->onChunkCallback) { ss << "Accept-Encoding: gzip" << "\r\n"; @@ -406,10 +406,10 @@ namespace ix ss << headers["Content-Length"]; ss >> contentLength; - payload.reserve(contentLength); - - auto chunkResult = _socket->readBytes( - contentLength, args->onProgressCallback, isCancellationRequested); + auto chunkResult = _socket->readBytes(contentLength, + args->onProgressCallback, + args->onChunkCallback, + isCancellationRequested); if (!chunkResult.first) { auto errorCode = args->cancel ? HttpErrorCode::Cancelled : HttpErrorCode::ChunkReadError; @@ -423,7 +423,12 @@ namespace ix uploadSize, downloadSize); } - payload += chunkResult.second; + + if (!args->onChunkCallback) + { + payload.reserve(contentLength); + payload += chunkResult.second; + } } else if (headers.find("Transfer-Encoding") != headers.end() && headers["Transfer-Encoding"] == "chunked") @@ -460,11 +465,11 @@ namespace ix log(oss.str(), args); } - payload.reserve(payload.size() + (size_t) chunkSize); - // Read a chunk - auto chunkResult = _socket->readBytes( - (size_t) chunkSize, args->onProgressCallback, isCancellationRequested); + auto chunkResult = _socket->readBytes((size_t) chunkSize, + args->onProgressCallback, + args->onChunkCallback, + isCancellationRequested); if (!chunkResult.first) { auto errorCode = args->cancel ? HttpErrorCode::Cancelled : HttpErrorCode::ChunkReadError; @@ -478,7 +483,12 @@ namespace ix uploadSize, downloadSize); } - payload += chunkResult.second; + + if (!args->onChunkCallback) + { + payload.reserve(payload.size() + (size_t) chunkSize); + payload += chunkResult.second; + } // Read the line that terminates the chunk (\r\n) lineResult = _socket->readLine(isCancellationRequested); diff --git a/ixwebsocket/IXProgressCallback.h b/ixwebsocket/IXProgressCallback.h index 879f6a89..3dfccc99 100644 --- a/ixwebsocket/IXProgressCallback.h +++ b/ixwebsocket/IXProgressCallback.h @@ -7,8 +7,10 @@ #pragma once #include +#include namespace ix { using OnProgressCallback = std::function; + using OnChunkCallback = std::function; } diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index d3469125..dc14bc9d 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -400,12 +400,14 @@ namespace ix std::pair Socket::readBytes( size_t length, const OnProgressCallback& onProgressCallback, + const OnChunkCallback& onChunkCallback, const CancellationRequest& isCancellationRequested) { std::array readBuffer; - std::vector output; - while (output.size() != length) + size_t bytesRead = 0; + + while (bytesRead != length) { if (isCancellationRequested && isCancellationRequested()) { @@ -413,12 +415,21 @@ namespace ix return std::make_pair(false, errorMsg); } - size_t size = std::min(readBuffer.size(), length - output.size()); + size_t size = std::min(readBuffer.size(), length - bytesRead); ssize_t ret = recv((char*) &readBuffer[0], size); if (ret > 0) { - output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret); + if (onChunkCallback) + { + std::string chunk(readBuffer.begin(), readBuffer.begin() + ret); + onChunkCallback(chunk); + } + else + { + output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret); + } + bytesRead += ret; } else if (ret <= 0 && !Socket::isWaitNeeded()) { @@ -426,7 +437,7 @@ namespace ix return std::make_pair(false, errorMsg); } - if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); + if (onProgressCallback) onProgressCallback((int) bytesRead, (int) length); // Wait with a 1ms timeout until the socket is ready to read. // This way we are not busy looping diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index ae5a560e..1f4272d2 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -69,6 +69,7 @@ namespace ix std::pair readLine(const CancellationRequest& isCancellationRequested); std::pair readBytes(size_t length, const OnProgressCallback& onProgressCallback, + const OnChunkCallback& onChunkCallback, const CancellationRequest& isCancellationRequested); static int getErrno(); diff --git a/test/IXHttpClientTest.cpp b/test/IXHttpClientTest.cpp index 722f2f8c..2f9899dd 100644 --- a/test/IXHttpClientTest.cpp +++ b/test/IXHttpClientTest.cpp @@ -5,6 +5,7 @@ */ #include "catch.hpp" +#include #include #include @@ -274,4 +275,71 @@ TEST_CASE("http_client", "[http]") std::cerr << "Done" << std::endl; REQUIRE(errorCode == HttpErrorCode::Cancelled); } + + SECTION("Async API, streaming transfer") + { + bool async = true; + HttpClient httpClient(async); + WebSocketHttpHeaders headers; + + SocketTLSOptions tlsOptions; + tlsOptions.caFile = "cacert.pem"; + httpClient.setTLSOptions(tlsOptions); + + std::string url("http://speedtest.belwue.net/random-100M"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 120; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = false; + args->logger = [](const std::string& msg) { std::cout << msg; }; + args->onProgressCallback = [](int current, int total) -> bool { + std::cerr << "\r" + << "Downloaded " << current << " bytes out of " << total; + return true; + }; + + // compute Adler-32 checksum of received data + uint32_t a = 1, b = 0; + args->onChunkCallback = [&](const std::string& data) { + for (const char c: data) + { + a = (a + (unsigned char)c) % 65521; + b = (b + a) % 65521; + } + }; + + std::atomic requestCompleted(false); + std::atomic errorCode(HttpErrorCode::Invalid); + std::atomic statusCode(0); + + httpClient.performRequest( + args, [&](const HttpResponsePtr& response) { + errorCode = response->errorCode; + statusCode = response->statusCode; + requestCompleted = true; + }); + + int wait = 0; + while (wait < 120000) + { + if (requestCompleted) break; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + wait += 10; + } + + std::cerr << "Done" << std::endl; + REQUIRE(errorCode == HttpErrorCode::Ok); + REQUIRE(statusCode == 200); + + // compare checksum with a known good value + uint32_t checksum = (b << 16) | a; + REQUIRE(checksum == 1440194471); + } }