move sentry and statsd cobra ws commands into a new ixbots folder
This commit is contained in:
		
							
								
								
									
										19
									
								
								CMake/FindSpdlog.cmake
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								CMake/FindSpdlog.cmake
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,19 @@ | |||||||
|  | # Find package structure taken from libcurl | ||||||
|  |  | ||||||
|  | include(FindPackageHandleStandardArgs) | ||||||
|  |  | ||||||
|  | find_path(SPDLOG_INCLUDE_DIRS spdlog/spdlog.h) | ||||||
|  | find_library(JSONCPP_LIBRARY spdlog) | ||||||
|  |  | ||||||
|  | find_package_handle_standard_args(SPDLOG | ||||||
|  |     FOUND_VAR | ||||||
|  |     SPDLOG_FOUND | ||||||
|  |     REQUIRED_VARS | ||||||
|  |       SPDLOG_LIBRARY | ||||||
|  |       SPDLOG_INCLUDE_DIRS | ||||||
|  |     FAIL_MESSAGE | ||||||
|  |       "Could NOT find spdlog" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | set(SPDLOG_INCLUDE_DIRS ${SPDLOG_INCLUDE_DIRS}) | ||||||
|  | set(SPDLOG_LIBRARIES ${SPDLOG_LIBRARY}) | ||||||
| @@ -230,6 +230,7 @@ if (USE_WS OR USE_TEST) | |||||||
|   add_subdirectory(ixcobra) |   add_subdirectory(ixcobra) | ||||||
|   add_subdirectory(ixsnake) |   add_subdirectory(ixsnake) | ||||||
|   add_subdirectory(ixsentry) |   add_subdirectory(ixsentry) | ||||||
|  |   add_subdirectory(ixbots) | ||||||
|  |  | ||||||
|   add_subdirectory(third_party/spdlog spdlog) |   add_subdirectory(third_party/spdlog spdlog) | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										45
									
								
								ixbots/CMakeLists.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								ixbots/CMakeLists.txt
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,45 @@ | |||||||
|  | # | ||||||
|  | # Author: Benjamin Sergeant | ||||||
|  | # Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  | # | ||||||
|  |  | ||||||
|  | set (IXBOTS_SOURCES | ||||||
|  |     ixbots/IXCobraToSentryBot.cpp | ||||||
|  |     ixbots/IXCobraToStatsdBot.cpp | ||||||
|  |     ixbots/IXQueueManager.cpp | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | set (IXBOTS_HEADERS | ||||||
|  |     ixbots/IXCobraToSentryBot.h | ||||||
|  |     ixbots/IXCobraToStatsdBot.h | ||||||
|  |     ixbots/IXQueueManager.h | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | add_library(ixbots STATIC | ||||||
|  |     ${IXBOTS_SOURCES} | ||||||
|  |     ${IXBOTS_HEADERS} | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | find_package(JsonCpp) | ||||||
|  | if (NOT JSONCPP_FOUND) | ||||||
|  |   set(JSONCPP_INCLUDE_DIRS ../third_party/jsoncpp) | ||||||
|  | endif() | ||||||
|  |  | ||||||
|  | find_package(SpdLog) | ||||||
|  | if (NOT SPDLOG_FOUND) | ||||||
|  |   set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include) | ||||||
|  | endif() | ||||||
|  |  | ||||||
|  | set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src) | ||||||
|  |  | ||||||
|  | set(IXBOTS_INCLUDE_DIRS | ||||||
|  |     . | ||||||
|  |     .. | ||||||
|  |     ../ixcore | ||||||
|  |     ../ixcobra | ||||||
|  |     ../ixsentry | ||||||
|  |     ${JSONCPP_INCLUDE_DIRS} | ||||||
|  |     ${SPDLOG_INCLUDE_DIRS} | ||||||
|  |     ${STATSD_CLIENT_INCLUDE_DIRS}) | ||||||
|  |  | ||||||
|  | target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} ) | ||||||
							
								
								
									
										261
									
								
								ixbots/ixbots/IXCobraToSentryBot.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										261
									
								
								ixbots/ixbots/IXCobraToSentryBot.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,261 @@ | |||||||
