unittest working / uses shared_ptr for a bunch of things 🗿

This commit is contained in:
Benjamin Sergeant 2019-06-05 15:18:19 -07:00
parent 9262880369
commit 3257ad1363
7 changed files with 330 additions and 111 deletions

View File

@ -13,6 +13,7 @@
#include <iomanip> #include <iomanip>
#include <vector> #include <vector>
#include <cstring> #include <cstring>
#include <iostream>
#include <zlib.h> #include <zlib.h>
@ -24,17 +25,87 @@ namespace ix
const std::string HttpClient::kDel = "DEL"; const std::string HttpClient::kDel = "DEL";
const std::string HttpClient::kPut = "PUT"; const std::string HttpClient::kPut = "PUT";
HttpClient::HttpClient() HttpClient::HttpClient(bool async) : _async(async), _stop(false)
{ {
if (!_async) return;
_thread = std::thread(&HttpClient::run, this);
} }
HttpClient::~HttpClient() HttpClient::~HttpClient()
{ {
if (!_thread.joinable()) return;
_stop = true;
_condition.notify_one();
_thread.join();
} }
HttpResponse HttpClient::request( HttpRequestArgsPtr HttpClient::createRequest(const std::string& url,
const std::string& verb)
{
auto request = std::make_shared<HttpRequestArgs>();
request->url = url;
request->verb = verb;
return request;
}
bool HttpClient::performRequest(HttpRequestArgsPtr args,
const OnResponseCallback& onResponseCallback)
{
if (!_async) return false;
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queueMutex);
// add the task
_queue.push(std::make_pair(args, onResponseCallback));
} // release lock
// wake up one thread
_condition.notify_one();
return true;
}
void HttpClient::run()
{
while (true)
{
HttpRequestArgsPtr args;
OnResponseCallback onResponseCallback;
{
std::unique_lock<std::mutex> lock(_queueMutex);
while (!_stop && _queue.empty())
{
_condition.wait(lock);
}
if (_stop) return;
auto p = _queue.front();
_queue.pop();
args = p.first;
onResponseCallback = p.second;
}
if (_stop) return;
HttpResponsePtr response = request(args->url, args->verb, args->body, *args);
onResponseCallback(response);
if (_stop) return;
}
std::cout << "HttpClient::run() finished" << std::endl;
}
HttpResponsePtr HttpClient::request(
const std::string& url, const std::string& url,
const std::string& verb, const std::string& verb,
const std::string& body, const std::string& body,
@ -54,7 +125,7 @@ namespace ix
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot parse url: " << url; ss << "Cannot parse url: " << url;
return HttpResponse(code, HttpErrorCode::UrlMalformed, return std::make_shared<HttpResponse>(code, HttpErrorCode::UrlMalformed,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -65,7 +136,7 @@ namespace ix
if (!_socket) if (!_socket)
{ {
return HttpResponse(code, HttpErrorCode::CannotCreateSocket, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotCreateSocket,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -128,9 +199,9 @@ namespace ix
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot connect to url: " << url << " / error : " << errMsg; ss << "Cannot connect to url: " << url << " / error : " << errMsg;
return HttpResponse(code, HttpErrorCode::CannotConnect, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Make a new cancellation object dealing with transfer timeout // Make a new cancellation object dealing with transfer timeout
@ -154,9 +225,9 @@ namespace ix
if (!_socket->writeBytes(req, isCancellationRequested)) if (!_socket->writeBytes(req, isCancellationRequested))
{ {
std::string errorMsg("Cannot send request"); std::string errorMsg("Cannot send request");
return HttpResponse(code, HttpErrorCode::SendError, return std::make_shared<HttpResponse>(code, HttpErrorCode::SendError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
uploadSize = req.size(); uploadSize = req.size();
@ -168,9 +239,9 @@ namespace ix
if (!lineValid) if (!lineValid)
{ {
std::string errorMsg("Cannot retrieve status line"); std::string errorMsg("Cannot retrieve status line");
return HttpResponse(code, HttpErrorCode::CannotReadStatusLine, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (args.verbose) if (args.verbose)
@ -183,9 +254,9 @@ namespace ix
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1) if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{ {
std::string errorMsg("Cannot parse response code from status line"); std::string errorMsg("Cannot parse response code from status line");
return HttpResponse(code, HttpErrorCode::MissingStatus, return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
auto result = parseHttpHeaders(_socket, isCancellationRequested); auto result = parseHttpHeaders(_socket, isCancellationRequested);
@ -195,9 +266,9 @@ namespace ix
if (!headersValid) if (!headersValid)
{ {
std::string errorMsg("Cannot parse http headers"); std::string errorMsg("Cannot parse http headers");
return HttpResponse(code, HttpErrorCode::HeaderParsingError, return std::make_shared<HttpResponse>(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Redirect ? // Redirect ?
@ -206,18 +277,18 @@ namespace ix
if (headers.find("Location") == headers.end()) if (headers.find("Location") == headers.end())
{ {
std::string errorMsg("Missing location header for redirect"); std::string errorMsg("Missing location header for redirect");
return HttpResponse(code, HttpErrorCode::MissingLocation, return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (redirects >= args.maxRedirects) if (redirects >= args.maxRedirects)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Too many redirects: " << redirects; ss << "Too many redirects: " << redirects;
return HttpResponse(code, HttpErrorCode::TooManyRedirects, return std::make_shared<HttpResponse>(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Recurse // Recurse
@ -227,9 +298,9 @@ namespace ix
if (verb == "HEAD") if (verb == "HEAD")
{ {
return HttpResponse(code, HttpErrorCode::Ok, return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(), headers, payload, std::string(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Parse response: // Parse response:
@ -248,9 +319,9 @@ namespace ix
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload += chunkResult.second; payload += chunkResult.second;
} }
@ -266,9 +337,9 @@ namespace ix
if (!lineResult.first) if (!lineResult.first)
{ {
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
uint64_t chunkSize; uint64_t chunkSize;
@ -293,9 +364,9 @@ namespace ix
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload += chunkResult.second; payload += chunkResult.second;
@ -304,9 +375,9 @@ namespace ix
if (!lineResult.first) if (!lineResult.first)
{ {
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (chunkSize == 0) break; if (chunkSize == 0) break;
@ -319,9 +390,9 @@ namespace ix
else else
{ {
std::string errorMsg("Cannot read http body"); std::string errorMsg("Cannot read http body");
return HttpResponse(code, HttpErrorCode::CannotReadBody, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
downloadSize = payload.size(); downloadSize = payload.size();
@ -333,58 +404,58 @@ namespace ix
if (!gzipInflate(payload, decompressedPayload)) if (!gzipInflate(payload, decompressedPayload))
{ {
std::string errorMsg("Error decompressing payload"); std::string errorMsg("Error decompressing payload");
return HttpResponse(code, HttpErrorCode::Gzip, return std::make_shared<HttpResponse>(code, HttpErrorCode::Gzip,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload = decompressedPayload; payload = decompressedPayload;
} }
return HttpResponse(code, HttpErrorCode::Ok, return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(), headers, payload, std::string(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
HttpResponse HttpClient::get(const std::string& url, HttpResponsePtr HttpClient::get(const std::string& url,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {
return request(url, kGet, std::string(), args); return request(url, kGet, std::string(), args);
} }
HttpResponse HttpClient::head(const std::string& url, HttpResponsePtr HttpClient::head(const std::string& url,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {
return request(url, kHead, std::string(), args); return request(url, kHead, std::string(), args);
} }
HttpResponse HttpClient::del(const std::string& url, HttpResponsePtr HttpClient::del(const std::string& url,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {
return request(url, kDel, std::string(), args); return request(url, kDel, std::string(), args);
} }
HttpResponse HttpClient::post(const std::string& url, HttpResponsePtr HttpClient::post(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {
return request(url, kPost, serializeHttpParameters(httpParameters), args); return request(url, kPost, serializeHttpParameters(httpParameters), args);
} }
HttpResponse HttpClient::post(const std::string& url, HttpResponsePtr HttpClient::post(const std::string& url,
const std::string& body, const std::string& body,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {
return request(url, kPost, body, args); return request(url, kPost, body, args);
} }
HttpResponse HttpClient::put(const std::string& url, HttpResponsePtr HttpClient::put(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {
return request(url, kPut, serializeHttpParameters(httpParameters), args); return request(url, kPut, serializeHttpParameters(httpParameters), args);
} }
HttpResponse HttpClient::put(const std::string& url, HttpResponsePtr HttpClient::put(const std::string& url,
const std::string& body, const std::string& body,
const HttpRequestArgs& args) const HttpRequestArgs& args)
{ {

View File

@ -10,11 +10,13 @@
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <functional> #include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <tuple> #include <queue>
#include <thread>
namespace ix namespace ix
{ {
@ -34,7 +36,8 @@ namespace ix
MissingLocation = 11, MissingLocation = 11,
TooManyRedirects = 12, TooManyRedirects = 12,
ChunkReadError = 13, ChunkReadError = 13,
CannotReadBody = 14 CannotReadBody = 14,
Invalid = 100
}; };
struct HttpResponse struct HttpResponse
@ -66,12 +69,15 @@ namespace ix
} }
}; };
using HttpResponsePtr = std::shared_ptr<HttpResponse>;
using HttpParameters = std::map<std::string, std::string>; using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>; using Logger = std::function<void(const std::string&)>;
using OnResponseCallback = std::function<void(const HttpResponsePtr&)>;
struct HttpRequestArgs struct HttpRequestArgs
{ {
std::string url; std::string url;
std::string verb;
WebSocketHttpHeaders extraHeaders; WebSocketHttpHeaders extraHeaders;
std::string body; std::string body;
int connectTimeout; int connectTimeout;
@ -84,51 +90,71 @@ namespace ix
OnProgressCallback onProgressCallback; OnProgressCallback onProgressCallback;
}; };
using HttpRequestArgsPtr = std::shared_ptr<HttpRequestArgs>;
class HttpClient class HttpClient
{ {
public: public:
HttpClient(); HttpClient(bool async = false);
~HttpClient(); ~HttpClient();
HttpResponse get(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr get(const std::string& url, const HttpRequestArgs& args);
HttpResponse head(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr head(const std::string& url, const HttpRequestArgs& args);
HttpResponse del(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr del(const std::string& url, const HttpRequestArgs& args);
HttpResponse post(const std::string& url, HttpResponsePtr post(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args); const HttpRequestArgs& args);
HttpResponse post(const std::string& url, HttpResponsePtr post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponse put(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse put(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponse request(const std::string& url,
const std::string& verb,
const std::string& body, const std::string& body,
const HttpRequestArgs& args, const HttpRequestArgs& args);
int redirects = 0);
HttpResponsePtr put(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponsePtr put(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponsePtr request(const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects = 0);
// Async API
HttpRequestArgsPtr createRequest(const std::string& url,
const std::string& verb = HttpClient::kGet);
bool performRequest(HttpRequestArgsPtr request,
const OnResponseCallback& onResponseCallback);
std::string serializeHttpParameters(const HttpParameters& httpParameters); std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value); std::string urlEncode(const std::string& value);
private:
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(const std::string& in, std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost; const static std::string kPost;
const static std::string kGet; const static std::string kGet;
const static std::string kHead; const static std::string kHead;
const static std::string kDel; const static std::string kDel;
const static std::string kPut; const static std::string kPut;
private:
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(const std::string& in, std::string& out);
// Async API background thread runner
void run();
// Async API
bool _async;
std::queue<std::pair<HttpRequestArgsPtr, OnResponseCallback>> _queue;
mutable std::mutex _queueMutex;
std::condition_variable _condition;
std::atomic<bool> _stop;
std::thread _thread;
std::shared_ptr<Socket> _socket;
}; };
} // namespace ix } // namespace ix

View File

@ -10,8 +10,8 @@ set (CMAKE_CXX_STANDARD 14)
if (NOT WIN32) if (NOT WIN32)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH}) set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
find_package(Sanitizers) find_package(Sanitizers)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
endif() endif()

View File

@ -18,7 +18,6 @@ TEST_CASE("http client", "[http]")
std::string url("http://httpbin.org/"); std::string url("http://httpbin.org/");
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["User-Agent"] = "ixwebsocket";
HttpRequestArgs args; HttpRequestArgs args;
args.extraHeaders = headers; args.extraHeaders = headers;
@ -40,27 +39,26 @@ TEST_CASE("http client", "[http]")
}; };
HttpClient httpClient; HttpClient httpClient;
HttpResponse response = httpClient.get(url, args); auto response = httpClient.get(url, args);
for (auto it : response.headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::cerr << "Upload size: " << response.uploadSize << std::endl; std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl; std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl; std::cerr << "Status: " << response->statusCode << std::endl;
REQUIRE(response.errorCode == HttpErrorCode::Ok); REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
} }
#if defined(IXWEBSOCKET_USE_TLS)
SECTION("Connect to a remote HTTPS server") SECTION("Connect to a remote HTTPS server")
{ {
std::string url("https://httpbin.org/"); std::string url("https://httpbin.org/");
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["User-Agent"] = "ixwebsocket";
HttpRequestArgs args; HttpRequestArgs args;
args.extraHeaders = headers; args.extraHeaders = headers;
@ -82,18 +80,143 @@ TEST_CASE("http client", "[http]")
}; };
HttpClient httpClient; HttpClient httpClient;
HttpResponse response = httpClient.get(url, args); auto response = httpClient.get(url, args);
for (auto it : response.headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::cerr << "Upload size: " << response.uploadSize << std::endl; std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl; std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl; std::cerr << "Status: " << response->statusCode << std::endl;
REQUIRE(response.errorCode == HttpErrorCode::Ok); REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
}
SECTION("Async API, one call")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("https://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode(0);
httpClient.performRequest(args, [&requestCompleted, &statusCode]
(const HttpResponsePtr& response)
{
// In case of failure, print response->errorMsg
statusCode = response->statusCode;
requestCompleted = true;
}
);
int wait = 0;
while (wait < 5000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode == 200);
}
SECTION("Async API, multiple calls")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("http://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode0(0);
std::atomic<int> statusCode1(0);
std::atomic<int> statusCode2(0);
for (int i = 0; i < 3; ++i)
{
httpClient.performRequest(args, [i, &requestCompleted, &statusCode0, &statusCode1, &statusCode2]
(const HttpResponsePtr& response)
{
// In case of failure, print response->errorMsg
if (i == 0)
{
statusCode0 = response->statusCode;
}
else if (i == 1)
{
statusCode1 = response->statusCode;
}
else if (i == 2)
{
statusCode2 = response->statusCode;
requestCompleted = true;
}
}
);
}
int wait = 0;
while (wait < 10000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode0 == 200);
REQUIRE(statusCode1 == 200);
REQUIRE(statusCode2 == 200);
} }
#endif
} }

View File

@ -99,7 +99,7 @@ namespace ix
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
const Json::Value& msg) const Json::Value& msg)
{ {
// Now actually enqueue the task // Enqueue the task
{ {
// acquire lock // acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex); std::unique_lock<std::mutex> lock(_queue_mutex);

View File

@ -25,7 +25,6 @@ namespace ix
~CobraMetricsThreadedPublisher(); ~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc... /// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey, void configure(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& channel, const std::string& channel,

View File

@ -46,9 +46,11 @@ namespace ix
std::condition_variable progressCondition; std::condition_variable progressCondition;
std::queue<Json::Value> queue; std::queue<Json::Value> queue;
SentryClient sentryClient(dsn);
auto sentrySender = [&condition, &progressCondition, &conditionVariableMutex, auto sentrySender = [&condition, &progressCondition, &conditionVariableMutex,
&queue, verbose, &errorSending, &sentCount, &queue, verbose, &errorSending, &sentCount,
&stop, &dsn] &stop, &sentryClient]
{ {
while (true) while (true)
{ {
@ -62,9 +64,7 @@ namespace ix
queue.pop(); queue.pop();
} }
SentryClient sc(dsn); if (!sentryClient.send(msg, verbose))
if (!sc.send(msg, verbose))
{ {
errorSending = true; errorSending = true;
} }