ws_cobra_to_sentry improvements

This commit is contained in:
Benjamin Sergeant 2019-06-05 18:45:31 -07:00
parent 03a2f1443b
commit a788b31080
3 changed files with 39 additions and 25 deletions

View File

@ -8,6 +8,7 @@
#include <chrono>
#include <iostream>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
@ -114,6 +115,7 @@ namespace ix
std::string SentryClient::computePayload(const Json::Value& msg)
{
Json::Value payload;
payload["platform"] = "python";
payload["sdk"]["name"] = "ws";
payload["sdk"]["version"] = "1.0.0";
@ -164,17 +166,20 @@ namespace ix
return _jsonWriter.write(payload);
}
bool SentryClient::send(const Json::Value& msg,
std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg,
bool verbose)
{
std::string log;
auto args = _httpClient.createRequest();
args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args->connectTimeout = 60;
args->transferTimeout = 5 * 60;
args->followRedirects = true;
args->verbose = verbose;
args->logger = [](const std::string& msg)
args->logger = [&log](const std::string& msg)
{
log += msg;
std::cout << msg;
};
@ -185,24 +190,24 @@ namespace ix
{
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response->statusCode << std::endl;
if (response->errorCode != HttpErrorCode::Ok)
{
std::cerr << "error message: " << response->errorMsg << std::endl;
spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
std::cerr << "payload: " << response->payload << std::endl;
spdlog::info("payload: {}", response->payload);
}
}
return response->statusCode == 200;
return std::make_pair(response, log);
}
} // namespace ix

View File

@ -9,6 +9,7 @@
#include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h>
#include <regex>
#include <algorithm>
namespace ix
{
@ -18,7 +19,7 @@ namespace ix
SentryClient(const std::string& dsn);
~SentryClient() = default;
bool send(const Json::Value& msg, bool verbose);
std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
private:
int64_t getTimestamp();

View File

@ -14,6 +14,7 @@
#include <mutex>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include "IXSentryClient.h"
@ -64,8 +65,13 @@ namespace ix
queue.pop();
}
if (!sentryClient.send(msg, verbose))
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Response: {}", response->payload);
spdlog::error("Log: {}", ret.second);
errorSending = true;
}
else
@ -99,16 +105,16 @@ namespace ix
{
if (eventType == ix::CobraConnection_EventType_Open)
{
std::cerr << "Subscriber: connected" << std::endl;
spdlog::info("Subscriber connected");
for (auto it : headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
std::cerr << "Subscriber: closed" << std::endl;
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
@ -122,7 +128,7 @@ namespace ix
{
if (verbose)
{
std::cerr << jsonWriter.write(msg) << std::endl;
spdlog::info(jsonWriter.write(msg));
}
// If we cannot send to sentry fast enough, drop the message
@ -132,8 +138,7 @@ namespace ix
receivedCount != 0 &&
(sentCount * scaleFactor < receivedCount))
{
std::cerr << "message dropped: sending is backlogged !"
<< std::endl;
spdlog::warn("message dropped: sending is backlogged !");
condition.notify_one();
progressCondition.notify_one();
@ -153,15 +158,15 @@ namespace ix
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
std::cerr << "Subscriber: subscribed to channel " << subscriptionId << std::endl;
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
std::cerr << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl;
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
std::cerr << "Subscriber: error" << errMsg << std::endl;
spdlog::error("Subscriber: error {}", errMsg);
}
}
);
@ -172,17 +177,20 @@ namespace ix
std::unique_lock<std::mutex> lock(progressConditionVariableMutex);
progressCondition.wait(lock);
std::cout << "messages"
<< " received " << receivedCount
<< " sent " << sentCount
<< std::endl;
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
if (strict && errorSending) break;
}
conn.disconnect();
// FIXME: join all the bg threads and stop them.
// join all the bg threads and stop them.
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return 0;
}