|  | /* | ||||||
|  |  *  IXCobraToSentryBot.cpp | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #include "IXCobraToSentryBot.h" | ||||||
|  | #include "IXQueueManager.h" | ||||||
|  |  | ||||||
|  | #include <chrono> | ||||||
|  | #include <ixcobra/IXCobraConnection.h> | ||||||
|  | #include <ixsentry/IXSentryClient.h> | ||||||
|  | #include <spdlog/spdlog.h> | ||||||
|  | #include <sstream> | ||||||
|  | #include <thread> | ||||||
|  | #include <vector> | ||||||
|  |  | ||||||
|  | namespace ix | ||||||
|  | { | ||||||
|  |     int cobra_to_sentry_bot(const ix::CobraConfig& config, | ||||||
|  |                             const std::string& channel, | ||||||
|  |                             const std::string& filter, | ||||||
|  |                             const std::string& dsn, | ||||||
|  |                             bool verbose, | ||||||
|  |                             bool strict, | ||||||
|  |                             int jobs, | ||||||
|  |                             size_t maxQueueSize) | ||||||
|  |     { | ||||||
|  |         ix::CobraConnection conn; | ||||||
|  |         conn.configure(config); | ||||||
|  |         conn.connect(); | ||||||
|  |  | ||||||
|  |         Json::FastWriter jsonWriter; | ||||||
|  |         std::atomic<uint64_t> sentCount(0); | ||||||
|  |         std::atomic<uint64_t> receivedCount(0); | ||||||
|  |         std::atomic<bool> errorSending(false); | ||||||
|  |         std::atomic<bool> stop(false); | ||||||
|  |         std::atomic<bool> throttled(false); | ||||||
|  |  | ||||||
|  |         QueueManager queueManager(maxQueueSize, stop); | ||||||
|  |  | ||||||
|  |         auto timer = [&sentCount, &receivedCount] { | ||||||
|  |             while (true) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("messages received {} sent {}", receivedCount, sentCount); | ||||||
|  |  | ||||||
|  |                 auto duration = std::chrono::seconds(1); | ||||||
|  |                 std::this_thread::sleep_for(duration); | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         std::thread t1(timer); | ||||||
|  |  | ||||||
|  |         auto heartbeat = [&sentCount, &receivedCount] { | ||||||
|  |             std::string state("na"); | ||||||
|  |  | ||||||
|  |             while (true) | ||||||
|  |             { | ||||||
|  |                 std::stringstream ss; | ||||||
|  |                 ss << "messages received " << receivedCount; | ||||||
|  |                 ss << "messages sent " << sentCount; | ||||||
|  |  | ||||||
|  |                 std::string currentState = ss.str(); | ||||||
|  |  | ||||||
|  |                 if (currentState == state) | ||||||
|  |                 { | ||||||
|  |                     spdlog::error("no messages received or sent for 1 minute, exiting"); | ||||||
|  |                     exit(1); | ||||||
|  |                 } | ||||||
|  |                 state = currentState; | ||||||
|  |  | ||||||
|  |                 auto duration = std::chrono::minutes(1); | ||||||
|  |                 std::this_thread::sleep_for(duration); | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         std::thread t2(heartbeat); | ||||||
|  |  | ||||||
|  |         auto sentrySender = | ||||||
|  |             [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] { | ||||||
|  |                 SentryClient sentryClient(dsn); | ||||||
|  |  | ||||||
|  |                 while (true) | ||||||
|  |                 { | ||||||
|  |                     Json::Value msg = queueManager.pop(); | ||||||
|  |  | ||||||
|  |                     if (msg.isNull()) continue; | ||||||
|  |                     if (stop) return; | ||||||
|  |  | ||||||
|  |                     auto ret = sentryClient.send(msg, verbose); | ||||||
|  |                     HttpResponsePtr response = ret.first; | ||||||
|  |  | ||||||
|  |                     if (!response) | ||||||
|  |                     { | ||||||
|  |                         spdlog::warn("Null HTTP Response"); | ||||||
|  |                         continue; | ||||||
|  |                     } | ||||||
|  |  | ||||||
|  |                     if (verbose) | ||||||
|  |                     { | ||||||
|  |                         for (auto it : response->headers) | ||||||
|  |                         { | ||||||
|  |                             spdlog::info("{}: {}", it.first, it.second); | ||||||
|  |                         } | ||||||
|  |  | ||||||
|  |                         spdlog::info("Upload size: {}", response->uploadSize); | ||||||
|  |                         spdlog::info("Download size: {}", response->downloadSize); | ||||||
|  |  | ||||||
|  |                         spdlog::info("Status: {}", response->statusCode); | ||||||
|  |                         if (response->errorCode != HttpErrorCode::Ok) | ||||||
|  |                         { | ||||||
|  |                             spdlog::info("error message: {}", response->errorMsg); | ||||||
|  |                         } | ||||||
|  |  | ||||||
|  |                         if (response->headers["Content-Type"] != "application/octet-stream") | ||||||
|  |                         { | ||||||
|  |                             spdlog::info("payload: {}", response->payload); | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |  | ||||||
|  |                     if (response->statusCode != 200) | ||||||
|  |                     { | ||||||
|  |                         spdlog::error("Error sending data to sentry: {}", response->statusCode); | ||||||
|  |                         spdlog::error("Body: {}", ret.second); | ||||||
|  |                         spdlog::error("Response: {}", response->payload); | ||||||
|  |                         errorSending = true; | ||||||
|  |  | ||||||
|  |                         // Error 429 Too Many Requests | ||||||
|  |                         if (response->statusCode == 429) | ||||||
|  |                         { | ||||||
|  |                             auto retryAfter = response->headers["Retry-After"]; | ||||||
|  |                             std::stringstream ss; | ||||||
|  |                             ss << retryAfter; | ||||||
|  |                             int seconds; | ||||||
|  |                             ss >> seconds; | ||||||
|  |  | ||||||
|  |                             if (!ss.eof() || ss.fail()) | ||||||
|  |                             { | ||||||
|  |                                 seconds = 30; | ||||||
|  |                                 spdlog::warn("Error parsing Retry-After header. " | ||||||
|  |                                              "Using {} for the sleep duration", | ||||||
|  |                                              seconds); | ||||||
|  |                             } | ||||||
|  |  | ||||||
|  |                             spdlog::warn("Error 429 - Too Many Requests. ws will sleep " | ||||||
|  |                                          "and retry after {} seconds", | ||||||
|  |                                          retryAfter); | ||||||
|  |  | ||||||
|  |                             throttled = true; | ||||||
|  |                             auto duration = std::chrono::seconds(seconds); | ||||||
|  |                             std::this_thread::sleep_for(duration); | ||||||
|  |                             throttled = false; | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                     else | ||||||
|  |                     { | ||||||
|  |                         ++sentCount; | ||||||
|  |                     } | ||||||
|  |  | ||||||
|  |                     if (stop) return; | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  |  | ||||||
|  |         // Create a thread pool | ||||||
|  |         spdlog::info("Starting {} sentry sender jobs", jobs); | ||||||
|  |         std::vector<std::thread> pool; | ||||||
|  |         for (int i = 0; i < jobs; i++) | ||||||
|  |         { | ||||||
|  |             pool.push_back(std::thread(sentrySender)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         conn.setEventCallback([&conn, | ||||||
|  |                                &channel, | ||||||
|  |                                &filter, | ||||||
|  |                                &jsonWriter, | ||||||
|  |                                verbose, | ||||||
|  |                                &throttled, | ||||||
|  |                                &receivedCount, | ||||||
|  |                                &queueManager](ix::CobraConnectionEventType eventType, | ||||||
|  |                                               const std::string& errMsg, | ||||||
|  |                                               const ix::WebSocketHttpHeaders& headers, | ||||||
|  |                                               const std::string& subscriptionId, | ||||||
|  |                                               CobraConnection::MsgId msgId) { | ||||||
|  |             if (eventType == ix::CobraConnection_EventType_Open) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("Subscriber connected"); | ||||||
|  |  | ||||||
|  |                 for (auto it : headers) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("{}: {}", it.first, it.second); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             if (eventType == ix::CobraConnection_EventType_Closed) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("Subscriber closed"); | ||||||
|  |             } | ||||||
|  |             else if (eventType == ix::CobraConnection_EventType_Authenticated) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("Subscriber authenticated"); | ||||||
|  |                 conn.subscribe(channel, | ||||||
|  |                                filter, | ||||||
|  |                                [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( | ||||||
|  |                                    const Json::Value& msg) { | ||||||
|  |                                    if (verbose) | ||||||
|  |                                    { | ||||||
|  |                                        spdlog::info(jsonWriter.write(msg)); | ||||||
|  |                                    } | ||||||
|  |  | ||||||
|  |                                    // If we cannot send to sentry fast enough, drop the message | ||||||
|  |                                    if (throttled) | ||||||
|  |                                    { | ||||||
|  |                                        return; | ||||||
|  |                                    } | ||||||
|  |  | ||||||
|  |                                    ++receivedCount; | ||||||
|  |                                    queueManager.add(msg); | ||||||
|  |                                }); | ||||||
|  |             } | ||||||
|  |             else if (eventType == ix::CobraConnection_EventType_Subscribed) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); | ||||||
|  |             } | ||||||
|  |             else if (eventType == ix::CobraConnection_EventType_UnSubscribed) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); | ||||||
|  |             } | ||||||
|  |             else if (eventType == ix::CobraConnection_EventType_Error) | ||||||
|  |             { | ||||||
|  |                 spdlog::error("Subscriber: error {}", errMsg); | ||||||
|  |             } | ||||||
|  |             else if (eventType == ix::CobraConnection_EventType_Published) | ||||||
|  |             { | ||||||
|  |                 spdlog::error("Published message hacked: {}", msgId); | ||||||
|  |             } | ||||||
|  |             else if (eventType == ix::CobraConnection_EventType_Pong) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("Received websocket pong"); | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         while (true) | ||||||
|  |         { | ||||||
|  |             auto duration = std::chrono::seconds(1); | ||||||
|  |             std::this_thread::sleep_for(duration); | ||||||
|  |  | ||||||
|  |             if (strict && errorSending) break; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         conn.disconnect(); | ||||||
|  |  | ||||||
|  |         // join all the bg threads and stop them. | ||||||
|  |         stop = true; | ||||||
|  |         for (int i = 0; i < jobs; i++) | ||||||
|  |         { | ||||||
|  |             spdlog::error("joining thread {}", i); | ||||||
|  |             pool[i].join(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         return (strict && errorSending) ? 1 : 0; | ||||||
|  |     } | ||||||
|  | } // namespace ix | ||||||
							
								
								
									
										21
									
								
								ixbots/ixbots/IXCobraToSentryBot.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								ixbots/ixbots/IXCobraToSentryBot.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | |||||||
|  | /* | ||||||
|  |  *  IXCobraToSentryBot.h | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  |  */ | ||||||
|  | #pragma once | ||||||
|  |  | ||||||
|  | #include <ixcobra/IXCobraConfig.h> | ||||||
|  | #include <string> | ||||||
|  |  | ||||||
|  | namespace ix | ||||||
|  | { | ||||||
|  |     int cobra_to_sentry_bot(const ix::CobraConfig& config, | ||||||
|  |                             const std::string& channel, | ||||||
|  |                             const std::string& filter, | ||||||
|  |                             const std::string& dsn, | ||||||
|  |                             bool verbose, | ||||||
|  |                             bool strict, | ||||||
|  |                             int jobs, | ||||||
|  |                             size_t maxQueueSize); | ||||||
|  | } // namespace ix | ||||||
							
								
								
									
										223
									
								
								ixbots/ixbots/IXCobraToStatsdBot.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										223
									
								
								ixbots/ixbots/IXCobraToStatsdBot.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,223 @@ | |||||||
