From 3df7c942d78484fd0a638baee5d765c0c038bcc8 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Wed, 11 Mar 2020 15:55:45 -0700 Subject: [PATCH] move sentry and statsd cobra ws commands into a new ixbots folder --- CMake/FindSpdlog.cmake | 19 ++ CMakeLists.txt | 1 + ixbots/CMakeLists.txt | 45 ++++ ixbots/ixbots/IXCobraToSentryBot.cpp | 261 ++++++++++++++++++++++ ixbots/ixbots/IXCobraToSentryBot.h | 21 ++ ixbots/ixbots/IXCobraToStatsdBot.cpp | 223 +++++++++++++++++++ ixbots/ixbots/IXCobraToStatsdBot.h | 22 ++ ixbots/ixbots/IXQueueManager.cpp | 64 ++++++ ixbots/ixbots/IXQueueManager.h | 38 ++++ ws/CMakeLists.txt | 1 + ws/ws.cpp | 35 +-- ws/ws.h | 2 +- ws/ws_cobra_metrics_publish.cpp | 9 +- ws/ws_cobra_to_sentry.cpp | 319 +-------------------------- 14 files changed, 714 insertions(+), 346 deletions(-) create mode 100644 CMake/FindSpdlog.cmake create mode 100644 ixbots/CMakeLists.txt create mode 100644 ixbots/ixbots/IXCobraToSentryBot.cpp create mode 100644 ixbots/ixbots/IXCobraToSentryBot.h create mode 100644 ixbots/ixbots/IXCobraToStatsdBot.cpp create mode 100644 ixbots/ixbots/IXCobraToStatsdBot.h create mode 100644 ixbots/ixbots/IXQueueManager.cpp create mode 100644 ixbots/ixbots/IXQueueManager.h diff --git a/CMake/FindSpdlog.cmake b/CMake/FindSpdlog.cmake new file mode 100644 index 00000000..ffd2dba9 --- /dev/null +++ b/CMake/FindSpdlog.cmake @@ -0,0 +1,19 @@ +# Find package structure taken from libcurl + +include(FindPackageHandleStandardArgs) + +find_path(SPDLOG_INCLUDE_DIRS spdlog/spdlog.h) +find_library(JSONCPP_LIBRARY spdlog) + +find_package_handle_standard_args(SPDLOG + FOUND_VAR + SPDLOG_FOUND + REQUIRED_VARS + SPDLOG_LIBRARY + SPDLOG_INCLUDE_DIRS + FAIL_MESSAGE + "Could NOT find spdlog" +) + +set(SPDLOG_INCLUDE_DIRS ${SPDLOG_INCLUDE_DIRS}) +set(SPDLOG_LIBRARIES ${SPDLOG_LIBRARY}) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7732075b..d7f218bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -230,6 +230,7 @@ if (USE_WS OR USE_TEST) add_subdirectory(ixcobra) add_subdirectory(ixsnake) add_subdirectory(ixsentry) + add_subdirectory(ixbots) add_subdirectory(third_party/spdlog spdlog) diff --git a/ixbots/CMakeLists.txt b/ixbots/CMakeLists.txt new file mode 100644 index 00000000..41300a1a --- /dev/null +++ b/ixbots/CMakeLists.txt @@ -0,0 +1,45 @@ +# +# Author: Benjamin Sergeant +# Copyright (c) 2019 Machine Zone, Inc. All rights reserved. +# + +set (IXBOTS_SOURCES + ixbots/IXCobraToSentryBot.cpp + ixbots/IXCobraToStatsdBot.cpp + ixbots/IXQueueManager.cpp +) + +set (IXBOTS_HEADERS + ixbots/IXCobraToSentryBot.h + ixbots/IXCobraToStatsdBot.h + ixbots/IXQueueManager.h +) + +add_library(ixbots STATIC + ${IXBOTS_SOURCES} + ${IXBOTS_HEADERS} +) + +find_package(JsonCpp) +if (NOT JSONCPP_FOUND) + set(JSONCPP_INCLUDE_DIRS ../third_party/jsoncpp) +endif() + +find_package(SpdLog) +if (NOT SPDLOG_FOUND) + set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include) +endif() + +set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src) + +set(IXBOTS_INCLUDE_DIRS + . + .. + ../ixcore + ../ixcobra + ../ixsentry + ${JSONCPP_INCLUDE_DIRS} + ${SPDLOG_INCLUDE_DIRS} + ${STATSD_CLIENT_INCLUDE_DIRS}) + +target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} ) diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp new file mode 100644 index 00000000..99b408c5 --- /dev/null +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -0,0 +1,261 @@ +/* + * IXCobraToSentryBot.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXCobraToSentryBot.h" +#include "IXQueueManager.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace ix +{ + int cobra_to_sentry_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& dsn, + bool verbose, + bool strict, + int jobs, + size_t maxQueueSize) + { + ix::CobraConnection conn; + conn.configure(config); + conn.connect(); + + Json::FastWriter jsonWriter; + std::atomic sentCount(0); + std::atomic receivedCount(0); + std::atomic errorSending(false); + std::atomic stop(false); + std::atomic throttled(false); + + QueueManager queueManager(maxQueueSize, stop); + + auto timer = [&sentCount, &receivedCount] { + while (true) + { + spdlog::info("messages received {} sent {}", receivedCount, sentCount); + + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t1(timer); + + auto heartbeat = [&sentCount, &receivedCount] { + std::string state("na"); + + while (true) + { + std::stringstream ss; + ss << "messages received " << receivedCount; + ss << "messages sent " << sentCount; + + std::string currentState = ss.str(); + + if (currentState == state) + { + spdlog::error("no messages received or sent for 1 minute, exiting"); + exit(1); + } + state = currentState; + + auto duration = std::chrono::minutes(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t2(heartbeat); + + auto sentrySender = + [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] { + SentryClient sentryClient(dsn); + + while (true) + { + Json::Value msg = queueManager.pop(); + + if (msg.isNull()) continue; + if (stop) return; + + auto ret = sentryClient.send(msg, verbose); + HttpResponsePtr response = ret.first; + + if (!response) + { + spdlog::warn("Null HTTP Response"); + continue; + } + + if (verbose) + { + for (auto it : response->headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + + spdlog::info("Upload size: {}", response->uploadSize); + spdlog::info("Download size: {}", response->downloadSize); + + spdlog::info("Status: {}", response->statusCode); + if (response->errorCode != HttpErrorCode::Ok) + { + spdlog::info("error message: {}", response->errorMsg); + } + + if (response->headers["Content-Type"] != "application/octet-stream") + { + spdlog::info("payload: {}", response->payload); + } + } + + if (response->statusCode != 200) + { + spdlog::error("Error sending data to sentry: {}", response->statusCode); + spdlog::error("Body: {}", ret.second); + spdlog::error("Response: {}", response->payload); + errorSending = true; + + // Error 429 Too Many Requests + if (response->statusCode == 429) + { + auto retryAfter = response->headers["Retry-After"]; + std::stringstream ss; + ss << retryAfter; + int seconds; + ss >> seconds; + + if (!ss.eof() || ss.fail()) + { + seconds = 30; + spdlog::warn("Error parsing Retry-After header. " + "Using {} for the sleep duration", + seconds); + } + + spdlog::warn("Error 429 - Too Many Requests. ws will sleep " + "and retry after {} seconds", + retryAfter); + + throttled = true; + auto duration = std::chrono::seconds(seconds); + std::this_thread::sleep_for(duration); + throttled = false; + } + } + else + { + ++sentCount; + } + + if (stop) return; + } + }; + + // Create a thread pool + spdlog::info("Starting {} sentry sender jobs", jobs); + std::vector pool; + for (int i = 0; i < jobs; i++) + { + pool.push_back(std::thread(sentrySender)); + } + + conn.setEventCallback([&conn, + &channel, + &filter, + &jsonWriter, + verbose, + &throttled, + &receivedCount, + &queueManager](ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { + if (eventType == ix::CobraConnection_EventType_Open) + { + spdlog::info("Subscriber connected"); + + for (auto it : headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + if (eventType == ix::CobraConnection_EventType_Closed) + { + spdlog::info("Subscriber closed"); + } + else if (eventType == ix::CobraConnection_EventType_Authenticated) + { + spdlog::info("Subscriber authenticated"); + conn.subscribe(channel, + filter, + [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( + const Json::Value& msg) { + if (verbose) + { + spdlog::info(jsonWriter.write(msg)); + } + + // If we cannot send to sentry fast enough, drop the message + if (throttled) + { + return; + } + + ++receivedCount; + queueManager.add(msg); + }); + } + else if (eventType == ix::CobraConnection_EventType_Subscribed) + { + spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_UnSubscribed) + { + spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_Error) + { + spdlog::error("Subscriber: error {}", errMsg); + } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::error("Published message hacked: {}", msgId); + } + else if (eventType == ix::CobraConnection_EventType_Pong) + { + spdlog::info("Received websocket pong"); + } + }); + + while (true) + { + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + + if (strict && errorSending) break; + } + + conn.disconnect(); + + // join all the bg threads and stop them. + stop = true; + for (int i = 0; i < jobs; i++) + { + spdlog::error("joining thread {}", i); + pool[i].join(); + } + + return (strict && errorSending) ? 1 : 0; + } +} // namespace ix diff --git a/ixbots/ixbots/IXCobraToSentryBot.h b/ixbots/ixbots/IXCobraToSentryBot.h new file mode 100644 index 00000000..ce37d8ab --- /dev/null +++ b/ixbots/ixbots/IXCobraToSentryBot.h @@ -0,0 +1,21 @@ +/* + * IXCobraToSentryBot.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ +#pragma once + +#include +#include + +namespace ix +{ + int cobra_to_sentry_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& dsn, + bool verbose, + bool strict, + int jobs, + size_t maxQueueSize); +} // namespace ix diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp new file mode 100644 index 00000000..17f8e985 --- /dev/null +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -0,0 +1,223 @@ +/* + * IXCobraToStatsdBot.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXCobraToStatsdBot.h" +#include "IXQueueManager.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +namespace ix +{ + // fields are command line argument that can be specified multiple times + std::vector parseFields(const std::string& fields) + { + std::vector tokens; + + // Split by \n + std::string token; + std::stringstream tokenStream(fields); + + while (std::getline(tokenStream, token)) + { + tokens.push_back(token); + } + + return tokens; + } + + // + // Extract an attribute from a Json Value. + // extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz + // + std::string extractAttr(const std::string& attr, const Json::Value& jsonValue) + { + // Split by . + std::string token; + std::stringstream tokenStream(attr); + + Json::Value val(jsonValue); + + while (std::getline(tokenStream, token, '.')) + { + val = val[token]; + } + + return val.asString(); + } + + int cobra_to_statsd_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& host, + int port, + const std::string& prefix, + const std::string& fields, + bool verbose) + { + ix::CobraConnection conn; + conn.configure(config); + conn.connect(); + + auto tokens = parseFields(fields); + + Json::FastWriter jsonWriter; + std::atomic sentCount(0); + std::atomic receivedCount(0); + std::atomic stop(false); + + size_t maxQueueSize = 1000; + QueueManager queueManager(maxQueueSize, stop); + + auto timer = [&sentCount, &receivedCount] { + while (true) + { + spdlog::info("messages received {} sent {}", receivedCount, sentCount); + + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t1(timer); + + auto heartbeat = [&sentCount, &receivedCount] { + std::string state("na"); + + while (true) + { + std::stringstream ss; + ss << "messages received " << receivedCount; + ss << "messages sent " << sentCount; + + std::string currentState = ss.str(); + + if (currentState == state) + { + spdlog::error("no messages received or sent for 1 minute, exiting"); + exit(1); + } + state = currentState; + + auto duration = std::chrono::minutes(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t2(heartbeat); + + auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] { + // statsd client + // test with netcat as a server: `nc -ul 8125` + bool statsdBatch = true; +#ifndef _WIN32 + statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch); +#else + int statsdClient; +#endif + while (true) + { + Json::Value msg = queueManager.pop(); + + if (msg.isNull()) continue; + if (stop) return; + + std::string id; + for (auto&& attr : tokens) + { + id += "."; + id += extractAttr(attr, msg); + } + + sentCount += 1; + +#ifndef _WIN32 + statsdClient.count(id, 1); +#endif + } + }; + + std::thread t3(statsdSender); + + conn.setEventCallback( + [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount]( + ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { + if (eventType == ix::CobraConnection_EventType_Open) + { + spdlog::info("Subscriber connected"); + + for (auto it : headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + if (eventType == ix::CobraConnection_EventType_Closed) + { + spdlog::info("Subscriber closed"); + } + else if (eventType == ix::CobraConnection_EventType_Authenticated) + { + spdlog::info("Subscriber authenticated"); + conn.subscribe(channel, + filter, + [&jsonWriter, &queueManager, verbose, &receivedCount]( + const Json::Value& msg) { + if (verbose) + { + spdlog::info(jsonWriter.write(msg)); + } + + receivedCount++; + + ++receivedCount; + queueManager.add(msg); + }); + } + else if (eventType == ix::CobraConnection_EventType_Subscribed) + { + spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_UnSubscribed) + { + spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_Error) + { + spdlog::error("Subscriber: error {}", errMsg); + } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::error("Published message hacked: {}", msgId); + } + else if (eventType == ix::CobraConnection_EventType_Pong) + { + spdlog::info("Received websocket pong"); + } + }); + + while (true) + { + std::chrono::duration duration(1000); + std::this_thread::sleep_for(duration); + } + + return 0; + } +} // namespace ix diff --git a/ixbots/ixbots/IXCobraToStatsdBot.h b/ixbots/ixbots/IXCobraToStatsdBot.h new file mode 100644 index 00000000..a6a1bcc2 --- /dev/null +++ b/ixbots/ixbots/IXCobraToStatsdBot.h @@ -0,0 +1,22 @@ +/* + * IXCobraToStatsdBot.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ +#pragma once + +#include +#include +#include + +namespace ix +{ + int cobra_to_statsd_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& dsn, + bool verbose, + bool strict, + int jobs, + size_t maxQueueSize); +} // namespace ix diff --git a/ixbots/ixbots/IXQueueManager.cpp b/ixbots/ixbots/IXQueueManager.cpp new file mode 100644 index 00000000..8873d78e --- /dev/null +++ b/ixbots/ixbots/IXQueueManager.cpp @@ -0,0 +1,64 @@ +/* + * IXQueueManager.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXQueueManager.h" +#include + +namespace ix +{ + Json::Value QueueManager::pop() + { + std::unique_lock lock(_mutex); + + if (_queues.empty()) + { + Json::Value val; + return val; + } + + std::vector games; + for (auto it : _queues) + { + games.push_back(it.first); + } + + std::random_shuffle(games.begin(), games.end()); + std::string game = games[0]; + + _condition.wait(lock, [this] { return !_stop; }); + + if (_queues[game].empty()) + { + Json::Value val; + return val; + } + + auto msg = _queues[game].front(); + _queues[game].pop(); + return msg; + } + + void QueueManager::add(Json::Value msg) + { + std::unique_lock lock(_mutex); + + std::string game; + if (msg.isMember("device") && msg["device"].isMember("game")) + { + game = msg["device"]["game"].asString(); + } + + if (game.empty()) return; + + // if the sending is not fast enough there is no point + // in queuing too many events. + if (_queues[game].size() < _maxQueueSize) + { + _queues[game].push(msg); + _condition.notify_one(); + } + } +} diff --git a/ixbots/ixbots/IXQueueManager.h b/ixbots/ixbots/IXQueueManager.h new file mode 100644 index 00000000..0a3cbdd1 --- /dev/null +++ b/ixbots/ixbots/IXQueueManager.h @@ -0,0 +1,38 @@ +/* + * IXQueueManager.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace ix +{ + class QueueManager + { + public: + QueueManager(size_t maxQueueSize, std::atomic& stop) + : _maxQueueSize(maxQueueSize) + , _stop(stop) + { + } + + Json::Value pop(); + void add(Json::Value msg); + + private: + std::map> _queues; + std::mutex _mutex; + std::condition_variable _condition; + size_t _maxQueueSize; + std::atomic& _stop; + }; +} diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index c777a6de..058af9b2 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -74,6 +74,7 @@ target_link_libraries(ws ixwebsocket) target_link_libraries(ws ixcrypto) target_link_libraries(ws ixcore) target_link_libraries(ws ixsentry) +target_link_libraries(ws ixbots) target_link_libraries(ws spdlog) diff --git a/ws/ws.cpp b/ws/ws.cpp index 89377768..9c711fc7 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -428,48 +428,29 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - ret = ix::ws_cobra_subscribe_main( - cobraConfig, channel, filter, quiet, fluentd); + ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd); } else if (app.got_subcommand("cobra_publish")) { - ret = ix::ws_cobra_publish_main( - cobraConfig, channel, path); + ret = ix::ws_cobra_publish_main(cobraConfig, channel, path); } else if (app.got_subcommand("cobra_metrics_publish")) { - ret = ix::ws_cobra_metrics_publish_main( - cobraConfig, channel, path, stress); + ret = ix::ws_cobra_metrics_publish_main(cobraConfig, channel, path, stress); } else if (app.got_subcommand("cobra_to_statsd")) { - ret = ix::ws_cobra_to_statsd_main(cobraConfig, - channel, - filter, - hostname, - statsdPort, - prefix, - fields, - verbose); + ret = ix::ws_cobra_to_statsd_main( + cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose); } else if (app.got_subcommand("cobra_to_sentry")) { - ret = ix::ws_cobra_to_sentry_main(cobraConfig, - channel, - filter, - dsn, - verbose, - strict, - jobs, - maxQueueSize); + ret = ix::ws_cobra_to_sentry_main( + cobraConfig, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); } else if (app.got_subcommand("cobra_metrics_to_redis")) { - ret = ix::ws_cobra_metrics_to_redis(cobraConfig, - channel, - filter, - hostname, - redisPort); + ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort); } else if (app.got_subcommand("snake")) { diff --git a/ws/ws.h b/ws/ws.h index 1d129d85..027e64b4 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -5,8 +5,8 @@ */ #pragma once -#include #include +#include #include namespace ix diff --git a/ws/ws_cobra_metrics_publish.cpp b/ws/ws_cobra_metrics_publish.cpp index f289e02a..790329fd 100644 --- a/ws/ws_cobra_metrics_publish.cpp +++ b/ws/ws_cobra_metrics_publish.cpp @@ -32,8 +32,13 @@ namespace ix cobraMetricsPublisher.enable(true); bool enablePerMessageDeflate = true; - cobraMetricsPublisher.configure( - config.appkey, config.endpoint, channel, config.rolename, config.rolesecret, enablePerMessageDeflate, config.socketTLSOptions); + cobraMetricsPublisher.configure(config.appkey, + config.endpoint, + channel, + config.rolename, + config.rolesecret, + enablePerMessageDeflate, + config.socketTLSOptions); while (!cobraMetricsPublisher.isAuthenticated()) ; diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index 86b25943..87887f71 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -4,94 +4,10 @@ * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include namespace ix { - class QueueManager - { - public: - QueueManager(size_t maxQueueSize, std::atomic& stop) - : _maxQueueSize(maxQueueSize) - , _stop(stop) - { - } - - Json::Value pop(); - void add(Json::Value msg); - - private: - std::map> _queues; - std::mutex _mutex; - std::condition_variable _condition; - size_t _maxQueueSize; - std::atomic& _stop; - }; - - Json::Value QueueManager::pop() - { - std::unique_lock lock(_mutex); - - if (_queues.empty()) - { - Json::Value val; - return val; - } - - std::vector games; - for (auto it : _queues) - { - games.push_back(it.first); - } - - std::random_shuffle(games.begin(), games.end()); - std::string game = games[0]; - - _condition.wait(lock, [this] { return !_stop; }); - - if (_queues[game].empty()) - { - Json::Value val; - return val; - } - - auto msg = _queues[game].front(); - _queues[game].pop(); - return msg; - } - - void QueueManager::add(Json::Value msg) - { - std::unique_lock lock(_mutex); - - std::string game; - if (msg.isMember("device") && msg["device"].isMember("game")) - { - game = msg["device"]["game"].asString(); - } - - if (game.empty()) return; - - // if the sending is not fast enough there is no point - // in queuing too many events. - if (_queues[game].size() < _maxQueueSize) - { - _queues[game].push(msg); - _condition.notify_one(); - } - } - int ws_cobra_to_sentry_main(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, @@ -101,236 +17,7 @@ namespace ix int jobs, size_t maxQueueSize) { - ix::CobraConnection conn; - conn.configure(config); - conn.connect(); - - Json::FastWriter jsonWriter; - std::atomic sentCount(0); - std::atomic receivedCount(0); - std::atomic errorSending(false); - std::atomic stop(false); - std::atomic throttled(false); - - QueueManager queueManager(maxQueueSize, stop); - - auto timer = [&sentCount, &receivedCount] { - while (true) - { - spdlog::info("messages received {} sent {}", receivedCount, sentCount); - - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t1(timer); - - auto heartbeat = [&sentCount, &receivedCount] { - std::string state("na"); - - while (true) - { - std::stringstream ss; - ss << "messages received " << receivedCount; - ss << "messages sent " << sentCount; - - std::string currentState = ss.str(); - - if (currentState == state) - { - spdlog::error("no messages received or sent for 1 minute, exiting"); - exit(1); - } - state = currentState; - - auto duration = std::chrono::minutes(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t2(heartbeat); - - auto sentrySender = - [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] { - SentryClient sentryClient(dsn); - - while (true) - { - Json::Value msg = queueManager.pop(); - - if (msg.isNull()) continue; - if (stop) return; - - auto ret = sentryClient.send(msg, verbose); - HttpResponsePtr response = ret.first; - - if (!response) - { - spdlog::warn("Null HTTP Response"); - continue; - } - - if (verbose) - { - for (auto it : response->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - - spdlog::info("Upload size: {}", response->uploadSize); - spdlog::info("Download size: {}", response->downloadSize); - - spdlog::info("Status: {}", response->statusCode); - if (response->errorCode != HttpErrorCode::Ok) - { - spdlog::info("error message: {}", response->errorMsg); - } - - if (response->headers["Content-Type"] != "application/octet-stream") - { - spdlog::info("payload: {}", response->payload); - } - } - - if (response->statusCode != 200) - { - spdlog::error("Error sending data to sentry: {}", response->statusCode); - spdlog::error("Body: {}", ret.second); - spdlog::error("Response: {}", response->payload); - errorSending = true; - - // Error 429 Too Many Requests - if (response->statusCode == 429) - { - auto retryAfter = response->headers["Retry-After"]; - std::stringstream ss; - ss << retryAfter; - int seconds; - ss >> seconds; - - if (!ss.eof() || ss.fail()) - { - seconds = 30; - spdlog::warn("Error parsing Retry-After header. " - "Using {} for the sleep duration", - seconds); - } - - spdlog::warn("Error 429 - Too Many Requests. ws will sleep " - "and retry after {} seconds", - retryAfter); - - throttled = true; - auto duration = std::chrono::seconds(seconds); - std::this_thread::sleep_for(duration); - throttled = false; - } - } - else - { - ++sentCount; - } - - if (stop) return; - } - }; - - // Create a thread pool - spdlog::info("Starting {} sentry sender jobs", jobs); - std::vector pool; - for (int i = 0; i < jobs; i++) - { - pool.push_back(std::thread(sentrySender)); - } - - conn.setEventCallback([&conn, - &channel, - &filter, - &jsonWriter, - verbose, - &throttled, - &receivedCount, - &queueManager](ix::CobraConnectionEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraConnection_EventType_Open) - { - spdlog::info("Subscriber connected"); - - for (auto it : headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - if (eventType == ix::CobraConnection_EventType_Closed) - { - spdlog::info("Subscriber closed"); - } - else if (eventType == ix::CobraConnection_EventType_Authenticated) - { - spdlog::info("Subscriber authenticated"); - conn.subscribe(channel, - filter, - [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( - const Json::Value& msg) { - if (verbose) - { - spdlog::info(jsonWriter.write(msg)); - } - - // If we cannot send to sentry fast enough, drop the message - if (throttled) - { - return; - } - - ++receivedCount; - queueManager.add(msg); - }); - } - else if (eventType == ix::CobraConnection_EventType_Subscribed) - { - spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); - } - else if (eventType == ix::CobraConnection_EventType_UnSubscribed) - { - spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); - } - else if (eventType == ix::CobraConnection_EventType_Error) - { - spdlog::error("Subscriber: error {}", errMsg); - } - else if (eventType == ix::CobraConnection_EventType_Published) - { - spdlog::error("Published message hacked: {}", msgId); - } - else if (eventType == ix::CobraConnection_EventType_Pong) - { - spdlog::info("Received websocket pong"); - } - }); - - while (true) - { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - - if (strict && errorSending) break; - } - - conn.disconnect(); - - // join all the bg threads and stop them. - stop = true; - for (int i = 0; i < jobs; i++) - { - spdlog::error("joining thread {}", i); - pool[i].join(); - } - - return (strict && errorSending) ? 1 : 0; + return cobra_to_sentry_bot( + config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); } } // namespace ix