cobra to sentry fixes

This commit is contained in:
Benjamin Sergeant 2019-06-05 18:47:48 -07:00
parent d50125c62d
commit 15d8c663da
5 changed files with 71 additions and 26 deletions

View File

@ -109,6 +109,10 @@ namespace ix
HttpRequestArgsPtr args,
int redirects)
{
// We only have one socket connection, so we cannot
// make multiple requests concurrently.
std::lock_guard<std::mutex> lock(_mutex);
uint64_t uploadSize = 0;
uint64_t downloadSize = 0;
int code = 0;

View File

@ -156,5 +156,6 @@ namespace ix
std::thread _thread;
std::shared_ptr<Socket> _socket;
std::mutex _mutex; // to protect accessing the _socket (only one socket per client)
};
} // namespace ix

View File

@ -8,6 +8,7 @@
#include <chrono>
#include <iostream>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
@ -114,6 +115,7 @@ namespace ix
std::string SentryClient::computePayload(const Json::Value& msg)
{
Json::Value payload;
payload["platform"] = "python";
payload["sdk"]["name"] = "ws";
payload["sdk"]["version"] = "1.0.0";
@ -132,23 +134,52 @@ namespace ix
Json::Value extra;
extra["cobra_event"] = msg;
extra["cobra_event"] = msg;
exception["extra"] = extra;
//
// "tags": [
// [
// "a",
// "b"
// ],
// ]
//
Json::Value tags;
Json::Value gameTag;
gameTag.append("game");
gameTag.append(msg["device"]["game"]);
tags.append(gameTag);
Json::Value userIdTag;
userIdTag.append("userid");
userIdTag.append(msg["device"]["user_id"]);
tags.append(userIdTag);
Json::Value environmentTag;
environmentTag.append("environment");
environmentTag.append(msg["device"]["environment"]);
tags.append(environmentTag);
payload["tags"] = tags;
return _jsonWriter.write(payload);
}
bool SentryClient::send(const Json::Value& msg,
std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg,
bool verbose)
{
std::string log;
auto args = _httpClient.createRequest();
args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args->connectTimeout = 60;
args->transferTimeout = 5 * 60;
args->followRedirects = true;
args->verbose = verbose;
args->logger = [](const std::string& msg)
args->logger = [&log](const std::string& msg)
{
log += msg;
std::cout << msg;
};
@ -159,24 +190,24 @@ namespace ix
{
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response->statusCode << std::endl;
if (response->errorCode != HttpErrorCode::Ok)
{
std::cerr << "error message: " << response->errorMsg << std::endl;
spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
std::cerr << "payload: " << response->payload << std::endl;
spdlog::info("payload: {}", response->payload);
}
}
return response->statusCode == 200;
return std::make_pair(response, log);
}
} // namespace ix

View File

@ -9,6 +9,7 @@
#include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h>
#include <regex>
#include <algorithm>
namespace ix
{
@ -18,7 +19,7 @@ namespace ix
SentryClient(const std::string& dsn);
~SentryClient() = default;
bool send(const Json::Value& msg, bool verbose);
std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
private:
int64_t getTimestamp();

View File

@ -14,6 +14,7 @@
#include <mutex>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include "IXSentryClient.h"
@ -64,8 +65,13 @@ namespace ix
queue.pop();
}
if (!sentryClient.send(msg, verbose))
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Response: {}", response->payload);
spdlog::error("Log: {}", ret.second);
errorSending = true;
}
else
@ -99,16 +105,16 @@ namespace ix
{
if (eventType == ix::CobraConnection_EventType_Open)
{
std::cerr << "Subscriber: connected" << std::endl;
spdlog::info("Subscriber connected");
for (auto it : headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
std::cerr << "Subscriber: closed" << std::endl;
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
@ -122,7 +128,7 @@ namespace ix
{
if (verbose)
{
std::cerr << jsonWriter.write(msg) << std::endl;
spdlog::info(jsonWriter.write(msg));
}
// If we cannot send to sentry fast enough, drop the message
@ -132,8 +138,7 @@ namespace ix
receivedCount != 0 &&
(sentCount * scaleFactor < receivedCount))
{
std::cerr << "message dropped: sending is backlogged !"
<< std::endl;
spdlog::warn("message dropped: sending is backlogged !");
condition.notify_one();
progressCondition.notify_one();
@ -153,15 +158,15 @@ namespace ix
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
std::cerr << "Subscriber: subscribed to channel " << subscriptionId << std::endl;
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
std::cerr << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl;
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
std::cerr << "Subscriber: error" << errMsg << std::endl;
spdlog::error("Subscriber: error {}", errMsg);
}
}
);
@ -172,17 +177,20 @@ namespace ix
std::unique_lock<std::mutex> lock(progressConditionVariableMutex);
progressCondition.wait(lock);
std::cout << "messages"
<< " received " << receivedCount
<< " sent " << sentCount
<< std::endl;
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
if (strict && errorSending) break;
}
conn.disconnect();
// FIXME: join all the bg threads and stop them.
// join all the bg threads and stop them.
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return 0;
}