|  | /* | ||||||
|  |  *  IXCobraToStatsdBot.cpp | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #include "IXCobraToStatsdBot.h" | ||||||
|  | #include "IXQueueManager.h" | ||||||
|  |  | ||||||
|  | #include <atomic> | ||||||
|  | #include <chrono> | ||||||
|  | #include <condition_variable> | ||||||
|  | #include <ixcobra/IXCobraConnection.h> | ||||||
|  | #include <spdlog/spdlog.h> | ||||||
|  | #include <sstream> | ||||||
|  | #include <thread> | ||||||
|  | #include <vector> | ||||||
|  |  | ||||||
|  | #ifndef _WIN32 | ||||||
|  | #include <statsd_client.h> | ||||||
|  | #endif | ||||||
|  |  | ||||||
|  | namespace ix | ||||||
|  | { | ||||||
|  |     // fields are command line argument that can be specified multiple times | ||||||
|  |     std::vector<std::string> parseFields(const std::string& fields) | ||||||
|  |     { | ||||||
|  |         std::vector<std::string> tokens; | ||||||
|  |  | ||||||
|  |         // Split by \n | ||||||
|  |         std::string token; | ||||||
|  |         std::stringstream tokenStream(fields); | ||||||
|  |  | ||||||
|  |         while (std::getline(tokenStream, token)) | ||||||
|  |         { | ||||||
|  |             tokens.push_back(token); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         return tokens; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // | ||||||
|  |     // Extract an attribute from a Json Value. | ||||||
|  |     // extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz | ||||||
|  |     // | ||||||
|  |     std::string extractAttr(const std::string& attr, const Json::Value& jsonValue) | ||||||
|  |     { | ||||||
|  |         // Split by . | ||||||
|  |         std::string token; | ||||||
|  |         std::stringstream tokenStream(attr); | ||||||
|  |  | ||||||
|  |         Json::Value val(jsonValue); | ||||||
|  |  | ||||||
|  |         while (std::getline(tokenStream, token, '.')) | ||||||
|  |         { | ||||||
|  |             val = val[token]; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         return val.asString(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     int cobra_to_statsd_bot(const ix::CobraConfig& config, | ||||||
|  |                             const std::string& channel, | ||||||
|  |                             const std::string& filter, | ||||||
|  |                             const std::string& host, | ||||||
|  |                             int port, | ||||||
|  |                             const std::string& prefix, | ||||||
|  |                             const std::string& fields, | ||||||
|  |                             bool verbose) | ||||||
|  |     { | ||||||
|  |         ix::CobraConnection conn; | ||||||
|  |         conn.configure(config); | ||||||
|  |         conn.connect(); | ||||||
|  |  | ||||||
|  |         auto tokens = parseFields(fields); | ||||||
|  |  | ||||||
|  |         Json::FastWriter jsonWriter; | ||||||
|  |         std::atomic<uint64_t> sentCount(0); | ||||||
|  |         std::atomic<uint64_t> receivedCount(0); | ||||||
|  |         std::atomic<bool> stop(false); | ||||||
|  |  | ||||||
|  |         size_t maxQueueSize = 1000; | ||||||
|  |         QueueManager queueManager(maxQueueSize, stop); | ||||||
|  |  | ||||||
|  |         auto timer = [&sentCount, &receivedCount] { | ||||||
|  |             while (true) | ||||||
|  |             { | ||||||
|  |                 spdlog::info("messages received {} sent {}", receivedCount, sentCount); | ||||||
|  |  | ||||||
|  |                 auto duration = std::chrono::seconds(1); | ||||||
|  |                 std::this_thread::sleep_for(duration); | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         std::thread t1(timer); | ||||||
|  |  | ||||||
|  |         auto heartbeat = [&sentCount, &receivedCount] { | ||||||
|  |             std::string state("na"); | ||||||
|  |  | ||||||
|  |             while (true) | ||||||
|  |             { | ||||||
|  |                 std::stringstream ss; | ||||||
|  |                 ss << "messages received " << receivedCount; | ||||||
|  |                 ss << "messages sent " << sentCount; | ||||||
|  |  | ||||||
|  |                 std::string currentState = ss.str(); | ||||||
|  |  | ||||||
|  |                 if (currentState == state) | ||||||
|  |                 { | ||||||
|  |                     spdlog::error("no messages received or sent for 1 minute, exiting"); | ||||||
|  |                     exit(1); | ||||||
|  |                 } | ||||||
|  |                 state = currentState; | ||||||
|  |  | ||||||
|  |                 auto duration = std::chrono::minutes(1); | ||||||
|  |                 std::this_thread::sleep_for(duration); | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         std::thread t2(heartbeat); | ||||||
|  |  | ||||||
|  |         auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] { | ||||||
|  |             // statsd client | ||||||
|  |             // test with netcat as a server: `nc -ul 8125` | ||||||
|  |             bool statsdBatch = true; | ||||||
|  | #ifndef _WIN32 | ||||||
|  |             statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch); | ||||||
|  | #else | ||||||
|  |             int statsdClient; | ||||||
|  | #endif | ||||||
|  |             while (true) | ||||||
|  |             { | ||||||
|  |                 Json::Value msg = queueManager.pop(); | ||||||
|  |  | ||||||
|  |                 if (msg.isNull()) continue; | ||||||
|  |                 if (stop) return; | ||||||
|  |  | ||||||
|  |                 std::string id; | ||||||
|  |                 for (auto&& attr : tokens) | ||||||
|  |                 { | ||||||
|  |                     id += "."; | ||||||
|  |                     id += extractAttr(attr, msg); | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 sentCount += 1; | ||||||
|  |  | ||||||
|  | #ifndef _WIN32 | ||||||
|  |                 statsdClient.count(id, 1); | ||||||
|  | #endif | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         std::thread t3(statsdSender); | ||||||
|  |  | ||||||
|  |         conn.setEventCallback( | ||||||
|  |             [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount]( | ||||||
|  |                 ix::CobraConnectionEventType eventType, | ||||||
|  |                 const std::string& errMsg, | ||||||
|  |                 const ix::WebSocketHttpHeaders& headers, | ||||||
|  |                 const std::string& subscriptionId, | ||||||
|  |                 CobraConnection::MsgId msgId) { | ||||||
|  |                 if (eventType == ix::CobraConnection_EventType_Open) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Subscriber connected"); | ||||||
|  |  | ||||||
|  |                     for (auto it : headers) | ||||||
|  |                     { | ||||||
|  |                         spdlog::info("{}: {}", it.first, it.second); | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 if (eventType == ix::CobraConnection_EventType_Closed) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Subscriber closed"); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Authenticated) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Subscriber authenticated"); | ||||||
|  |                     conn.subscribe(channel, | ||||||
|  |                                    filter, | ||||||
|  |                                    [&jsonWriter, &queueManager, verbose, &receivedCount]( | ||||||
|  |                                        const Json::Value& msg) { | ||||||
|  |                                        if (verbose) | ||||||
|  |                                        { | ||||||
|  |                                            spdlog::info(jsonWriter.write(msg)); | ||||||
|  |                                        } | ||||||
|  |  | ||||||
|  |                                        receivedCount++; | ||||||
|  |  | ||||||
|  |                                        ++receivedCount; | ||||||
|  |                                        queueManager.add(msg); | ||||||
|  |                                    }); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Subscribed) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_UnSubscribed) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Error) | ||||||
|  |                 { | ||||||
|  |                     spdlog::error("Subscriber: error {}", errMsg); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Published) | ||||||
|  |                 { | ||||||
|  |                     spdlog::error("Published message hacked: {}", msgId); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Pong) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Received websocket pong"); | ||||||
|  |                 } | ||||||
|  |             }); | ||||||
|  |  | ||||||
|  |         while (true) | ||||||
|  |         { | ||||||
|  |             std::chrono::duration<double, std::milli> duration(1000); | ||||||
|  |             std::this_thread::sleep_for(duration); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         return 0; | ||||||
|  |     } | ||||||
|  | } // namespace ix | ||||||
							
								
								
									
										22
									
								
								ixbots/ixbots/IXCobraToStatsdBot.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								ixbots/ixbots/IXCobraToStatsdBot.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,22 @@ | |||||||
|  | /* | ||||||
|  |  *  IXCobraToStatsdBot.h | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  |  */ | ||||||
|  | #pragma once | ||||||
|  |  | ||||||
|  | #include <ixcobra/IXCobraConfig.h> | ||||||
|  | #include <string> | ||||||
|  | #include <stddef.h> | ||||||
|  |  | ||||||
|  | namespace ix | ||||||
|  | { | ||||||
|  |     int cobra_to_statsd_bot(const ix::CobraConfig& config, | ||||||
|  |                             const std::string& channel, | ||||||
|  |                             const std::string& filter, | ||||||
|  |                             const std::string& dsn, | ||||||
|  |                             bool verbose, | ||||||
|  |                             bool strict, | ||||||
|  |                             int jobs, | ||||||
|  |                             size_t maxQueueSize); | ||||||
|  | } // namespace ix | ||||||
							
								
								
									
										64
									
								
								ixbots/ixbots/IXQueueManager.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										64
									
								
								ixbots/ixbots/IXQueueManager.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,64 @@ | |||||||
