diff --git a/ixwebsocket/IXHttpClient.cpp b/ixwebsocket/IXHttpClient.cpp index cda2dd2a..ae661b9c 100644 --- a/ixwebsocket/IXHttpClient.cpp +++ b/ixwebsocket/IXHttpClient.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include @@ -24,17 +25,87 @@ namespace ix const std::string HttpClient::kDel = "DEL"; const std::string HttpClient::kPut = "PUT"; - HttpClient::HttpClient() + HttpClient::HttpClient(bool async) : _async(async), _stop(false) { + if (!_async) return; + _thread = std::thread(&HttpClient::run, this); } HttpClient::~HttpClient() { + if (!_thread.joinable()) return; + _stop = true; + _condition.notify_one(); + _thread.join(); } - HttpResponse HttpClient::request( + HttpRequestArgsPtr HttpClient::createRequest(const std::string& url, + const std::string& verb) + { + auto request = std::make_shared(); + request->url = url; + request->verb = verb; + return request; + } + + bool HttpClient::performRequest(HttpRequestArgsPtr args, + const OnResponseCallback& onResponseCallback) + { + if (!_async) return false; + + // Enqueue the task + { + // acquire lock + std::unique_lock lock(_queueMutex); + + // add the task + _queue.push(std::make_pair(args, onResponseCallback)); + } // release lock + + // wake up one thread + _condition.notify_one(); + + return true; + } + + void HttpClient::run() + { + while (true) + { + HttpRequestArgsPtr args; + OnResponseCallback onResponseCallback; + + { + std::unique_lock lock(_queueMutex); + + while (!_stop && _queue.empty()) + { + _condition.wait(lock); + } + + if (_stop) return; + + auto p = _queue.front(); + _queue.pop(); + + args = p.first; + onResponseCallback = p.second; + } + + if (_stop) return; + + HttpResponsePtr response = request(args->url, args->verb, args->body, *args); + onResponseCallback(response); + + if (_stop) return; + } + + std::cout << "HttpClient::run() finished" << std::endl; + } + + HttpResponsePtr HttpClient::request( const std::string& url, const std::string& verb, const std::string& body, @@ -54,7 +125,7 @@ namespace ix { std::stringstream ss; ss << "Cannot parse url: " << url; - return HttpResponse(code, HttpErrorCode::UrlMalformed, + return std::make_shared(code, HttpErrorCode::UrlMalformed, headers, payload, ss.str(), uploadSize, downloadSize); } @@ -65,7 +136,7 @@ namespace ix if (!_socket) { - return HttpResponse(code, HttpErrorCode::CannotCreateSocket, + return std::make_shared(code, HttpErrorCode::CannotCreateSocket, headers, payload, errorMsg, uploadSize, downloadSize); } @@ -128,9 +199,9 @@ namespace ix { std::stringstream ss; ss << "Cannot connect to url: " << url << " / error : " << errMsg; - return HttpResponse(code, HttpErrorCode::CannotConnect, - headers, payload, ss.str(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::CannotConnect, + headers, payload, ss.str(), + uploadSize, downloadSize); } // Make a new cancellation object dealing with transfer timeout @@ -154,9 +225,9 @@ namespace ix if (!_socket->writeBytes(req, isCancellationRequested)) { std::string errorMsg("Cannot send request"); - return HttpResponse(code, HttpErrorCode::SendError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::SendError, + headers, payload, errorMsg, + uploadSize, downloadSize); } uploadSize = req.size(); @@ -168,9 +239,9 @@ namespace ix if (!lineValid) { std::string errorMsg("Cannot retrieve status line"); - return HttpResponse(code, HttpErrorCode::CannotReadStatusLine, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::CannotReadStatusLine, + headers, payload, errorMsg, + uploadSize, downloadSize); } if (args.verbose) @@ -183,9 +254,9 @@ namespace ix if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1) { std::string errorMsg("Cannot parse response code from status line"); - return HttpResponse(code, HttpErrorCode::MissingStatus, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::MissingStatus, + headers, payload, errorMsg, + uploadSize, downloadSize); } auto result = parseHttpHeaders(_socket, isCancellationRequested); @@ -195,9 +266,9 @@ namespace ix if (!headersValid) { std::string errorMsg("Cannot parse http headers"); - return HttpResponse(code, HttpErrorCode::HeaderParsingError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::HeaderParsingError, + headers, payload, errorMsg, + uploadSize, downloadSize); } // Redirect ? @@ -206,18 +277,18 @@ namespace ix if (headers.find("Location") == headers.end()) { std::string errorMsg("Missing location header for redirect"); - return HttpResponse(code, HttpErrorCode::MissingLocation, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::MissingLocation, + headers, payload, errorMsg, + uploadSize, downloadSize); } if (redirects >= args.maxRedirects) { std::stringstream ss; ss << "Too many redirects: " << redirects; - return HttpResponse(code, HttpErrorCode::TooManyRedirects, - headers, payload, ss.str(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::TooManyRedirects, + headers, payload, ss.str(), + uploadSize, downloadSize); } // Recurse @@ -227,9 +298,9 @@ namespace ix if (verb == "HEAD") { - return HttpResponse(code, HttpErrorCode::Ok, - headers, payload, std::string(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::Ok, + headers, payload, std::string(), + uploadSize, downloadSize); } // Parse response: @@ -248,9 +319,9 @@ namespace ix if (!chunkResult.first) { errorMsg = "Cannot read chunk"; - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } payload += chunkResult.second; } @@ -266,9 +337,9 @@ namespace ix if (!lineResult.first) { - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } uint64_t chunkSize; @@ -293,9 +364,9 @@ namespace ix if (!chunkResult.first) { errorMsg = "Cannot read chunk"; - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } payload += chunkResult.second; @@ -304,9 +375,9 @@ namespace ix if (!lineResult.first) { - return HttpResponse(code, HttpErrorCode::ChunkReadError, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::ChunkReadError, + headers, payload, errorMsg, + uploadSize, downloadSize); } if (chunkSize == 0) break; @@ -319,9 +390,9 @@ namespace ix else { std::string errorMsg("Cannot read http body"); - return HttpResponse(code, HttpErrorCode::CannotReadBody, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::CannotReadBody, + headers, payload, errorMsg, + uploadSize, downloadSize); } downloadSize = payload.size(); @@ -333,58 +404,58 @@ namespace ix if (!gzipInflate(payload, decompressedPayload)) { std::string errorMsg("Error decompressing payload"); - return HttpResponse(code, HttpErrorCode::Gzip, - headers, payload, errorMsg, - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::Gzip, + headers, payload, errorMsg, + uploadSize, downloadSize); } payload = decompressedPayload; } - return HttpResponse(code, HttpErrorCode::Ok, - headers, payload, std::string(), - uploadSize, downloadSize); + return std::make_shared(code, HttpErrorCode::Ok, + headers, payload, std::string(), + uploadSize, downloadSize); } - HttpResponse HttpClient::get(const std::string& url, + HttpResponsePtr HttpClient::get(const std::string& url, const HttpRequestArgs& args) { return request(url, kGet, std::string(), args); } - HttpResponse HttpClient::head(const std::string& url, + HttpResponsePtr HttpClient::head(const std::string& url, const HttpRequestArgs& args) { return request(url, kHead, std::string(), args); } - HttpResponse HttpClient::del(const std::string& url, + HttpResponsePtr HttpClient::del(const std::string& url, const HttpRequestArgs& args) { return request(url, kDel, std::string(), args); } - HttpResponse HttpClient::post(const std::string& url, + HttpResponsePtr HttpClient::post(const std::string& url, const HttpParameters& httpParameters, const HttpRequestArgs& args) { return request(url, kPost, serializeHttpParameters(httpParameters), args); } - HttpResponse HttpClient::post(const std::string& url, + HttpResponsePtr HttpClient::post(const std::string& url, const std::string& body, const HttpRequestArgs& args) { return request(url, kPost, body, args); } - HttpResponse HttpClient::put(const std::string& url, + HttpResponsePtr HttpClient::put(const std::string& url, const HttpParameters& httpParameters, const HttpRequestArgs& args) { return request(url, kPut, serializeHttpParameters(httpParameters), args); } - HttpResponse HttpClient::put(const std::string& url, + HttpResponsePtr HttpClient::put(const std::string& url, const std::string& body, const HttpRequestArgs& args) { diff --git a/ixwebsocket/IXHttpClient.h b/ixwebsocket/IXHttpClient.h index ccaed27a..a41a0a06 100644 --- a/ixwebsocket/IXHttpClient.h +++ b/ixwebsocket/IXHttpClient.h @@ -10,11 +10,13 @@ #include "IXWebSocketHttpHeaders.h" #include #include +#include #include #include #include #include -#include +#include +#include namespace ix { @@ -34,7 +36,8 @@ namespace ix MissingLocation = 11, TooManyRedirects = 12, ChunkReadError = 13, - CannotReadBody = 14 + CannotReadBody = 14, + Invalid = 100 }; struct HttpResponse @@ -66,12 +69,15 @@ namespace ix } }; + using HttpResponsePtr = std::shared_ptr; using HttpParameters = std::map; using Logger = std::function; + using OnResponseCallback = std::function; struct HttpRequestArgs { std::string url; + std::string verb; WebSocketHttpHeaders extraHeaders; std::string body; int connectTimeout; @@ -84,51 +90,71 @@ namespace ix OnProgressCallback onProgressCallback; }; + using HttpRequestArgsPtr = std::shared_ptr; + class HttpClient { public: - HttpClient(); + HttpClient(bool async = false); ~HttpClient(); - HttpResponse get(const std::string& url, const HttpRequestArgs& args); - HttpResponse head(const std::string& url, const HttpRequestArgs& args); - HttpResponse del(const std::string& url, const HttpRequestArgs& args); + HttpResponsePtr get(const std::string& url, const HttpRequestArgs& args); + HttpResponsePtr head(const std::string& url, const HttpRequestArgs& args); + HttpResponsePtr del(const std::string& url, const HttpRequestArgs& args); - HttpResponse post(const std::string& url, - const HttpParameters& httpParameters, - const HttpRequestArgs& args); - HttpResponse post(const std::string& url, - const std::string& body, - const HttpRequestArgs& args); - - HttpResponse put(const std::string& url, - const HttpParameters& httpParameters, - const HttpRequestArgs& args); - HttpResponse put(const std::string& url, - const std::string& body, - const HttpRequestArgs& args); - - HttpResponse request(const std::string& url, - const std::string& verb, + HttpResponsePtr post(const std::string& url, + const HttpParameters& httpParameters, + const HttpRequestArgs& args); + HttpResponsePtr post(const std::string& url, const std::string& body, - const HttpRequestArgs& args, - int redirects = 0); + const HttpRequestArgs& args); + + HttpResponsePtr put(const std::string& url, + const HttpParameters& httpParameters, + const HttpRequestArgs& args); + HttpResponsePtr put(const std::string& url, + const std::string& body, + const HttpRequestArgs& args); + + HttpResponsePtr request(const std::string& url, + const std::string& verb, + const std::string& body, + const HttpRequestArgs& args, + int redirects = 0); + + // Async API + HttpRequestArgsPtr createRequest(const std::string& url, + const std::string& verb = HttpClient::kGet); + + bool performRequest(HttpRequestArgsPtr request, + const OnResponseCallback& onResponseCallback); std::string serializeHttpParameters(const HttpParameters& httpParameters); std::string urlEncode(const std::string& value); - private: - void log(const std::string& msg, const HttpRequestArgs& args); - - bool gzipInflate(const std::string& in, std::string& out); - - std::shared_ptr _socket; - const static std::string kPost; const static std::string kGet; const static std::string kHead; const static std::string kDel; const static std::string kPut; + + private: + void log(const std::string& msg, const HttpRequestArgs& args); + + bool gzipInflate(const std::string& in, std::string& out); + + // Async API background thread runner + void run(); + + // Async API + bool _async; + std::queue> _queue; + mutable std::mutex _queueMutex; + std::condition_variable _condition; + std::atomic _stop; + std::thread _thread; + + std::shared_ptr _socket; }; } // namespace ix diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 31a9867d..17bd0035 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -10,8 +10,8 @@ set (CMAKE_CXX_STANDARD 14) if (NOT WIN32) set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH}) find_package(Sanitizers) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") - set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") + set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") option(USE_TLS "Add TLS support" ON) endif() diff --git a/test/IXHttpClientTest.cpp b/test/IXHttpClientTest.cpp index 2c2875aa..c7cdbc82 100644 --- a/test/IXHttpClientTest.cpp +++ b/test/IXHttpClientTest.cpp @@ -18,7 +18,6 @@ TEST_CASE("http client", "[http]") std::string url("http://httpbin.org/"); WebSocketHttpHeaders headers; - headers["User-Agent"] = "ixwebsocket"; HttpRequestArgs args; args.extraHeaders = headers; @@ -40,27 +39,26 @@ TEST_CASE("http client", "[http]") }; HttpClient httpClient; - HttpResponse response = httpClient.get(url, args); + auto response = httpClient.get(url, args); - for (auto it : response.headers) + for (auto it : response->headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::cerr << "Upload size: " << response.uploadSize << std::endl; - std::cerr << "Download size: " << response.downloadSize << std::endl; - std::cerr << "Status: " << response.statusCode << std::endl; + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; + std::cerr << "Status: " << response->statusCode << std::endl; - REQUIRE(response.errorCode == HttpErrorCode::Ok); + REQUIRE(response->errorCode == HttpErrorCode::Ok); + REQUIRE(response->statusCode == 200); } -#if defined(IXWEBSOCKET_USE_TLS) SECTION("Connect to a remote HTTPS server") { std::string url("https://httpbin.org/"); WebSocketHttpHeaders headers; - headers["User-Agent"] = "ixwebsocket"; HttpRequestArgs args; args.extraHeaders = headers; @@ -82,18 +80,143 @@ TEST_CASE("http client", "[http]") }; HttpClient httpClient; - HttpResponse response = httpClient.get(url, args); + auto response = httpClient.get(url, args); - for (auto it : response.headers) + for (auto it : response->headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::cerr << "Upload size: " << response.uploadSize << std::endl; - std::cerr << "Download size: " << response.downloadSize << std::endl; - std::cerr << "Status: " << response.statusCode << std::endl; + std::cerr << "Upload size: " << response->uploadSize << std::endl; + std::cerr << "Download size: " << response->downloadSize << std::endl; + std::cerr << "Status: " << response->statusCode << std::endl; - REQUIRE(response.errorCode == HttpErrorCode::Ok); + REQUIRE(response->errorCode == HttpErrorCode::Ok); + REQUIRE(response->statusCode == 200); + } + + SECTION("Async API, one call") + { + bool async = true; + HttpClient httpClient(async); + WebSocketHttpHeaders headers; + + std::string url("https://httpbin.org/"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 60; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = true; + args->logger = [](const std::string& msg) + { + std::cout << msg; + }; + args->onProgressCallback = [](int current, int total) -> bool + { + std::cerr << "\r" << "Downloaded " + << current << " bytes out of " << total; + return true; + }; + + std::atomic requestCompleted(false); + std::atomic statusCode(0); + + httpClient.performRequest(args, [&requestCompleted, &statusCode] + (const HttpResponsePtr& response) + { + // In case of failure, print response->errorMsg + statusCode = response->statusCode; + requestCompleted = true; + } + ); + + int wait = 0; + while (wait < 5000) + { + if (requestCompleted) break; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + wait += 10; + } + + + std::cerr << "Done" << std::endl; + REQUIRE(statusCode == 200); + } + + SECTION("Async API, multiple calls") + { + bool async = true; + HttpClient httpClient(async); + WebSocketHttpHeaders headers; + + std::string url("http://httpbin.org/"); + auto args = httpClient.createRequest(url); + + args->extraHeaders = headers; + args->connectTimeout = 60; + args->transferTimeout = 60; + args->followRedirects = true; + args->maxRedirects = 10; + args->verbose = true; + args->compress = true; + args->logger = [](const std::string& msg) + { + std::cout << msg; + }; + args->onProgressCallback = [](int current, int total) -> bool + { + std::cerr << "\r" << "Downloaded " + << current << " bytes out of " << total; + return true; + }; + + std::atomic requestCompleted(false); + std::atomic statusCode0(0); + std::atomic statusCode1(0); + std::atomic statusCode2(0); + + for (int i = 0; i < 3; ++i) + { + httpClient.performRequest(args, [i, &requestCompleted, &statusCode0, &statusCode1, &statusCode2] + (const HttpResponsePtr& response) + { + // In case of failure, print response->errorMsg + if (i == 0) + { + statusCode0 = response->statusCode; + } + else if (i == 1) + { + statusCode1 = response->statusCode; + } + else if (i == 2) + { + statusCode2 = response->statusCode; + requestCompleted = true; + } + } + ); + } + + int wait = 0; + while (wait < 10000) + { + if (requestCompleted) break; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + wait += 10; + } + + std::cerr << "Done" << std::endl; + REQUIRE(statusCode0 == 200); + REQUIRE(statusCode1 == 200); + REQUIRE(statusCode2 == 200); } -#endif } diff --git a/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp b/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp index bf160cef..da12faaf 100644 --- a/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp +++ b/ws/ixcobra/IXCobraMetricsThreadedPublisher.cpp @@ -99,7 +99,7 @@ namespace ix void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, const Json::Value& msg) { - // Now actually enqueue the task + // Enqueue the task { // acquire lock std::unique_lock lock(_queue_mutex); diff --git a/ws/ixcobra/IXCobraMetricsThreadedPublisher.h b/ws/ixcobra/IXCobraMetricsThreadedPublisher.h index 55c17c59..50e6e398 100644 --- a/ws/ixcobra/IXCobraMetricsThreadedPublisher.h +++ b/ws/ixcobra/IXCobraMetricsThreadedPublisher.h @@ -25,7 +25,6 @@ namespace ix ~CobraMetricsThreadedPublisher(); /// Configuration / set keys, etc... - /// All input data but the channel name is encrypted with rc4 void configure(const std::string& appkey, const std::string& endpoint, const std::string& channel, diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index cef19864..75a206e6 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -46,9 +46,11 @@ namespace ix std::condition_variable progressCondition; std::queue queue; + SentryClient sentryClient(dsn); + auto sentrySender = [&condition, &progressCondition, &conditionVariableMutex, &queue, verbose, &errorSending, &sentCount, - &stop, &dsn] + &stop, &sentryClient] { while (true) { @@ -62,9 +64,7 @@ namespace ix queue.pop(); } - SentryClient sc(dsn); - - if (!sc.send(msg, verbose)) + if (!sentryClient.send(msg, verbose)) { errorSending = true; }