From 83ae105edb3d94682b01b8c4b04bab68662513a1 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 12 Mar 2020 12:13:31 -0700 Subject: [PATCH] minor refactoring to delete files which are not needed --- ixbots/ixbots/IXCobraToSentryBot.cpp | 2 +- ixbots/ixbots/IXCobraToSentryBot.h | 2 +- ixbots/ixbots/IXCobraToStatsdBot.h | 10 +- test/IXCobraToSentryBotTest.cpp | 4 +- ws/CMakeLists.txt | 2 - ws/ws.cpp | 10 +- ws/ws.h | 18 -- ws/ws_cobra_to_sentry.cpp | 25 --- ws/ws_cobra_to_statsd.cpp | 273 --------------------------- 9 files changed, 17 insertions(+), 329 deletions(-) delete mode 100644 ws/ws_cobra_to_sentry.cpp delete mode 100644 ws/ws_cobra_to_statsd.cpp diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index dd8e883c..7c4b2fb3 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -17,7 +17,7 @@ namespace ix { - int cobra_to_sentry_bot(const ix::CobraConfig& config, + int cobra_to_sentry_bot(const CobraConfig& config, const std::string& channel, const std::string& filter, const std::string& dsn, diff --git a/ixbots/ixbots/IXCobraToSentryBot.h b/ixbots/ixbots/IXCobraToSentryBot.h index e1ce74d1..c9ebc7a3 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.h +++ b/ixbots/ixbots/IXCobraToSentryBot.h @@ -10,7 +10,7 @@ namespace ix { - int cobra_to_sentry_bot(const ix::CobraConfig& config, + int cobra_to_sentry_bot(const CobraConfig& config, const std::string& channel, const std::string& filter, const std::string& dsn, diff --git a/ixbots/ixbots/IXCobraToStatsdBot.h b/ixbots/ixbots/IXCobraToStatsdBot.h index a6a1bcc2..5f424388 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.h +++ b/ixbots/ixbots/IXCobraToStatsdBot.h @@ -14,9 +14,9 @@ namespace ix int cobra_to_statsd_bot(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, - const std::string& dsn, - bool verbose, - bool strict, - int jobs, - size_t maxQueueSize); + const std::string& host, + int port, + const std::string& prefix, + const std::string& fields, + bool verbose); } // namespace ix diff --git a/test/IXCobraToSentryBotTest.cpp b/test/IXCobraToSentryBotTest.cpp index 44813430..68a691b2 100644 --- a/test/IXCobraToSentryBotTest.cpp +++ b/test/IXCobraToSentryBotTest.cpp @@ -153,7 +153,9 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]") // https://xxxxx:yyyyyy@sentry.io/1234567 std::stringstream oss; - oss << "http://xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567"; + std::string scheme("http://"); + + oss << scheme << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567"; std::string dsn = oss.str(); // Only run the bot for 3 seconds diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 058af9b2..275109e4 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -58,8 +58,6 @@ add_executable(ws ws_cobra_subscribe.cpp ws_cobra_metrics_publish.cpp ws_cobra_publish.cpp - ws_cobra_to_statsd.cpp - ws_cobra_to_sentry.cpp ws_cobra_metrics_to_redis.cpp ws_httpd.cpp ws_autobahn.cpp diff --git a/ws/ws.cpp b/ws/ws.cpp index 9c711fc7..532f7e4b 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -440,13 +442,15 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_to_statsd")) { - ret = ix::ws_cobra_to_statsd_main( + ret = ix::cobra_to_statsd_bot( cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose); } else if (app.got_subcommand("cobra_to_sentry")) { - ret = ix::ws_cobra_to_sentry_main( - cobraConfig, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); + bool enableHeartbeat = true; + int runtime = -1; + ret = ix::cobra_to_sentry_bot( + cobraConfig, channel, filter, dsn, verbose, strict, jobs, maxQueueSize, enableHeartbeat, runtime); } else if (app.got_subcommand("cobra_metrics_to_redis")) { diff --git a/ws/ws.h b/ws/ws.h index 027e64b4..cfa0d2f7 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -90,24 +90,6 @@ namespace ix const std::string& path, bool stress); - int ws_cobra_to_statsd_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& host, - int port, - const std::string& prefix, - const std::string& fields, - bool verbose); - - int ws_cobra_to_sentry_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& dsn, - bool verbose, - bool strict, - int jobs, - size_t maxQueueSize); - int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp deleted file mode 100644 index fbc121f8..00000000 --- a/ws/ws_cobra_to_sentry.cpp +++ /dev/null @@ -1,25 +0,0 @@ -/* - * ws_cobra_to_sentry.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include - -namespace ix -{ - int ws_cobra_to_sentry_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& dsn, - bool verbose, - bool strict, - int jobs, - size_t maxQueueSize) - { - bool enableHeartbeat = true; - int runtime = -1; - return cobra_to_sentry_bot( - config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize, enableHeartbeat, runtime); - } -} // namespace ix diff --git a/ws/ws_cobra_to_statsd.cpp b/ws/ws_cobra_to_statsd.cpp deleted file mode 100644 index e86cadbf..00000000 --- a/ws/ws_cobra_to_statsd.cpp +++ /dev/null @@ -1,273 +0,0 @@ -/* - * ws_cobra_to_statsd.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#ifndef _WIN32 -#include -#endif - -namespace -{ - class QueueManager - { - public: - QueueManager(size_t maxQueueSize, std::atomic& stop) - : _maxQueueSize(maxQueueSize) - , _stop(stop) - { - } - - Json::Value pop(); - void add(Json::Value msg); - - private: - std::queue _queue; - std::mutex _mutex; - std::condition_variable _condition; - size_t _maxQueueSize; - std::atomic& _stop; - }; - - Json::Value QueueManager::pop() - { - std::unique_lock lock(_mutex); - - if (_queue.empty()) - { - Json::Value val; - return val; - } - - _condition.wait(lock, [this] { return !_stop; }); - - auto msg = _queue.front(); - _queue.pop(); - return msg; - } - - void QueueManager::add(Json::Value msg) - { - std::unique_lock lock(_mutex); - - // if the sending is not fast enough there is no point - // in queuing too many events. - if (_queue.size() < _maxQueueSize) - { - _queue.push(msg); - _condition.notify_one(); - } - } -} // namespace - -namespace ix -{ - // fields are command line argument that can be specified multiple times - std::vector parseFields(const std::string& fields) - { - std::vector tokens; - - // Split by \n - std::string token; - std::stringstream tokenStream(fields); - - while (std::getline(tokenStream, token)) - { - tokens.push_back(token); - } - - return tokens; - } - - // - // Extract an attribute from a Json Value. - // extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz - // - std::string extractAttr(const std::string& attr, const Json::Value& jsonValue) - { - // Split by . - std::string token; - std::stringstream tokenStream(attr); - - Json::Value val(jsonValue); - - while (std::getline(tokenStream, token, '.')) - { - val = val[token]; - } - - return val.asString(); - } - - int ws_cobra_to_statsd_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& host, - int port, - const std::string& prefix, - const std::string& fields, - bool verbose) - { - ix::CobraConnection conn; - conn.configure(config); - conn.connect(); - - auto tokens = parseFields(fields); - - Json::FastWriter jsonWriter; - std::atomic sentCount(0); - std::atomic receivedCount(0); - std::atomic stop(false); - - size_t maxQueueSize = 1000; - QueueManager queueManager(maxQueueSize, stop); - - auto timer = [&sentCount, &receivedCount] { - while (true) - { - spdlog::info("messages received {} sent {}", receivedCount, sentCount); - - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t1(timer); - - auto heartbeat = [&sentCount, &receivedCount] { - std::string state("na"); - - while (true) - { - std::stringstream ss; - ss << "messages received " << receivedCount; - ss << "messages sent " << sentCount; - - std::string currentState = ss.str(); - - if (currentState == state) - { - spdlog::error("no messages received or sent for 1 minute, exiting"); - exit(1); - } - state = currentState; - - auto duration = std::chrono::minutes(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t2(heartbeat); - - auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] { - // statsd client - // test with netcat as a server: `nc -ul 8125` - bool statsdBatch = true; -#ifndef _WIN32 - statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch); -#else - int statsdClient; -#endif - while (true) - { - Json::Value msg = queueManager.pop(); - - if (msg.isNull()) continue; - if (stop) return; - - std::string id; - for (auto&& attr : tokens) - { - id += "."; - id += extractAttr(attr, msg); - } - - sentCount += 1; - -#ifndef _WIN32 - statsdClient.count(id, 1); -#endif - } - }; - - std::thread t3(statsdSender); - - conn.setEventCallback( - [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount]( - ix::CobraConnectionEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraConnection_EventType_Open) - { - spdlog::info("Subscriber connected"); - - for (auto it : headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - if (eventType == ix::CobraConnection_EventType_Closed) - { - spdlog::info("Subscriber closed"); - } - else if (eventType == ix::CobraConnection_EventType_Authenticated) - { - spdlog::info("Subscriber authenticated"); - conn.subscribe(channel, - filter, - [&jsonWriter, &queueManager, verbose, &receivedCount]( - const Json::Value& msg) { - if (verbose) - { - spdlog::info(jsonWriter.write(msg)); - } - - receivedCount++; - - ++receivedCount; - queueManager.add(msg); - }); - } - else if (eventType == ix::CobraConnection_EventType_Subscribed) - { - spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); - } - else if (eventType == ix::CobraConnection_EventType_UnSubscribed) - { - spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); - } - else if (eventType == ix::CobraConnection_EventType_Error) - { - spdlog::error("Subscriber: error {}", errMsg); - } - else if (eventType == ix::CobraConnection_EventType_Published) - { - spdlog::error("Published message hacked: {}", msgId); - } - else if (eventType == ix::CobraConnection_EventType_Pong) - { - spdlog::info("Received websocket pong"); - } - }); - - while (true) - { - std::chrono::duration duration(1000); - std::this_thread::sleep_for(duration); - } - - return 0; - } -} // namespace ix