|  | /* | ||||||
|  |  *  IXQueueManager.cpp | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #include "IXQueueManager.h" | ||||||
|  | #include <vector> | ||||||
|  |  | ||||||
|  | namespace ix | ||||||
|  | { | ||||||
|  |     Json::Value QueueManager::pop() | ||||||
|  |     { | ||||||
|  |         std::unique_lock<std::mutex> lock(_mutex); | ||||||
|  |  | ||||||
|  |         if (_queues.empty()) | ||||||
|  |         { | ||||||
|  |             Json::Value val; | ||||||
|  |             return val; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         std::vector<std::string> games; | ||||||
|  |         for (auto it : _queues) | ||||||
|  |         { | ||||||
|  |             games.push_back(it.first); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         std::random_shuffle(games.begin(), games.end()); | ||||||
|  |         std::string game = games[0]; | ||||||
|  |  | ||||||
|  |         _condition.wait(lock, [this] { return !_stop; }); | ||||||
|  |  | ||||||
|  |         if (_queues[game].empty()) | ||||||
|  |         { | ||||||
|  |             Json::Value val; | ||||||
|  |             return val; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         auto msg = _queues[game].front(); | ||||||
|  |         _queues[game].pop(); | ||||||
|  |         return msg; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     void QueueManager::add(Json::Value msg) | ||||||
|  |     { | ||||||
|  |         std::unique_lock<std::mutex> lock(_mutex); | ||||||
|  |  | ||||||
|  |         std::string game; | ||||||
|  |         if (msg.isMember("device") && msg["device"].isMember("game")) | ||||||
|  |         { | ||||||
|  |             game = msg["device"]["game"].asString(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         if (game.empty()) return; | ||||||
|  |  | ||||||
|  |         // if the sending is not fast enough there is no point | ||||||
|  |         // in queuing too many events. | ||||||
|  |         if (_queues[game].size() < _maxQueueSize) | ||||||
|  |         { | ||||||
|  |             _queues[game].push(msg); | ||||||
|  |             _condition.notify_one(); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
							
								
								
									
										38
									
								
								ixbots/ixbots/IXQueueManager.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								ixbots/ixbots/IXQueueManager.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | |||||||
|  | /* | ||||||
|  |  *  IXQueueManager.h | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #pragma once | ||||||
|  |  | ||||||
|  | #include <stddef.h> | ||||||
|  | #include <atomic> | ||||||
|  | #include <json/json.h> | ||||||
|  | #include <mutex> | ||||||
|  | #include <condition_variable> | ||||||
|  | #include <queue> | ||||||
|  | #include <map> | ||||||
|  |  | ||||||
|  | namespace ix | ||||||
|  | { | ||||||
|  |     class QueueManager | ||||||
|  |     { | ||||||
|  |     public: | ||||||
|  |         QueueManager(size_t maxQueueSize, std::atomic<bool>& stop) | ||||||
|  |             : _maxQueueSize(maxQueueSize) | ||||||
|  |             , _stop(stop) | ||||||
|  |         { | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         Json::Value pop(); | ||||||
|  |         void add(Json::Value msg); | ||||||
|  |  | ||||||
|  |     private: | ||||||
|  |         std::map<std::string, std::queue<Json::Value>> _queues; | ||||||
|  |         std::mutex _mutex; | ||||||
|  |         std::condition_variable _condition; | ||||||
|  |         size_t _maxQueueSize; | ||||||
|  |         std::atomic<bool>& _stop; | ||||||
|  |     }; | ||||||
|  | } | ||||||
| @@ -74,6 +74,7 @@ target_link_libraries(ws ixwebsocket) | |||||||
| target_link_libraries(ws ixcrypto) | target_link_libraries(ws ixcrypto) | ||||||
| target_link_libraries(ws ixcore) | target_link_libraries(ws ixcore) | ||||||
| target_link_libraries(ws ixsentry) | target_link_libraries(ws ixsentry) | ||||||
|  | target_link_libraries(ws ixbots) | ||||||
|  |  | ||||||
| target_link_libraries(ws spdlog) | target_link_libraries(ws spdlog) | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										35
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								ws/ws.cpp
									
									
									
									
									
								
							| @@ -428,48 +428,29 @@ int main(int argc, char** argv) | |||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_subscribe")) |     else if (app.got_subcommand("cobra_subscribe")) | ||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_subscribe_main( |         ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd); | ||||||
|             cobraConfig, channel, filter, quiet, fluentd); |  | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_publish")) |     else if (app.got_subcommand("cobra_publish")) | ||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_publish_main( |         ret = ix::ws_cobra_publish_main(cobraConfig, channel, path); | ||||||
|             cobraConfig, channel, path); |  | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_metrics_publish")) |     else if (app.got_subcommand("cobra_metrics_publish")) | ||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_metrics_publish_main( |         ret = ix::ws_cobra_metrics_publish_main(cobraConfig, channel, path, stress); | ||||||
|             cobraConfig, channel, path, stress); |  | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_to_statsd")) |     else if (app.got_subcommand("cobra_to_statsd")) | ||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_to_statsd_main(cobraConfig, |         ret = ix::ws_cobra_to_statsd_main( | ||||||
|                                           channel, |             cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose); | ||||||
|                                           filter, |  | ||||||
|                                           hostname, |  | ||||||
|                                           statsdPort, |  | ||||||
|                                           prefix, |  | ||||||
|                                           fields, |  | ||||||
|                                           verbose); |  | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_to_sentry")) |     else if (app.got_subcommand("cobra_to_sentry")) | ||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_to_sentry_main(cobraConfig, |         ret = ix::ws_cobra_to_sentry_main( | ||||||
|                                           channel, |             cobraConfig, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); | ||||||
|                                           filter, |  | ||||||
|                                           dsn, |  | ||||||
|                                           verbose, |  | ||||||
|                                           strict, |  | ||||||
|                                           jobs, |  | ||||||
|                                           maxQueueSize); |  | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_metrics_to_redis")) |     else if (app.got_subcommand("cobra_metrics_to_redis")) | ||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_metrics_to_redis(cobraConfig, |         ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort); | ||||||
|                                             channel, |  | ||||||
|                                             filter, |  | ||||||
|                                             hostname, |  | ||||||
|                                             redisPort); |  | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("snake")) |     else if (app.got_subcommand("snake")) | ||||||
|     { |     { | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								ws/ws.h
									
									
									
									
									
								
							| @@ -5,8 +5,8 @@ | |||||||
|  */ |  */ | ||||||
| #pragma once | #pragma once | ||||||
|  |  | ||||||
| #include <ixwebsocket/IXSocketTLSOptions.h> |  | ||||||
| #include <ixcobra/IXCobraConfig.h> | #include <ixcobra/IXCobraConfig.h> | ||||||
|  | #include <ixwebsocket/IXSocketTLSOptions.h> | ||||||
| #include <string> | #include <string> | ||||||
|  |  | ||||||
| namespace ix | namespace ix | ||||||
|   | |||||||
| @@ -32,8 +32,13 @@ namespace ix | |||||||
|         cobraMetricsPublisher.enable(true); |         cobraMetricsPublisher.enable(true); | ||||||
|  |  | ||||||
|         bool enablePerMessageDeflate = true; |         bool enablePerMessageDeflate = true; | ||||||
|         cobraMetricsPublisher.configure( |         cobraMetricsPublisher.configure(config.appkey, | ||||||
|             config.appkey, config.endpoint, channel, config.rolename, config.rolesecret, enablePerMessageDeflate, config.socketTLSOptions); |                                         config.endpoint, | ||||||
|  |                                         channel, | ||||||
|  |                                         config.rolename, | ||||||
|  |                                         config.rolesecret, | ||||||
|  |                                         enablePerMessageDeflate, | ||||||
|  |                                         config.socketTLSOptions); | ||||||
|  |  | ||||||
|         while (!cobraMetricsPublisher.isAuthenticated()) |         while (!cobraMetricsPublisher.isAuthenticated()) | ||||||
|             ; |             ; | ||||||
|   | |||||||
| @@ -4,94 +4,10 @@ | |||||||
|  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. |  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||||
|  */ |  */ | ||||||
|  |  | ||||||
| #include <atomic> | #include <ixbots/IXCobraToSentryBot.h> | ||||||
| #include <chrono> |  | ||||||
| #include <condition_variable> |  | ||||||
| #include <ixcobra/IXCobraConnection.h> |  | ||||||
| #include <ixsentry/IXSentryClient.h> |  | ||||||
| #include <map> |  | ||||||
| #include <mutex> |  | ||||||
| #include <queue> |  | ||||||
| #include <spdlog/spdlog.h> |  | ||||||
| #include <sstream> |  | ||||||
| #include <thread> |  | ||||||
| #include <vector> |  | ||||||
|  |  | ||||||
| namespace ix | namespace ix | ||||||
| { | { | ||||||
|     class QueueManager |  | ||||||
|     { |  | ||||||
|     public: |  | ||||||
|         QueueManager(size_t maxQueueSize, std::atomic<bool>& stop) |  | ||||||
|             : _maxQueueSize(maxQueueSize) |  | ||||||
|             , _stop(stop) |  | ||||||
|         { |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         Json::Value pop(); |  | ||||||
|         void add(Json::Value msg); |  | ||||||
|  |  | ||||||
|     private: |  | ||||||
|         std::map<std::string, std::queue<Json::Value>> _queues; |  | ||||||
|         std::mutex _mutex; |  | ||||||
|         std::condition_variable _condition; |  | ||||||
|         size_t _maxQueueSize; |  | ||||||
|         std::atomic<bool>& _stop; |  | ||||||
|     }; |  | ||||||
|  |  | ||||||
|     Json::Value QueueManager::pop() |  | ||||||
|     { |  | ||||||
|         std::unique_lock<std::mutex> lock(_mutex); |  | ||||||
|  |  | ||||||
|         if (_queues.empty()) |  | ||||||
|         { |  | ||||||
|             Json::Value val; |  | ||||||
|             return val; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         std::vector<std::string> games; |  | ||||||
|         for (auto it : _queues) |  | ||||||
|         { |  | ||||||
|             games.push_back(it.first); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         std::random_shuffle(games.begin(), games.end()); |  | ||||||
|         std::string game = games[0]; |  | ||||||
|  |  | ||||||
|         _condition.wait(lock, [this] { return !_stop; }); |  | ||||||
|  |  | ||||||
|         if (_queues[game].empty()) |  | ||||||
|         { |  | ||||||
|             Json::Value val; |  | ||||||
|             return val; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         auto msg = _queues[game].front(); |  | ||||||
|         _queues[game].pop(); |  | ||||||
|         return msg; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     void QueueManager::add(Json::Value msg) |  | ||||||
|     { |  | ||||||
|         std::unique_lock<std::mutex> lock(_mutex); |  | ||||||
|  |  | ||||||
|         std::string game; |  | ||||||
|         if (msg.isMember("device") && msg["device"].isMember("game")) |  | ||||||
|         { |  | ||||||
|             game = msg["device"]["game"].asString(); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if (game.empty()) return; |  | ||||||
|  |  | ||||||
|         // if the sending is not fast enough there is no point |  | ||||||
|         // in queuing too many events. |  | ||||||
|         if (_queues[game].size() < _maxQueueSize) |  | ||||||
|         { |  | ||||||
|             _queues[game].push(msg); |  | ||||||
|             _condition.notify_one(); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     int ws_cobra_to_sentry_main(const ix::CobraConfig& config, |     int ws_cobra_to_sentry_main(const ix::CobraConfig& config, | ||||||
|                                 const std::string& channel, |                                 const std::string& channel, | ||||||
|                                 const std::string& filter, |                                 const std::string& filter, | ||||||
| @@ -101,236 +17,7 @@ namespace ix | |||||||
|                                 int jobs, |                                 int jobs, | ||||||
|                                 size_t maxQueueSize) |                                 size_t maxQueueSize) | ||||||
|     { |     { | ||||||
|         ix::CobraConnection conn; |         return cobra_to_sentry_bot( | ||||||
|         conn.configure(config); |             config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); | ||||||
|         conn.connect(); |  | ||||||
|  |  | ||||||
|         Json::FastWriter jsonWriter; |  | ||||||
|         std::atomic<uint64_t> sentCount(0); |  | ||||||
|         std::atomic<uint64_t> receivedCount(0); |  | ||||||
|         std::atomic<bool> errorSending(false); |  | ||||||
|         std::atomic<bool> stop(false); |  | ||||||
|         std::atomic<bool> throttled(false); |  | ||||||
|  |  | ||||||
|         QueueManager queueManager(maxQueueSize, stop); |  | ||||||
|  |  | ||||||
|         auto timer = [&sentCount, &receivedCount] { |  | ||||||
|             while (true) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("messages received {} sent {}", receivedCount, sentCount); |  | ||||||
|  |  | ||||||
|                 auto duration = std::chrono::seconds(1); |  | ||||||
|                 std::this_thread::sleep_for(duration); |  | ||||||
|             } |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         std::thread t1(timer); |  | ||||||
|  |  | ||||||
|         auto heartbeat = [&sentCount, &receivedCount] { |  | ||||||
|             std::string state("na"); |  | ||||||
|  |  | ||||||
|             while (true) |  | ||||||
|             { |  | ||||||
|                 std::stringstream ss; |  | ||||||
|                 ss << "messages received " << receivedCount; |  | ||||||
|                 ss << "messages sent " << sentCount; |  | ||||||
|  |  | ||||||
|                 std::string currentState = ss.str(); |  | ||||||
|  |  | ||||||
|                 if (currentState == state) |  | ||||||
|                 { |  | ||||||
|                     spdlog::error("no messages received or sent for 1 minute, exiting"); |  | ||||||
|                     exit(1); |  | ||||||
|                 } |  | ||||||
|                 state = currentState; |  | ||||||
|  |  | ||||||
|                 auto duration = std::chrono::minutes(1); |  | ||||||
|                 std::this_thread::sleep_for(duration); |  | ||||||
|             } |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         std::thread t2(heartbeat); |  | ||||||
|  |  | ||||||
|         auto sentrySender = |  | ||||||
|             [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] { |  | ||||||
|                 SentryClient sentryClient(dsn); |  | ||||||
|  |  | ||||||
|                 while (true) |  | ||||||
|                 { |  | ||||||
|                     Json::Value msg = queueManager.pop(); |  | ||||||
|  |  | ||||||
|                     if (msg.isNull()) continue; |  | ||||||
|                     if (stop) return; |  | ||||||
|  |  | ||||||
|                     auto ret = sentryClient.send(msg, verbose); |  | ||||||
|                     HttpResponsePtr response = ret.first; |  | ||||||
|  |  | ||||||
|                     if (!response) |  | ||||||
|                     { |  | ||||||
|                         spdlog::warn("Null HTTP Response"); |  | ||||||
|                         continue; |  | ||||||
|                     } |  | ||||||
|  |  | ||||||
|                     if (verbose) |  | ||||||
|                     { |  | ||||||
|                         for (auto it : response->headers) |  | ||||||
|                         { |  | ||||||
|                             spdlog::info("{}: {}", it.first, it.second); |  | ||||||
|                         } |  | ||||||
|  |  | ||||||
|                         spdlog::info("Upload size: {}", response->uploadSize); |  | ||||||
|                         spdlog::info("Download size: {}", response->downloadSize); |  | ||||||
|  |  | ||||||
|                         spdlog::info("Status: {}", response->statusCode); |  | ||||||
|                         if (response->errorCode != HttpErrorCode::Ok) |  | ||||||
|                         { |  | ||||||
|                             spdlog::info("error message: {}", response->errorMsg); |  | ||||||
|                         } |  | ||||||
|  |  | ||||||
|                         if (response->headers["Content-Type"] != "application/octet-stream") |  | ||||||
|                         { |  | ||||||
|                             spdlog::info("payload: {}", response->payload); |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|  |  | ||||||
|                     if (response->statusCode != 200) |  | ||||||
|                     { |  | ||||||
|                         spdlog::error("Error sending data to sentry: {}", response->statusCode); |  | ||||||
|                         spdlog::error("Body: {}", ret.second); |  | ||||||
|                         spdlog::error("Response: {}", response->payload); |  | ||||||
|                         errorSending = true; |  | ||||||
|  |  | ||||||
|                         // Error 429 Too Many Requests |  | ||||||
|                         if (response->statusCode == 429) |  | ||||||
|                         { |  | ||||||
|                             auto retryAfter = response->headers["Retry-After"]; |  | ||||||
|                             std::stringstream ss; |  | ||||||
|                             ss << retryAfter; |  | ||||||
|                             int seconds; |  | ||||||
|                             ss >> seconds; |  | ||||||
|  |  | ||||||
|                             if (!ss.eof() || ss.fail()) |  | ||||||
|                             { |  | ||||||
|                                 seconds = 30; |  | ||||||
|                                 spdlog::warn("Error parsing Retry-After header. " |  | ||||||
|                                              "Using {} for the sleep duration", |  | ||||||
|                                              seconds); |  | ||||||
|                             } |  | ||||||
|  |  | ||||||
|                             spdlog::warn("Error 429 - Too Many Requests. ws will sleep " |  | ||||||
|                                          "and retry after {} seconds", |  | ||||||
|                                          retryAfter); |  | ||||||
|  |  | ||||||
|                             throttled = true; |  | ||||||
|                             auto duration = std::chrono::seconds(seconds); |  | ||||||
|                             std::this_thread::sleep_for(duration); |  | ||||||
|                             throttled = false; |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|                     else |  | ||||||
|                     { |  | ||||||
|                         ++sentCount; |  | ||||||
|                     } |  | ||||||
|  |  | ||||||
|                     if (stop) return; |  | ||||||
|                 } |  | ||||||
|             }; |  | ||||||
|  |  | ||||||
|         // Create a thread pool |  | ||||||
|         spdlog::info("Starting {} sentry sender jobs", jobs); |  | ||||||
|         std::vector<std::thread> pool; |  | ||||||
|         for (int i = 0; i < jobs; i++) |  | ||||||
|         { |  | ||||||
|             pool.push_back(std::thread(sentrySender)); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         conn.setEventCallback([&conn, |  | ||||||
|                                &channel, |  | ||||||
|                                &filter, |  | ||||||
|                                &jsonWriter, |  | ||||||
|                                verbose, |  | ||||||
|                                &throttled, |  | ||||||
|                                &receivedCount, |  | ||||||
|                                &queueManager](ix::CobraConnectionEventType eventType, |  | ||||||
|                                               const std::string& errMsg, |  | ||||||
|                                               const ix::WebSocketHttpHeaders& headers, |  | ||||||
|                                               const std::string& subscriptionId, |  | ||||||
|                                               CobraConnection::MsgId msgId) { |  | ||||||
|             if (eventType == ix::CobraConnection_EventType_Open) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("Subscriber connected"); |  | ||||||
|  |  | ||||||
|                 for (auto it : headers) |  | ||||||
|                 { |  | ||||||
|                     spdlog::info("{}: {}", it.first, it.second); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|             if (eventType == ix::CobraConnection_EventType_Closed) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("Subscriber closed"); |  | ||||||
|             } |  | ||||||
|             else if (eventType == ix::CobraConnection_EventType_Authenticated) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("Subscriber authenticated"); |  | ||||||
|                 conn.subscribe(channel, |  | ||||||
|                                filter, |  | ||||||
|                                [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( |  | ||||||
|                                    const Json::Value& msg) { |  | ||||||
|                                    if (verbose) |  | ||||||
|                                    { |  | ||||||
|                                        spdlog::info(jsonWriter.write(msg)); |  | ||||||
|                                    } |  | ||||||
|  |  | ||||||
|                                    // If we cannot send to sentry fast enough, drop the message |  | ||||||
|                                    if (throttled) |  | ||||||
|                                    { |  | ||||||
|                                        return; |  | ||||||
|                                    } |  | ||||||
|  |  | ||||||
|                                    ++receivedCount; |  | ||||||
|                                    queueManager.add(msg); |  | ||||||
|                                }); |  | ||||||
|             } |  | ||||||
|             else if (eventType == ix::CobraConnection_EventType_Subscribed) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); |  | ||||||
|             } |  | ||||||
|             else if (eventType == ix::CobraConnection_EventType_UnSubscribed) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); |  | ||||||
|             } |  | ||||||
|             else if (eventType == ix::CobraConnection_EventType_Error) |  | ||||||
|             { |  | ||||||
|                 spdlog::error("Subscriber: error {}", errMsg); |  | ||||||
|             } |  | ||||||
|             else if (eventType == ix::CobraConnection_EventType_Published) |  | ||||||
|             { |  | ||||||
|                 spdlog::error("Published message hacked: {}", msgId); |  | ||||||
|             } |  | ||||||
|             else if (eventType == ix::CobraConnection_EventType_Pong) |  | ||||||
|             { |  | ||||||
|                 spdlog::info("Received websocket pong"); |  | ||||||
|             } |  | ||||||
|         }); |  | ||||||
|  |  | ||||||
|         while (true) |  | ||||||
|         { |  | ||||||
|             auto duration = std::chrono::seconds(1); |  | ||||||
|             std::this_thread::sleep_for(duration); |  | ||||||
|  |  | ||||||
|             if (strict && errorSending) break; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         conn.disconnect(); |  | ||||||
|  |  | ||||||
|         // join all the bg threads and stop them. |  | ||||||
|         stop = true; |  | ||||||
|         for (int i = 0; i < jobs; i++) |  | ||||||
|         { |  | ||||||
|             spdlog::error("joining thread {}", i); |  | ||||||
|             pool[i].join(); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         return (strict && errorSending) ? 1 : 0; |  | ||||||
|     } |     } | ||||||
| } // namespace ix | } // namespace ix | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user