From 78b3d7ff2d63b27e3391502499ca8ecbb6f62162 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Wed, 5 Jun 2019 17:04:24 -0700 Subject: [PATCH] Feature/http async (#90) * unittest working / uses shared_ptr for a bunch of things :moyai: * fix command line tools * fix ws + add doc * add more logging --- README.md | 53 ++-- docker/Dockerfile.ubuntu_bionic | 23 ++ ixwebsocket/IXHttpClient.cpp | 234 +++++++++++------- ixwebsocket/IXHttpClient.h | 88 ++++--- ixwebsocket/IXSocketMbedTLS.cpp | 2 +- ixwebsocket/IXWebSocketMessageQueue.cpp | 2 +- ixwebsocket/IXWebSocketTransport.cpp | 2 +- test/IXHttpClientTest.cpp | 218 ++++++++++++---- test/IXWebSocketCloseTest.cpp | 2 +- ws/IXSentryClient.cpp | 34 +-- .../IXCobraMetricsThreadedPublisher.cpp | 2 +- ws/ixcobra/IXCobraMetricsThreadedPublisher.h | 1 - ws/ws_cobra_to_sentry.cpp | 8 +- ws/ws_http_client.cpp | 44 ++-- 14 files changed, 490 insertions(+), 223 deletions(-) create mode 100644 docker/Dockerfile.ubuntu_bionic diff --git a/README.md b/README.md index ca88e0f9..f6b2d8b0 100644 --- a/README.md +++ b/README.md @@ -129,33 +129,33 @@ Here is what the HTTP client API looks like. Note that HTTP client support is ve // Preparation // HttpClient httpClient; -HttpRequestArgs args; +HttpRequestArgsPtr args = httpClient.createRequest(); // Custom headers can be set WebSocketHttpHeaders headers; headers["Foo"] = "bar"; -args.extraHeaders = headers; +args->extraHeaders = headers; // Timeout options -args.connectTimeout = connectTimeout; -args.transferTimeout = transferTimeout; +args->connectTimeout = connectTimeout; +args->transferTimeout = transferTimeout; // Redirect options -args.followRedirects = followRedirects; -args.maxRedirects = maxRedirects; +args->followRedirects = followRedirects; +args->maxRedirects = maxRedirects; // Misc -args.compress = compress; // Enable gzip compression -args.verbose = verbose; -args.logger = [](const std::string& msg) +args->compress = compress; // Enable gzip compression +args->verbose = verbose; +args->logger = [](const std::string& msg) { std::cout << msg; }; // -// Request +// Synchronous Request // -HttpResponse out; +HttpResponsePtr out; std::string url = "https://www.google.com"; // HEAD request @@ -175,13 +175,30 @@ out = httpClient.post(url, std::string("foo=bar"), args); // // Result // -auto statusCode = std::get<0>(out); -auto errorCode = std::get<1>(out); -auto responseHeaders = std::get<2>(out); -auto payload = std::get<3>(out); -auto errorMsg = std::get<4>(out); -auto uploadSize = std::get<5>(out); -auto downloadSize = std::get<6>(out); +auto errorCode = response->errorCode; // Can be HttpErrorCode::Ok, HttpErrorCode::UrlMalformed, etc... +auto errorCode = response->errorCode; // 200, 404, etc... +auto responseHeaders = response->headers; // All the headers in a special case-insensitive unordered_map of (string, string) +auto payload = response->payload; // All the bytes from the response as an std::string +auto errorMsg = response->errorMsg; // Descriptive error message in case of failure +auto uploadSize = response->uploadSize; // Byte count of uploaded data +auto downloadSize = response->downloadSize; // Byte count of downloaded data + +// +// Asynchronous Request +// +bool async = true; +HttpClient httpClient(async); +auto args = httpClient.createRequest(url, HttpClient::kGet); + +// Push the request to a queue, +bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response) + { + // This callback execute in a background thread. Make sure you uses appropriate protection such as mutex + auto statusCode = response->statusCode; // acess results + } +); + +// ok will be false if your httpClient is not async ``` ## Build diff --git a/docker/Dockerfile.ubuntu_bionic b/docker/Dockerfile.ubuntu_bionic new file mode 100644 index 00000000..88892cff --- /dev/null +++ b/docker/Dockerfile.ubuntu_bionic @@ -0,0 +1,23 @@ +# Build time +FROM ubuntu:bionic as build + +ENV DEBIAN_FRONTEND noninteractive +RUN apt-get update +RUN apt-get -y install wget +RUN mkdir -p /tmp/cmake +WORKDIR /tmp/cmake +RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz +RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz + +RUN apt-get -y install g++ +RUN apt-get -y install libssl-dev +RUN apt-get -y install libz-dev +RUN apt-get -y install make +RUN apt-get -y install python + +COPY . . + +ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin +ENV PATH="${CMAKE_BIN_PATH}:${PATH}" + +RUN ["make", "ws"] diff --git a/ixwebsocket/IXHttpClient.cpp b/ixwebsocket/IXHttpClient.cpp index cda2dd2a..d39ac253 100644 --- a/ixwebsocket/IXHttpClient.cpp +++ b/ixwebsocket/IXHttpClient.cpp @@ -24,21 +24,89 @@ namespace ix const std::string HttpClient::kDel = "DEL"; const std::string HttpClient::kPut = "PUT"; - HttpClient::HttpClient() + HttpClient::HttpClient(bool async) : _async(async), _stop(false) { + if (!_async) return; + _thread = std::thread(&HttpClient::run, this); } HttpClient::~HttpClient() { + if (!_thread.joinable()) return; + _stop = true; + _condition.notify_one(); + _thread.join(); } - HttpResponse HttpClient::request( + HttpRequestArgsPtr HttpClient::createRequest(const std::string& url, + const std::string& verb) + { + auto request = std::make_shared(); + request->url = url; + request->verb = verb; + return request; + } + + bool HttpClient::performRequest(HttpRequestArgsPtr args, + const OnResponseCallback& onResponseCallback) + { + if (!_async) return false; + + // Enqueue the task + { + // acquire lock + std::unique_lock lock(_queueMutex); + + // add the task + _queue.push(std::make_pair(args, onResponseCallback)); + } // release lock + + // wake up one thread + _condition.notify_one(); + + return true; + } + + void HttpClient::run() + { + while (true) + { + HttpRequestArgsPtr args; + OnResponseCallback onResponseCallback; + + { + std::unique_lock lock(_queueMutex); + + while (!_stop && _queue.empty()) + { + _condition.wait(lock); + } + + if (_stop) return; + + auto p = _queue.front(); + _queue.pop(); + + args = p.first; + onResponseCallback = p.second; + } + + if (_stop) return; + + HttpResponsePtr response = request(args->url, args->verb, args->body, args); + onResponseCallback(response); + + if (_stop) return; + } + } + + HttpResponsePtr HttpClient::request( const std::string& url, const std::string& verb, const std::string& body, - const HttpRequestArgs& args, + HttpRequestArgsPtr args, int redirects) { uint64_t uploadSize = 0; @@ -54,7 +122,7 @@ namespace ix { std::stringstream ss; ss << "Cannot parse url: " << url; - return HttpResponse(code, HttpErrorCode::UrlMalformed, + return std::make_shared(code, HttpErrorCode::UrlMalformed, headers, payload, ss.str(), uploadSize, downloadSize); } @@ -65,7 +133,7 @@ namespace ix if (!_socket) { - return HttpResponse(code, HttpErrorCode::CannotCreateSocket, + return std::make_shared(code, HttpErrorCode::CannotCreateSocket, headers, payload, errorMsg, uploadSize, downloadSize); } @@ -75,13 +143,13 @@ namespace ix ss << verb << " " << path << " HTTP/1.1\r\n"; ss << "Host: " << host << "\r\n"; - if (args.compress) + if (args->compress) { ss << "Accept-Encoding: gzip" << "\r\n"; } // Append extra headers - for (auto&& it : args.extraHeaders) + for (auto&& it : args->extraHeaders) { ss << it.first << ": " << it.second << "\r\n"; } @@ -103,7 +171,7 @@ namespace ix ss << "Content-Length: " << body.size() << "\r\n"; // Set default Content-Type if unspecified - if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end()) + if (args->extraHeaders.find("Content-Type") == args->extraHeaders.end()) { ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n"; } @@ -121,23 +189,23 @@ namespace ix // Make a cancellation object dealing with connection timeout auto isCancellationRequested = - makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation); + makeCancellationRequestWithTimeout(args->connectTimeout, requestInitCancellation); bool success = _socket->connect(host, port, errMsg, isCancellationRequested); if (!success) { std::stringstream ss; ss << "Cannot connect to url: " << url << " / error : " << errMsg; - return HttpResponse(code, HttpErrorCode::CannotConnect, - headers, payload, ss.str(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::CannotConnect, + headers, payload, ss.str(), + uploadSize, downloadSize); } // Make a new cancellation object dealing with transfer timeout isCancellationRequested = - makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation); + makeCancellationRequestWithTimeout(args->transferTimeout, requestInitCancellation); - if (args.verbose) + if (args->verbose) { std::stringstream ss; ss << "Sending " << verb << " request " @@ -154,9 +222,9 @@ namespace ix if (!_socket->writeBytes(req, isCancellationRequested)) { std::string errorMsg("Cannot send request"); - return HttpResponse(code, HttpErrorCode::SendError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::SendError, + headers, payload, errorMsg, + uploadSize, downloadSize); } uploadSize = req.size(); @@ -168,12 +236,12 @@ namespace ix if (!lineValid) { std::string errorMsg("Cannot retrieve status line"); - return HttpResponse(code, HttpErrorCode::CannotReadStatusLine, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::CannotReadStatusLine, + headers, payload, errorMsg, + uploadSize, downloadSize); } - if (args.verbose) + if (args->verbose) { std::stringstream ss; ss << "Status line " << line; @@ -183,9 +251,9 @@ namespace ix if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1) { std::string errorMsg("Cannot parse response code from status line"); - return HttpResponse(code, HttpErrorCode::MissingStatus, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::MissingStatus, + headers, payload, errorMsg, + uploadSize, downloadSize); } auto result = parseHttpHeaders(_socket, isCancellationRequested); @@ -195,29 +263,29 @@ namespace ix if (!headersValid) { std::string errorMsg("Cannot parse http headers"); - return HttpResponse(code, HttpErrorCode::HeaderParsingError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::HeaderParsingError, + headers, payload, errorMsg, + uploadSize, downloadSize); } // Redirect ? - if ((code >= 301 && code <= 308) && args.followRedirects) + if ((code >= 301 && code <= 308) && args->followRedirects) { if (headers.find("Location") == headers.end()) { std::string errorMsg("Missing location header for redirect"); - return HttpResponse(code, HttpErrorCode::MissingLocation, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::MissingLocation, + headers, payload, errorMsg, + uploadSize, downloadSize); } - if (redirects >= args.maxRedirects) + if (redirects >= args->maxRedirects) { std::stringstream ss; ss << "Too many redirects: " << redirects; - return HttpResponse(code, HttpErrorCode::TooManyRedirects, - headers, payload, ss.str(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::TooManyRedirects, + headers, payload, ss.str(), + uploadSize, downloadSize); } // Recurse @@ -227,9 +295,9 @@ namespace ix if (verb == "HEAD") { - return HttpResponse(code, HttpErrorCode::Ok, - headers, payload, std::string(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::Ok, + headers, payload, std::string(), + uploadSize, downloadSize); } // Parse response: @@ -243,14 +311,14 @@ namespace ix payload.reserve(contentLength); auto chunkResult = _socket->readBytes(contentLength, - args.onProgressCallback, + args->onProgressCallback, isCancellationRequested); if (!chunkResult.first) { errorMsg = "Cannot read chunk"; - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } payload += chunkResult.second; } @@ -266,9 +334,9 @@ namespace ix if (!lineResult.first) { - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } uint64_t chunkSize; @@ -276,7 +344,7 @@ namespace ix ss << std::hex << line; ss >> chunkSize; - if (args.verbose) + if (args->verbose) { std::stringstream oss; oss << "Reading " << chunkSize << " bytes" @@ -288,14 +356,14 @@ namespace ix // Read a chunk auto chunkResult = _socket->readBytes((size_t) chunkSize, - args.onProgressCallback, + args->onProgressCallback, isCancellationRequested); if (!chunkResult.first) { errorMsg = "Cannot read chunk"; - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } payload += chunkResult.second; @@ -304,9 +372,9 @@ namespace ix if (!lineResult.first) { - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } if (chunkSize == 0) break; @@ -319,9 +387,9 @@ namespace ix else { std::string errorMsg("Cannot read http body"); - return HttpResponse(code, HttpErrorCode::CannotReadBody, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::CannotReadBody, + headers, payload, errorMsg, + uploadSize, downloadSize); } downloadSize = payload.size(); @@ -333,60 +401,60 @@ namespace ix if (!gzipInflate(payload, decompressedPayload)) { std::string errorMsg("Error decompressing payload"); - return HttpResponse(code, HttpErrorCode::Gzip, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::Gzip, + headers, payload, errorMsg, + uploadSize, downloadSize); } payload = decompressedPayload; } - return HttpResponse(code, HttpErrorCode::Ok, - headers, payload, std::string(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::Ok, + headers, payload, std::string(), + uploadSize, downloadSize); } - HttpResponse HttpClient::get(const std::string& url, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::get(const std::string& url, + HttpRequestArgsPtr args) { return request(url, kGet, std::string(), args); } - HttpResponse HttpClient::head(const std::string& url, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::head(const std::string& url, + HttpRequestArgsPtr args) { return request(url, kHead, std::string(), args); } - HttpResponse HttpClient::del(const std::string& url, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::del(const std::string& url, + HttpRequestArgsPtr args) { return request(url, kDel, std::string(), args); } - HttpResponse HttpClient::post(const std::string& url, - const HttpParameters& httpParameters, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::post(const std::string& url, + const HttpParameters& httpParameters, + HttpRequestArgsPtr args) { return request(url, kPost, serializeHttpParameters(httpParameters), args); } - HttpResponse HttpClient::post(const std::string& url, - const std::string& body, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::post(const std::string& url, + const std::string& body, + HttpRequestArgsPtr args) { return request(url, kPost, body, args); } - HttpResponse HttpClient::put(const std::string& url, - const HttpParameters& httpParameters, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::put(const std::string& url, + const HttpParameters& httpParameters, + HttpRequestArgsPtr args) { return request(url, kPut, serializeHttpParameters(httpParameters), args); } - HttpResponse HttpClient::put(const std::string& url, - const std::string& body, - const HttpRequestArgs& args) + HttpResponsePtr HttpClient::put(const std::string& url, + const std::string& body, + const HttpRequestArgsPtr args) { return request(url, kPut, body, args); } @@ -488,11 +556,11 @@ namespace ix } void HttpClient::log(const std::string& msg, - const HttpRequestArgs& args) + HttpRequestArgsPtr args) { - if (args.logger) + if (args->logger) { - args.logger(msg); + args->logger(msg); } } } diff --git a/ixwebsocket/IXHttpClient.h b/ixwebsocket/IXHttpClient.h index ccaed27a..7a0e7273 100644 --- a/ixwebsocket/IXHttpClient.h +++ b/ixwebsocket/IXHttpClient.h @@ -10,11 +10,13 @@ #include "IXWebSocketHttpHeaders.h" #include #include +#include #include #include #include #include -#include +#include +#include namespace ix { @@ -34,7 +36,8 @@ namespace ix MissingLocation = 11, TooManyRedirects = 12, ChunkReadError = 13, - CannotReadBody = 14 + CannotReadBody = 14, + Invalid = 100 }; struct HttpResponse @@ -66,12 +69,15 @@ namespace ix } }; + using HttpResponsePtr = std::shared_ptr; using HttpParameters = std::map; using Logger = std::function; + using OnResponseCallback = std::function; struct HttpRequestArgs { std::string url; + std::string verb; WebSocketHttpHeaders extraHeaders; std::string body; int connectTimeout; @@ -84,51 +90,71 @@ namespace ix OnProgressCallback onProgressCallback; }; + using HttpRequestArgsPtr = std::shared_ptr; + class HttpClient { public: - HttpClient(); + HttpClient(bool async = false); ~HttpClient(); - HttpResponse get(const std::string& url, const HttpRequestArgs& args); - HttpResponse head(const std::string& url, const HttpRequestArgs& args); - HttpResponse del(const std::string& url, const HttpRequestArgs& args); + HttpResponsePtr get(const std::string& url, HttpRequestArgsPtr args); + HttpResponsePtr head(const std::string& url, HttpRequestArgsPtr args); + HttpResponsePtr del(const std::string& url, HttpRequestArgsPtr args); - HttpResponse post(const std::string& url, - const HttpParameters& httpParameters, - const HttpRequestArgs& args); - HttpResponse post(const std::string& url, - const std::string& body, - const HttpRequestArgs& args); - - HttpResponse put(const std::string& url, - const HttpParameters& httpParameters, - const HttpRequestArgs& args); - HttpResponse put(const std::string& url, - const std::string& body, - const HttpRequestArgs& args); - - HttpResponse request(const std::string& url, - const std::string& verb, + HttpResponsePtr post(const std::string& url, + const HttpParameters& httpParameters, + HttpRequestArgsPtr args); + HttpResponsePtr post(const std::string& url, const std::string& body, - const HttpRequestArgs& args, - int redirects = 0); + HttpRequestArgsPtr args); + + HttpResponsePtr put(const std::string& url, + const HttpParameters& httpParameters, + HttpRequestArgsPtr args); + HttpResponsePtr put(const std::string& url, + const std::string& body, + HttpRequestArgsPtr args); + + HttpResponsePtr request(const std::string& url, + const std::string& verb, + const std::string& body, + HttpRequestArgsPtr args, + int redirects = 0); + + // Async API + HttpRequestArgsPtr createRequest(const std::string& url = std::string(), + const std::string& verb = HttpClient::kGet); + + bool performRequest(HttpRequestArgsPtr request, + const OnResponseCallback& onResponseCallback); std::string serializeHttpParameters(const HttpParameters& httpParameters); std::string urlEncode(const std::string& value); - private: - void log(const std::string& msg, const HttpRequestArgs& args); - - bool gzipInflate(const std::string& in, std::string& out); - - std::shared_ptr _socket; - const static std::string kPost; const static std::string kGet; const static std::string kHead; const static std::string kDel; const static std::string kPut; + + private: + void log(const std::string& msg, HttpRequestArgsPtr args); + + bool gzipInflate(const std::string& in, std::string& out); + + // Async API background thread runner + void run(); + + // Async API + bool _async; + std::queue> _queue; + mutable std::mutex _queueMutex; + std::condition_variable _condition; + std::atomic _stop; + std::thread _thread; + + std::shared_ptr _socket; }; } // namespace ix diff --git a/ixwebsocket/IXSocketMbedTLS.cpp b/ixwebsocket/IXSocketMbedTLS.cpp index a72e3549..af8f744d 100644 --- a/ixwebsocket/IXSocketMbedTLS.cpp +++ b/ixwebsocket/IXSocketMbedTLS.cpp @@ -82,7 +82,7 @@ namespace ix mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL); int res; - do + do { res = mbedtls_ssl_handshake(&_ssl); } diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp index 64e9ddb6..50aebe66 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -81,7 +81,7 @@ namespace ix { _onMessageUserCallback = std::move(callback); } - + WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() { MessagePtr message; diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 4a722eef..51544212 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -1041,7 +1041,7 @@ namespace ix _requestInitCancellation = true; if (_readyState == ReadyState::CLOSING || _readyState == ReadyState::CLOSED) return; - + { std::lock_guard lock(_closeDataMutex); _closeCode = code; diff --git a/test/IXHttpClientTest.cpp b/test/IXHttpClientTest.cpp index 2c2875aa..e7043ec0 100644 --- a/test/IXHttpClientTest.cpp +++ b/test/IXHttpClientTest.cpp @@ -15,85 +15,219 @@ TEST_CASE("http client", "[http]") { SECTION("Connect to a remote HTTP server") { - std::string url("http://httpbin.org/"); - + HttpClient httpClient; WebSocketHttpHeaders headers; - headers["User-Agent"] = "ixwebsocket"; - HttpRequestArgs args; - args.extraHeaders = headers; - args.connectTimeout = 60; - args.transferTimeout = 60; - args.followRedirects = true; - args.maxRedirects = 10; - args.verbose = true; - args.compress = true; - args.logger = [](const std::string& msg) + std::string url("http://httpbin.org/"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 60; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = true; + args->logger = [](const std::string& msg) { std::cout << msg; }; - args.onProgressCallback = [](int current, int total) -> bool + args->onProgressCallback = [](int current, int total) -> bool { std::cerr << "\r" << "Downloaded " << current << " bytes out of " << total; return true; }; - HttpClient httpClient; - HttpResponse response = httpClient.get(url, args); + auto response = httpClient.get(url, args); - for (auto it : response.headers) + for (auto it : response->headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::cerr << "Upload size: " << response.uploadSize << std::endl; - std::cerr << "Download size: " << response.downloadSize << std::endl; - std::cerr << "Status: " << response.statusCode << std::endl; + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; + std::cerr << "Status: " << response->statusCode << std::endl; + std::cerr << "Error message: " << response->errorMsg << std::endl; - REQUIRE(response.errorCode == HttpErrorCode::Ok); + REQUIRE(response->errorCode == HttpErrorCode::Ok); + REQUIRE(response->statusCode == 200); } -#if defined(IXWEBSOCKET_USE_TLS) SECTION("Connect to a remote HTTPS server") { - std::string url("https://httpbin.org/"); - + HttpClient httpClient; WebSocketHttpHeaders headers; - headers["User-Agent"] = "ixwebsocket"; - HttpRequestArgs args; - args.extraHeaders = headers; - args.connectTimeout = 60; - args.transferTimeout = 60; - args.followRedirects = true; - args.maxRedirects = 10; - args.verbose = true; - args.compress = true; - args.logger = [](const std::string& msg) + std::string url("https://httpbin.org/"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 60; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = true; + args->logger = [](const std::string& msg) { std::cout << msg; }; - args.onProgressCallback = [](int current, int total) -> bool + args->onProgressCallback = [](int current, int total) -> bool { std::cerr << "\r" << "Downloaded " << current << " bytes out of " << total; return true; }; - HttpClient httpClient; - HttpResponse response = httpClient.get(url, args); + auto response = httpClient.get(url, args); - for (auto it : response.headers) + for (auto it : response->headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::cerr << "Upload size: " << response.uploadSize << std::endl; - std::cerr << "Download size: " << response.downloadSize << std::endl; - std::cerr << "Status: " << response.statusCode << std::endl; + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; + std::cerr << "Status: " << response->statusCode << std::endl; + std::cerr << "Error message: " << response->errorMsg << std::endl; - REQUIRE(response.errorCode == HttpErrorCode::Ok); + REQUIRE(response->errorCode == HttpErrorCode::Ok); + REQUIRE(response->statusCode == 200); + } + + SECTION("Async API, one call") + { + bool async = true; + HttpClient httpClient(async); + WebSocketHttpHeaders headers; + + std::string url("https://httpbin.org/"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 60; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = true; + args->logger = [](const std::string& msg) + { + std::cout << msg; + }; + args->onProgressCallback = [](int current, int total) -> bool + { + std::cerr << "\r" << "Downloaded " + << current << " bytes out of " << total; + return true; + }; + + std::atomic requestCompleted(false); + std::atomic statusCode(0); + + httpClient.performRequest(args, [&requestCompleted, &statusCode] + (const HttpResponsePtr& response) + { + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; + std::cerr << "Status: " << response->statusCode << std::endl; + std::cerr << "Error message: " << response->errorMsg << std::endl; + + // In case of failure, print response->errorMsg + statusCode = response->statusCode; + requestCompleted = true; + } + ); + + int wait = 0; + while (wait < 5000) + { + if (requestCompleted) break; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + wait += 10; + } + + std::cerr << "Done" << std::endl; + REQUIRE(statusCode == 200); + } + + SECTION("Async API, multiple calls") + { + bool async = true; + HttpClient httpClient(async); + WebSocketHttpHeaders headers; + + std::string url("http://httpbin.org/"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 60; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = true; + args->logger = [](const std::string& msg) + { + std::cout << msg; + }; + args->onProgressCallback = [](int current, int total) -> bool + { + std::cerr << "\r" << "Downloaded " + << current << " bytes out of " << total; + return true; + }; + + std::atomic requestCompleted(false); + std::atomic statusCode0(0); + std::atomic statusCode1(0); + std::atomic statusCode2(0); + + for (int i = 0; i < 3; ++i) + { + httpClient.performRequest(args, [i, &requestCompleted, &statusCode0, &statusCode1, &statusCode2] + (const HttpResponsePtr& response) + { + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; + std::cerr << "Status: " << response->statusCode << std::endl; + std::cerr << "Error message: " << response->errorMsg << std::endl; + + // In case of failure, print response->errorMsg + if (i == 0) + { + statusCode0 = response->statusCode; + } + else if (i == 1) + { + statusCode1 = response->statusCode; + } + else if (i == 2) + { + statusCode2 = response->statusCode; + requestCompleted = true; + } + } + ); + } + + int wait = 0; + while (wait < 10000) + { + if (requestCompleted) break; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + wait += 10; + } + + std::cerr << "Done" << std::endl; + REQUIRE(statusCode0 == 200); + REQUIRE(statusCode1 == 200); + REQUIRE(statusCode2 == 200); } -#endif } diff --git a/test/IXWebSocketCloseTest.cpp b/test/IXWebSocketCloseTest.cpp index 0e3fe018..f915ce4d 100644 --- a/test/IXWebSocketCloseTest.cpp +++ b/test/IXWebSocketCloseTest.cpp @@ -210,7 +210,7 @@ namespace << closeInfo.reason << ")"; log(ss.str()); - + std::lock_guard lck(mutexWrite); receivedCloseCode = closeInfo.code; diff --git a/ws/IXSentryClient.cpp b/ws/IXSentryClient.cpp index 5e78e420..11334e56 100644 --- a/ws/IXSentryClient.cpp +++ b/ws/IXSentryClient.cpp @@ -141,42 +141,42 @@ namespace ix bool SentryClient::send(const Json::Value& msg, bool verbose) { - HttpRequestArgs args; - args.extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader(); - args.connectTimeout = 60; - args.transferTimeout = 5 * 60; - args.followRedirects = true; - args.verbose = verbose; - args.logger = [](const std::string& msg) + auto args = _httpClient.createRequest(); + args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader(); + args->connectTimeout = 60; + args->transferTimeout = 5 * 60; + args->followRedirects = true; + args->verbose = verbose; + args->logger = [](const std::string& msg) { std::cout << msg; }; std::string body = computePayload(msg); - HttpResponse response = _httpClient.post(_url, body, args); + HttpResponsePtr response = _httpClient.post(_url, body, args); if (verbose) { - for (auto it : response.headers) + for (auto it : response->headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::cerr << "Upload size: " << response.uploadSize << std::endl; - std::cerr << "Download size: " << response.downloadSize << std::endl; + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; - std::cerr << "Status: " << response.statusCode << std::endl; - if (response.errorCode != HttpErrorCode::Ok) + std::cerr << "Status: " << response->statusCode << std::endl; + if (response->errorCode != HttpErrorCode::Ok) { - std::cerr << "error message: " << response.errorMsg << std::endl; + std::cerr << "error message: " << response->errorMsg << std::endl; } - if (response.headers["Content-Type"] != "application/octet-stream") + if (response->headers["Content-Type"] != "application/octet-stream") { - std::cerr << "payload: " << response.payload << std::endl; + std::cerr << "payload: " << response->payload << std::endl; } } - return response.statusCode == 200; + return response->statusCode == 200; } } // namespace ix diff --git a/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp b/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp index bf160cef..da12faaf 100644 --- a/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp +++ b/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp @@ -99,7 +99,7 @@ namespace ix void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, const Json::Value& msg) { - // Now actually enqueue the task + // Enqueue the task { // acquire lock std::unique_lock lock(_queue_mutex); diff --git a/ws/ixcobra/IXCobraMetricsThreadedPublisher.h b/ws/ixcobra/IXCobraMetricsThreadedPublisher.h index 55c17c59..50e6e398 100644 --- a/ws/ixcobra/IXCobraMetricsThreadedPublisher.h +++ b/ws/ixcobra/IXCobraMetricsThreadedPublisher.h @@ -25,7 +25,6 @@ namespace ix ~CobraMetricsThreadedPublisher(); /// Configuration / set keys, etc... - /// All input data but the channel name is encrypted with rc4 void configure(const std::string& appkey, const std::string& endpoint, const std::string& channel, diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index cef19864..75a206e6 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -46,9 +46,11 @@ namespace ix std::condition_variable progressCondition; std::queue queue; + SentryClient sentryClient(dsn); + auto sentrySender = [&condition, &progressCondition, &conditionVariableMutex, &queue, verbose, &errorSending, &sentCount, - &stop, &dsn] + &stop, &sentryClient] { while (true) { @@ -62,9 +64,7 @@ namespace ix queue.pop(); } - SentryClient sc(dsn); - - if (!sc.send(msg, verbose)) + if (!sentryClient.send(msg, verbose)) { errorSending = true; } diff --git a/ws/ws_http_client.cpp b/ws/ws_http_client.cpp index 2a2b366d..52b79c17 100644 --- a/ws/ws_http_client.cpp +++ b/ws/ws_http_client.cpp @@ -95,19 +95,20 @@ namespace ix const std::string& output, bool compress) { - HttpRequestArgs args; - args.extraHeaders = parseHeaders(headersData); - args.connectTimeout = connectTimeout; - args.transferTimeout = transferTimeout; - args.followRedirects = followRedirects; - args.maxRedirects = maxRedirects; - args.verbose = verbose; - args.compress = compress; - args.logger = [](const std::string& msg) + HttpClient httpClient; + auto args = httpClient.createRequest(); + args->extraHeaders = parseHeaders(headersData); + args->connectTimeout = connectTimeout; + args->transferTimeout = transferTimeout; + args->followRedirects = followRedirects; + args->maxRedirects = maxRedirects; + args->verbose = verbose; + args->compress = compress; + args->logger = [](const std::string& msg) { std::cout << msg; }; - args.onProgressCallback = [](int current, int total) -> bool + args->onProgressCallback = [](int current, int total) -> bool { std::cerr << "\r" << "Downloaded " << current << " bytes out of " << total; @@ -116,8 +117,7 @@ namespace ix HttpParameters httpParameters = parsePostParameters(data); - HttpClient httpClient; - HttpResponse response; + HttpResponsePtr response; if (headersOnly) { response = httpClient.head(url, args); @@ -133,21 +133,21 @@ namespace ix std::cerr << std::endl; - for (auto it : response.headers) + for (auto it : response->headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::cerr << "Upload size: " << response.uploadSize << std::endl; - std::cerr << "Download size: " << response.downloadSize << std::endl; + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; - std::cerr << "Status: " << response.statusCode << std::endl; - if (response.errorCode != HttpErrorCode::Ok) + std::cerr << "Status: " << response->statusCode << std::endl; + if (response->errorCode != HttpErrorCode::Ok) { - std::cerr << "error message: " << response.errorMsg << std::endl; + std::cerr << "error message: " << response->errorMsg << std::endl; } - if (!headersOnly && response.errorCode == HttpErrorCode::Ok) + if (!headersOnly && response->errorCode == HttpErrorCode::Ok) { if (save || !output.empty()) { @@ -160,14 +160,14 @@ namespace ix std::cout << "Writing to disk: " << filename << std::endl; std::ofstream out(filename); - out.write((char*)&response.payload.front(), response.payload.size()); + out.write((char*)&response->payload.front(), response->payload.size()); out.close(); } else { - if (response.headers["Content-Type"] != "application/octet-stream") + if (response->headers["Content-Type"] != "application/octet-stream") { - std::cout << "payload: " << response.payload << std::endl; + std::cout << "payload: " << response->payload << std::endl; } else {