From db7057de69014a397e8641866c43f88816dc0b4d Mon Sep 17 00:00:00 2001 From: Martin Natano Date: Tue, 1 Feb 2022 06:54:32 +0100 Subject: [PATCH] Add support for streaming transfers (#353) This change adds onChunkCallback to the request. If defined it will be called repeatedly with the incoming data. This allows to process data on the go or write it to disk instead of accumulating the data in memory. --- docs/usage.md | 10 +++++ ixwebsocket/IXHttp.cpp | 2 +- ixwebsocket/IXHttp.h | 1 + ixwebsocket/IXHttpClient.cpp | 32 +++++++++------ ixwebsocket/IXProgressCallback.h | 2 + ixwebsocket/IXSocket.cpp | 21 +++++++--- ixwebsocket/IXSocket.h | 1 + test/IXHttpClientTest.cpp | 68 ++++++++++++++++++++++++++++++++ 8 files changed, 120 insertions(+), 17 deletions(-) diff --git a/docs/usage.md b/docs/usage.md index fe63d39f..92557273 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -520,11 +520,21 @@ bool async = true; HttpClient httpClient(async); auto args = httpClient.createRequest(url, HttpClient::kGet); +// If you define a chunk callback it will be called repeteadly with the +// incoming data. This allows to process data on the go or write it to disk +// instead of accumulating the data in memory. +args.onChunkCallback = [](const std::string& data) +{ + // process data +}; + // Push the request to a queue, bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response) { // This callback execute in a background thread. Make sure you uses appropriate protection such as mutex auto statusCode = response->statusCode; // acess results + + // response->body is empty if onChunkCallback was used } ); diff --git a/ixwebsocket/IXHttp.cpp b/ixwebsocket/IXHttp.cpp index 56a466c0..46504026 100644 --- a/ixwebsocket/IXHttp.cpp +++ b/ixwebsocket/IXHttp.cpp @@ -149,7 +149,7 @@ namespace ix false, "Error: 'Content-Length' should be a positive integer", httpRequest); } - auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested); + auto res = socket->readBytes(contentLength, nullptr, nullptr, isCancellationRequested); if (!res.first) { return std::make_tuple( diff --git a/ixwebsocket/IXHttp.h b/ixwebsocket/IXHttp.h index 78293c7f..2cf4f29d 100644 --- a/ixwebsocket/IXHttp.h +++ b/ixwebsocket/IXHttp.h @@ -89,6 +89,7 @@ namespace ix bool compressRequest = false; Logger logger; OnProgressCallback onProgressCallback; + OnChunkCallback onChunkCallback; std::atomic cancel; }; diff --git a/ixwebsocket/IXHttpClient.cpp b/ixwebsocket/IXHttpClient.cpp index bb876db7..00cd9527 100644 --- a/ixwebsocket/IXHttpClient.cpp +++ b/ixwebsocket/IXHttpClient.cpp @@ -176,7 +176,7 @@ namespace ix ss << "Host: " << host << "\r\n"; #ifdef IXWEBSOCKET_USE_ZLIB - if (args->compress) + if (args->compress && !args->onChunkCallback) { ss << "Accept-Encoding: gzip" << "\r\n"; @@ -406,10 +406,10 @@ namespace ix ss << headers["Content-Length"]; ss >> contentLength; - payload.reserve(contentLength); - - auto chunkResult = _socket->readBytes( - contentLength, args->onProgressCallback, isCancellationRequested); + auto chunkResult = _socket->readBytes(contentLength, + args->onProgressCallback, + args->onChunkCallback, + isCancellationRequested); if (!chunkResult.first) { auto errorCode = args->cancel ? HttpErrorCode::Cancelled : HttpErrorCode::ChunkReadError; @@ -423,7 +423,12 @@ namespace ix uploadSize, downloadSize); } - payload += chunkResult.second; + + if (!args->onChunkCallback) + { + payload.reserve(contentLength); + payload += chunkResult.second; + } } else if (headers.find("Transfer-Encoding") != headers.end() && headers["Transfer-Encoding"] == "chunked") @@ -460,11 +465,11 @@ namespace ix log(oss.str(), args); } - payload.reserve(payload.size() + (size_t) chunkSize); - // Read a chunk - auto chunkResult = _socket->readBytes( - (size_t) chunkSize, args->onProgressCallback, isCancellationRequested); + auto chunkResult = _socket->readBytes((size_t) chunkSize, + args->onProgressCallback, + args->onChunkCallback, + isCancellationRequested); if (!chunkResult.first) { auto errorCode = args->cancel ? HttpErrorCode::Cancelled : HttpErrorCode::ChunkReadError; @@ -478,7 +483,12 @@ namespace ix uploadSize, downloadSize); } - payload += chunkResult.second; + + if (!args->onChunkCallback) + { + payload.reserve(payload.size() + (size_t) chunkSize); + payload += chunkResult.second; + } // Read the line that terminates the chunk (\r\n) lineResult = _socket->readLine(isCancellationRequested); diff --git a/ixwebsocket/IXProgressCallback.h b/ixwebsocket/IXProgressCallback.h index 879f6a89..3dfccc99 100644 --- a/ixwebsocket/IXProgressCallback.h +++ b/ixwebsocket/IXProgressCallback.h @@ -7,8 +7,10 @@ #pragma once #include +#include namespace ix { using OnProgressCallback = std::function; + using OnChunkCallback = std::function; } diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index d3469125..dc14bc9d 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -400,12 +400,14 @@ namespace ix std::pair Socket::readBytes( size_t length, const OnProgressCallback& onProgressCallback, + const OnChunkCallback& onChunkCallback, const CancellationRequest& isCancellationRequested) { std::array readBuffer; - std::vector output; - while (output.size() != length) + size_t bytesRead = 0; + + while (bytesRead != length) { if (isCancellationRequested && isCancellationRequested()) { @@ -413,12 +415,21 @@ namespace ix return std::make_pair(false, errorMsg); } - size_t size = std::min(readBuffer.size(), length - output.size()); + size_t size = std::min(readBuffer.size(), length - bytesRead); ssize_t ret = recv((char*) &readBuffer[0], size); if (ret > 0) { - output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret); + if (onChunkCallback) + { + std::string chunk(readBuffer.begin(), readBuffer.begin() + ret); + onChunkCallback(chunk); + } + else + { + output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret); + } + bytesRead += ret; } else if (ret <= 0 && !Socket::isWaitNeeded()) { @@ -426,7 +437,7 @@ namespace ix return std::make_pair(false, errorMsg); } - if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); + if (onProgressCallback) onProgressCallback((int) bytesRead, (int) length); // Wait with a 1ms timeout until the socket is ready to read. // This way we are not busy looping diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index ae5a560e..1f4272d2 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -69,6 +69,7 @@ namespace ix std::pair readLine(const CancellationRequest& isCancellationRequested); std::pair readBytes(size_t length, const OnProgressCallback& onProgressCallback, + const OnChunkCallback& onChunkCallback, const CancellationRequest& isCancellationRequested); static int getErrno(); diff --git a/test/IXHttpClientTest.cpp b/test/IXHttpClientTest.cpp index 722f2f8c..2f9899dd 100644 --- a/test/IXHttpClientTest.cpp +++ b/test/IXHttpClientTest.cpp @@ -5,6 +5,7 @@ */ #include "catch.hpp" +#include #include #include @@ -274,4 +275,71 @@ TEST_CASE("http_client", "[http]") std::cerr << "Done" << std::endl; REQUIRE(errorCode == HttpErrorCode::Cancelled); } + + SECTION("Async API, streaming transfer") + { + bool async = true; + HttpClient httpClient(async); + WebSocketHttpHeaders headers; + + SocketTLSOptions tlsOptions; + tlsOptions.caFile = "cacert.pem"; + httpClient.setTLSOptions(tlsOptions); + + std::string url("http://speedtest.belwue.net/random-100M"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 120; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = false; + args->logger = [](const std::string& msg) { std::cout << msg; }; + args->onProgressCallback = [](int current, int total) -> bool { + std::cerr << "\r" + << "Downloaded " << current << " bytes out of " << total; + return true; + }; + + // compute Adler-32 checksum of received data + uint32_t a = 1, b = 0; + args->onChunkCallback = [&](const std::string& data) { + for (const char c: data) + { + a = (a + (unsigned char)c) % 65521; + b = (b + a) % 65521; + } + }; + + std::atomic requestCompleted(false); + std::atomic errorCode(HttpErrorCode::Invalid); + std::atomic statusCode(0); + + httpClient.performRequest( + args, [&](const HttpResponsePtr& response) { + errorCode = response->errorCode; + statusCode = response->statusCode; + requestCompleted = true; + }); + + int wait = 0; + while (wait < 120000) + { + if (requestCompleted) break; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + wait += 10; + } + + std::cerr << "Done" << std::endl; + REQUIRE(errorCode == HttpErrorCode::Ok); + REQUIRE(statusCode == 200); + + // compare checksum with a known good value + uint32_t checksum = (b << 16) | a; + REQUIRE(checksum == 1440194471); + } }