clang-format
This commit is contained in:
@ -9,23 +9,24 @@
|
||||
#include <condition_variable>
|
||||
#include <ixcobra/IXCobraConnection.h>
|
||||
#include <ixsentry/IXSentryClient.h>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <map>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class QueueManager
|
||||
{
|
||||
public:
|
||||
QueueManager(size_t maxQueueSize,
|
||||
std::atomic<bool> &stop) :
|
||||
_maxQueueSize(maxQueueSize),
|
||||
_stop(stop) {}
|
||||
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
|
||||
: _maxQueueSize(maxQueueSize)
|
||||
, _stop(stop)
|
||||
{
|
||||
}
|
||||
|
||||
Json::Value pop();
|
||||
void add(Json::Value msg);
|
||||
@ -136,95 +137,90 @@ namespace ix
|
||||
|
||||
std::thread t1(timer);
|
||||
|
||||
auto sentrySender = [&queueManager,
|
||||
verbose,
|
||||
&errorSending,
|
||||
&sentCount,
|
||||
&stop,
|
||||
&throttled,
|
||||
&dsn] {
|
||||
SentryClient sentryClient(dsn);
|
||||
auto sentrySender =
|
||||
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
|
||||
SentryClient sentryClient(dsn);
|
||||
|
||||
while (true)
|
||||
{
|
||||
Json::Value msg = queueManager.pop();
|
||||
|
||||
if (msg.isNull()) continue;
|
||||
if (stop) return;
|
||||
|
||||
auto ret = sentryClient.send(msg, verbose);
|
||||
HttpResponsePtr response = ret.first;
|
||||
|
||||
if (!response)
|
||||
while (true)
|
||||
{
|
||||
spdlog::warn("Null HTTP Response");
|
||||
continue;
|
||||
}
|
||||
Json::Value msg = queueManager.pop();
|
||||
|
||||
if (verbose)
|
||||
{
|
||||
for (auto it : response->headers)
|
||||
if (msg.isNull()) continue;
|
||||
if (stop) return;
|
||||
|
||||
auto ret = sentryClient.send(msg, verbose);
|
||||
HttpResponsePtr response = ret.first;
|
||||
|
||||
if (!response)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
spdlog::warn("Null HTTP Response");
|
||||
continue;
|
||||
}
|
||||
|
||||
spdlog::info("Upload size: {}", response->uploadSize);
|
||||
spdlog::info("Download size: {}", response->downloadSize);
|
||||
|
||||
spdlog::info("Status: {}", response->statusCode);
|
||||
if (response->errorCode != HttpErrorCode::Ok)
|
||||
if (verbose)
|
||||
{
|
||||
spdlog::info("error message: {}", response->errorMsg);
|
||||
}
|
||||
|
||||
if (response->headers["Content-Type"] != "application/octet-stream")
|
||||
{
|
||||
spdlog::info("payload: {}", response->payload);
|
||||
}
|
||||
}
|
||||
|
||||
if (response->statusCode != 200)
|
||||
{
|
||||
spdlog::error("Error sending data to sentry: {}", response->statusCode);
|
||||
spdlog::error("Body: {}", ret.second);
|
||||
spdlog::error("Response: {}", response->payload);
|
||||
errorSending = true;
|
||||
|
||||
// Error 429 Too Many Requests
|
||||
if (response->statusCode == 429)
|
||||
{
|
||||
auto retryAfter = response->headers["Retry-After"];
|
||||
std::stringstream ss;
|
||||
ss << retryAfter;
|
||||
int seconds;
|
||||
ss >> seconds;
|
||||
|
||||
if (!ss.eof() || ss.fail())
|
||||
for (auto it : response->headers)
|
||||
{
|
||||
seconds = 30;
|
||||
spdlog::warn("Error parsing Retry-After header. "
|
||||
"Using {} for the sleep duration",
|
||||
seconds);
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
|
||||
spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
|
||||
"and retry after {} seconds",
|
||||
retryAfter);
|
||||
spdlog::info("Upload size: {}", response->uploadSize);
|
||||
spdlog::info("Download size: {}", response->downloadSize);
|
||||
|
||||
throttled = true;
|
||||
auto duration = std::chrono::seconds(seconds);
|
||||
std::this_thread::sleep_for(duration);
|
||||
throttled = false;
|
||||
spdlog::info("Status: {}", response->statusCode);
|
||||
if (response->errorCode != HttpErrorCode::Ok)
|
||||
{
|
||||
spdlog::info("error message: {}", response->errorMsg);
|
||||
}
|
||||
|
||||
if (response->headers["Content-Type"] != "application/octet-stream")
|
||||
{
|
||||
spdlog::info("payload: {}", response->payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
++sentCount;
|
||||
}
|
||||
|
||||
if (stop) return;
|
||||
}
|
||||
};
|
||||
if (response->statusCode != 200)
|
||||
{
|
||||
spdlog::error("Error sending data to sentry: {}", response->statusCode);
|
||||
spdlog::error("Body: {}", ret.second);
|
||||
spdlog::error("Response: {}", response->payload);
|
||||
errorSending = true;
|
||||
|
||||
// Error 429 Too Many Requests
|
||||
if (response->statusCode == 429)
|
||||
{
|
||||
auto retryAfter = response->headers["Retry-After"];
|
||||
std::stringstream ss;
|
||||
ss << retryAfter;
|
||||
int seconds;
|
||||
ss >> seconds;
|
||||
|
||||
if (!ss.eof() || ss.fail())
|
||||
{
|
||||
seconds = 30;
|
||||
spdlog::warn("Error parsing Retry-After header. "
|
||||
"Using {} for the sleep duration",
|
||||
seconds);
|
||||
}
|
||||
|
||||
spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
|
||||
"and retry after {} seconds",
|
||||
retryAfter);
|
||||
|
||||
throttled = true;
|
||||
auto duration = std::chrono::seconds(seconds);
|
||||
std::this_thread::sleep_for(duration);
|
||||
throttled = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
++sentCount;
|
||||
}
|
||||
|
||||
if (stop) return;
|
||||
}
|
||||
};
|
||||
|
||||
// Create a thread pool
|
||||
spdlog::info("Starting {} sentry sender jobs", jobs);
|
||||
@ -241,12 +237,11 @@ namespace ix
|
||||
verbose,
|
||||
&throttled,
|
||||
&receivedCount,
|
||||
&queueManager](
|
||||
ix::CobraConnectionEventType eventType,
|
||||
const std::string& errMsg,
|
||||
const ix::WebSocketHttpHeaders& headers,
|
||||
const std::string& subscriptionId,
|
||||
CobraConnection::MsgId msgId) {
|
||||
&queueManager](ix::CobraConnectionEventType eventType,
|
||||
const std::string& errMsg,
|
||||
const ix::WebSocketHttpHeaders& headers,
|
||||
const std::string& subscriptionId,
|
||||
CobraConnection::MsgId msgId) {
|
||||
if (eventType == ix::CobraConnection_EventType_Open)
|
||||
{
|
||||
spdlog::info("Subscriber connected");
|
||||
@ -265,11 +260,8 @@ namespace ix
|
||||
spdlog::info("Subscriber authenticated");
|
||||
conn.subscribe(channel,
|
||||
filter,
|
||||
[&jsonWriter,
|
||||
verbose,
|
||||
&throttled,
|
||||
&receivedCount,
|
||||
&queueManager](const Json::Value& msg) {
|
||||
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
|
||||
const Json::Value& msg) {
|
||||
if (verbose)
|
||||
{
|
||||
spdlog::info(jsonWriter.write(msg));
|
||||
|
Reference in New Issue
Block a user