Feature/http async (#90)

* unittest working / uses shared_ptr for a bunch of things 🗿

* fix command line tools

* fix ws + add doc

* add more logging
This commit is contained in:
Benjamin Sergeant
2019-06-05 17:04:24 -07:00
committed by GitHub
parent 012193c74e
commit 78b3d7ff2d
14 changed files with 490 additions and 223 deletions

View File

@ -24,21 +24,89 @@ namespace ix
const std::string HttpClient::kDel = "DEL";
const std::string HttpClient::kPut = "PUT";
HttpClient::HttpClient()
HttpClient::HttpClient(bool async) : _async(async), _stop(false)
{
if (!_async) return;
_thread = std::thread(&HttpClient::run, this);
}
HttpClient::~HttpClient()
{
if (!_thread.joinable()) return;
_stop = true;
_condition.notify_one();
_thread.join();
}
HttpResponse HttpClient::request(
HttpRequestArgsPtr HttpClient::createRequest(const std::string& url,
const std::string& verb)
{
auto request = std::make_shared<HttpRequestArgs>();
request->url = url;
request->verb = verb;
return request;
}
bool HttpClient::performRequest(HttpRequestArgsPtr args,
const OnResponseCallback& onResponseCallback)
{
if (!_async) return false;
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queueMutex);
// add the task
_queue.push(std::make_pair(args, onResponseCallback));
} // release lock
// wake up one thread
_condition.notify_one();
return true;
}
void HttpClient::run()
{
while (true)
{
HttpRequestArgsPtr args;
OnResponseCallback onResponseCallback;
{
std::unique_lock<std::mutex> lock(_queueMutex);
while (!_stop && _queue.empty())
{
_condition.wait(lock);
}
if (_stop) return;
auto p = _queue.front();
_queue.pop();
args = p.first;
onResponseCallback = p.second;
}
if (_stop) return;
HttpResponsePtr response = request(args->url, args->verb, args->body, args);
onResponseCallback(response);
if (_stop) return;
}
}
HttpResponsePtr HttpClient::request(
const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
HttpRequestArgsPtr args,
int redirects)
{
uint64_t uploadSize = 0;
@ -54,7 +122,7 @@ namespace ix
{
std::stringstream ss;
ss << "Cannot parse url: " << url;
return HttpResponse(code, HttpErrorCode::UrlMalformed,
return std::make_shared<HttpResponse>(code, HttpErrorCode::UrlMalformed,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
@ -65,7 +133,7 @@ namespace ix
if (!_socket)
{
return HttpResponse(code, HttpErrorCode::CannotCreateSocket,
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotCreateSocket,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
@ -75,13 +143,13 @@ namespace ix
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
if (args.compress)
if (args->compress)
{
ss << "Accept-Encoding: gzip" << "\r\n";
}
// Append extra headers
for (auto&& it : args.extraHeaders)
for (auto&& it : args->extraHeaders)
{
ss << it.first << ": " << it.second << "\r\n";
}
@ -103,7 +171,7 @@ namespace ix
ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end())
if (args->extraHeaders.find("Content-Type") == args->extraHeaders.end())
{
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
}
@ -121,23 +189,23 @@ namespace ix
// Make a cancellation object dealing with connection timeout
auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation);
makeCancellationRequestWithTimeout(args->connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success)
{
std::stringstream ss;
ss << "Cannot connect to url: " << url << " / error : " << errMsg;
return HttpResponse(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Make a new cancellation object dealing with transfer timeout
isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation);
makeCancellationRequestWithTimeout(args->transferTimeout, requestInitCancellation);
if (args.verbose)
if (args->verbose)
{
std::stringstream ss;
ss << "Sending " << verb << " request "
@ -154,9 +222,9 @@ namespace ix
if (!_socket->writeBytes(req, isCancellationRequested))
{
std::string errorMsg("Cannot send request");
return HttpResponse(code, HttpErrorCode::SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uploadSize = req.size();
@ -168,12 +236,12 @@ namespace ix
if (!lineValid)
{
std::string errorMsg("Cannot retrieve status line");
return HttpResponse(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (args.verbose)
if (args->verbose)
{
std::stringstream ss;
ss << "Status line " << line;
@ -183,9 +251,9 @@ namespace ix
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{
std::string errorMsg("Cannot parse response code from status line");
return HttpResponse(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
@ -195,29 +263,29 @@ namespace ix
if (!headersValid)
{
std::string errorMsg("Cannot parse http headers");
return HttpResponse(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects)
if ((code >= 301 && code <= 308) && args->followRedirects)
{
if (headers.find("Location") == headers.end())
{
std::string errorMsg("Missing location header for redirect");
return HttpResponse(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (redirects >= args.maxRedirects)
if (redirects >= args->maxRedirects)
{
std::stringstream ss;
ss << "Too many redirects: " << redirects;
return HttpResponse(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Recurse
@ -227,9 +295,9 @@ namespace ix
if (verb == "HEAD")
{
return HttpResponse(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
// Parse response:
@ -243,14 +311,14 @@ namespace ix
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
args->onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
}
@ -266,9 +334,9 @@ namespace ix
if (!lineResult.first)
{
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uint64_t chunkSize;
@ -276,7 +344,7 @@ namespace ix
ss << std::hex << line;
ss >> chunkSize;
if (args.verbose)
if (args->verbose)
{
std::stringstream oss;
oss << "Reading " << chunkSize << " bytes"
@ -288,14 +356,14 @@ namespace ix
// Read a chunk
auto chunkResult = _socket->readBytes((size_t) chunkSize,
args.onProgressCallback,
args->onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
@ -304,9 +372,9 @@ namespace ix
if (!lineResult.first)
{
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (chunkSize == 0) break;
@ -319,9 +387,9 @@ namespace ix
else
{
std::string errorMsg("Cannot read http body");
return HttpResponse(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
downloadSize = payload.size();
@ -333,60 +401,60 @@ namespace ix
if (!gzipInflate(payload, decompressedPayload))
{
std::string errorMsg("Error decompressing payload");
return HttpResponse(code, HttpErrorCode::Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload = decompressedPayload;
}
return HttpResponse(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
HttpResponse HttpClient::get(const std::string& url,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::get(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kGet, std::string(), args);
}
HttpResponse HttpClient::head(const std::string& url,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::head(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kHead, std::string(), args);
}
HttpResponse HttpClient::del(const std::string& url,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::del(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kDel, std::string(), args);
}
HttpResponse HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args)
{
return request(url, kPost, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::post(const std::string& url,
const std::string& body,
HttpRequestArgsPtr args)
{
return request(url, kPost, body, args);
}
HttpResponse HttpClient::put(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::put(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args)
{
return request(url, kPut, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::put(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::put(const std::string& url,
const std::string& body,
const HttpRequestArgsPtr args)
{
return request(url, kPut, body, args);
}
@ -488,11 +556,11 @@ namespace ix
}
void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args)
HttpRequestArgsPtr args)
{
if (args.logger)
if (args->logger)
{
args.logger(msg);
args->logger(msg);
}
}
}