Add support for streaming transfers (#353)

This change adds onChunkCallback to the request. If defined it will be
called repeatedly with the incoming data. This allows to process data on
the go or write it to disk instead of accumulating the data in memory.
This commit is contained in:
Martin Natano 2022-02-01 06:54:32 +01:00 committed by GitHub
parent c28b569535
commit db7057de69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 120 additions and 17 deletions

View File

@ -520,11 +520,21 @@ bool async = true;
HttpClient httpClient(async);
auto args = httpClient.createRequest(url, HttpClient::kGet);
// If you define a chunk callback it will be called repeteadly with the
// incoming data. This allows to process data on the go or write it to disk
// instead of accumulating the data in memory.
args.onChunkCallback = [](const std::string& data)
{
// process data
};
// Push the request to a queue,
bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response)
{
// This callback execute in a background thread. Make sure you uses appropriate protection such as mutex
auto statusCode = response->statusCode; // acess results
// response->body is empty if onChunkCallback was used
}
);

View File

@ -149,7 +149,7 @@ namespace ix
false, "Error: 'Content-Length' should be a positive integer", httpRequest);
}
auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested);
auto res = socket->readBytes(contentLength, nullptr, nullptr, isCancellationRequested);
if (!res.first)
{
return std::make_tuple(

View File

@ -89,6 +89,7 @@ namespace ix
bool compressRequest = false;
Logger logger;
OnProgressCallback onProgressCallback;
OnChunkCallback onChunkCallback;
std::atomic<bool> cancel;
};

View File

@ -176,7 +176,7 @@ namespace ix
ss << "Host: " << host << "\r\n";
#ifdef IXWEBSOCKET_USE_ZLIB
if (args->compress)
if (args->compress && !args->onChunkCallback)
{
ss << "Accept-Encoding: gzip"
<< "\r\n";
@ -406,10 +406,10 @@ namespace ix
ss << headers["Content-Length"];
ss >> contentLength;
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(
contentLength, args->onProgressCallback, isCancellationRequested);
auto chunkResult = _socket->readBytes(contentLength,
args->onProgressCallback,
args->onChunkCallback,
isCancellationRequested);
if (!chunkResult.first)
{
auto errorCode = args->cancel ? HttpErrorCode::Cancelled : HttpErrorCode::ChunkReadError;
@ -423,8 +423,13 @@ namespace ix
uploadSize,
downloadSize);
}
if (!args->onChunkCallback)
{
payload.reserve(contentLength);
payload += chunkResult.second;
}
}
else if (headers.find("Transfer-Encoding") != headers.end() &&
headers["Transfer-Encoding"] == "chunked")
{
@ -460,11 +465,11 @@ namespace ix
log(oss.str(), args);
}
payload.reserve(payload.size() + (size_t) chunkSize);
// Read a chunk
auto chunkResult = _socket->readBytes(
(size_t) chunkSize, args->onProgressCallback, isCancellationRequested);
auto chunkResult = _socket->readBytes((size_t) chunkSize,
args->onProgressCallback,
args->onChunkCallback,
isCancellationRequested);
if (!chunkResult.first)
{
auto errorCode = args->cancel ? HttpErrorCode::Cancelled : HttpErrorCode::ChunkReadError;
@ -478,7 +483,12 @@ namespace ix
uploadSize,
downloadSize);
}
if (!args->onChunkCallback)
{
payload.reserve(payload.size() + (size_t) chunkSize);
payload += chunkResult.second;
}
// Read the line that terminates the chunk (\r\n)
lineResult = _socket->readLine(isCancellationRequested);

View File

@ -7,8 +7,10 @@
#pragma once
#include <functional>
#include <string>
namespace ix
{
using OnProgressCallback = std::function<bool(int current, int total)>;
using OnChunkCallback = std::function<void(const std::string&)>;
}

View File

@ -400,12 +400,14 @@ namespace ix
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const OnChunkCallback& onChunkCallback,
const CancellationRequest& isCancellationRequested)
{
std::array<uint8_t, 1 << 14> readBuffer;
std::vector<uint8_t> output;
while (output.size() != length)
size_t bytesRead = 0;
while (bytesRead != length)
{
if (isCancellationRequested && isCancellationRequested())
{
@ -413,20 +415,29 @@ namespace ix
return std::make_pair(false, errorMsg);
}
size_t size = std::min(readBuffer.size(), length - output.size());
size_t size = std::min(readBuffer.size(), length - bytesRead);
ssize_t ret = recv((char*) &readBuffer[0], size);
if (ret > 0)
{
if (onChunkCallback)
{
std::string chunk(readBuffer.begin(), readBuffer.begin() + ret);
onChunkCallback(chunk);
}
else
{
output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret);
}
bytesRead += ret;
}
else if (ret <= 0 && !Socket::isWaitNeeded())
{
const std::string errorMsg("Recv Error");
return std::make_pair(false, errorMsg);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
if (onProgressCallback) onProgressCallback((int) bytesRead, (int) length);
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping

View File

@ -69,6 +69,7 @@ namespace ix
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes(size_t length,
const OnProgressCallback& onProgressCallback,
const OnChunkCallback& onChunkCallback,
const CancellationRequest& isCancellationRequested);
static int getErrno();

View File

@ -5,6 +5,7 @@
*/
#include "catch.hpp"
#include <cstdint>
#include <iostream>
#include <ixwebsocket/IXHttpClient.h>
@ -274,4 +275,71 @@ TEST_CASE("http_client", "[http]")
std::cerr << "Done" << std::endl;
REQUIRE(errorCode == HttpErrorCode::Cancelled);
}
SECTION("Async API, streaming transfer")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
SocketTLSOptions tlsOptions;
tlsOptions.caFile = "cacert.pem";
httpClient.setTLSOptions(tlsOptions);
std::string url("http://speedtest.belwue.net/random-100M");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 120;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = false;
args->logger = [](const std::string& msg) { std::cout << msg; };
args->onProgressCallback = [](int current, int total) -> bool {
std::cerr << "\r"
<< "Downloaded " << current << " bytes out of " << total;
return true;
};
// compute Adler-32 checksum of received data
uint32_t a = 1, b = 0;
args->onChunkCallback = [&](const std::string& data) {
for (const char c: data)
{
a = (a + (unsigned char)c) % 65521;
b = (b + a) % 65521;
}
};
std::atomic<bool> requestCompleted(false);
std::atomic<HttpErrorCode> errorCode(HttpErrorCode::Invalid);
std::atomic<int> statusCode(0);
httpClient.performRequest(
args, [&](const HttpResponsePtr& response) {
errorCode = response->errorCode;
statusCode = response->statusCode;
requestCompleted = true;
});
int wait = 0;
while (wait < 120000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(errorCode == HttpErrorCode::Ok);
REQUIRE(statusCode == 200);
// compare checksum with a known good value
uint32_t checksum = (b << 16) | a;
REQUIRE(checksum == 1440194471);
}
}