diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d42ae0aa..90280906 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -3,7 +3,7 @@ All changes to this project will be documented in this file. ## [9.3.1] - 2020-04-16 -(cobra bots) add a utility class to factor out the common bots features (heartbeat) and move cobra to sentry bot to use it +(cobra bots) add a utility class to factor out the common bots features (heartbeat) and move all bots to used it + convert cobra_subscribe to be a bot and add a unittest for it ## [9.3.0] - 2020-04-15 diff --git a/ixbots/CMakeLists.txt b/ixbots/CMakeLists.txt index 283feecf..2bf3a2f9 100644 --- a/ixbots/CMakeLists.txt +++ b/ixbots/CMakeLists.txt @@ -7,6 +7,7 @@ set (IXBOTS_SOURCES ixbots/IXCobraBot.cpp ixbots/IXCobraToSentryBot.cpp ixbots/IXCobraToStatsdBot.cpp + ixbots/IXCobraToStdoutBot.cpp ixbots/IXQueueManager.cpp ixbots/IXStatsdClient.cpp ) @@ -15,6 +16,7 @@ set (IXBOTS_HEADERS ixbots/IXCobraBot.h ixbots/IXCobraToSentryBot.h ixbots/IXCobraToStatsdBot.h + ixbots/IXCobraToStdoutBot.h ixbots/IXQueueManager.h ixbots/IXStatsdClient.h ) diff --git a/ixbots/ixbots/IXCobraBot.cpp b/ixbots/ixbots/IXCobraBot.cpp index 5b65f5e1..094e487e 100644 --- a/ixbots/ixbots/IXCobraBot.cpp +++ b/ixbots/ixbots/IXCobraBot.cpp @@ -23,6 +23,7 @@ namespace ix const std::string& position, bool verbose, size_t maxQueueSize, + bool useQueue, bool enableHeartbeat, int runtime) { @@ -83,16 +84,18 @@ namespace ix std::thread t2(heartbeat); auto sender = - [this, &queueManager, verbose, &sentCount, &stop, &throttled] { + [this, &queueManager, verbose, &sentCount, &stop, &throttled, &fatalCobraError] { while (true) { - Json::Value msg = queueManager.pop(); + auto data = queueManager.pop(); + Json::Value msg = data.first; + std::string position = data.second; if (stop) break; if (msg.isNull()) continue; - if (_onBotMessageCallback && _onBotMessageCallback(msg, verbose, throttled)) + if (_onBotMessageCallback && _onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError)) { // That might be too noisy if (verbose) @@ -114,16 +117,21 @@ namespace ix std::thread t3(sender); - conn.setEventCallback([&conn, + std::string subscriptionPosition(position); + + conn.setEventCallback([this, + &conn, &channel, &filter, - &position, + &subscriptionPosition, &jsonWriter, verbose, &throttled, &receivedCount, &fatalCobraError, - &queueManager](const CobraEventPtr& event) + &useQueue, + &queueManager, + &sentCount](const CobraEventPtr& event) { if (event->type == ix::CobraEventType::Open) { @@ -141,16 +149,20 @@ namespace ix else if (event->type == ix::CobraEventType::Authenticated) { spdlog::info("Subscriber authenticated"); + spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition); + spdlog::info("Using filter: {}", filter); conn.subscribe(channel, filter, - position, - [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( + subscriptionPosition, + [this, &jsonWriter, verbose, &throttled, &receivedCount, &queueManager, &useQueue, &subscriptionPosition, &fatalCobraError, &sentCount]( const Json::Value& msg, const std::string& position) { if (verbose) { spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg)); } + subscriptionPosition = position; + // If we cannot send to sentry fast enough, drop the message if (throttled) { @@ -158,7 +170,27 @@ namespace ix } ++receivedCount; - queueManager.add(msg); + + if (useQueue) + { + queueManager.add(msg, position); + } + else + { + if (_onBotMessageCallback && _onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError)) + { + // That might be too noisy + if (verbose) + { + spdlog::info("cobra bot: sending succesfull"); + } + ++sentCount; + } + else + { + spdlog::error("cobra bot: error sending"); + } + } }); } else if (event->type == ix::CobraEventType::Subscribed) diff --git a/ixbots/ixbots/IXCobraBot.h b/ixbots/ixbots/IXCobraBot.h index 187ccb29..a6bb3eef 100644 --- a/ixbots/ixbots/IXCobraBot.h +++ b/ixbots/ixbots/IXCobraBot.h @@ -9,15 +9,14 @@ #include #include #include -#include -#include -#include -#include +#include namespace ix { using OnBotMessageCallback = std::function&, std::atomic&)>; class CobraBot @@ -31,6 +30,7 @@ namespace ix const std::string& position, bool verbose, size_t maxQueueSize, + bool useQueue, bool enableHeartbeat, int runtime); diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index b3a37aa5..66a707da 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -12,26 +12,26 @@ #include #include #include -#include #include namespace ix { - int cobra_to_sentry_bot(const CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - SentryClient& sentryClient, - bool verbose, - bool strict, - size_t maxQueueSize, - bool enableHeartbeat, - int runtime) + int64_t cobra_to_sentry_bot(const CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& position, + SentryClient& sentryClient, + bool verbose, + size_t maxQueueSize, + bool enableHeartbeat, + int runtime) { CobraBot bot; bot.setOnBotMessageCallback([&sentryClient](const Json::Value& msg, + const std::string& /*position*/, const bool verbose, - std::atomic& throttled) -> bool { + std::atomic& throttled, + std::atomic& /*fatalCobraError*/) -> bool { auto ret = sentryClient.send(msg, verbose); HttpResponsePtr response = ret.first; @@ -102,12 +102,15 @@ namespace ix return success; }); + bool useQueue = true; + return bot.run(config, channel, filter, position, verbose, maxQueueSize, + useQueue, enableHeartbeat, runtime); } diff --git a/ixbots/ixbots/IXCobraToSentryBot.h b/ixbots/ixbots/IXCobraToSentryBot.h index b0b4606c..d582bd3e 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.h +++ b/ixbots/ixbots/IXCobraToSentryBot.h @@ -8,17 +8,17 @@ #include #include #include +#include namespace ix { - int cobra_to_sentry_bot(const CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - SentryClient& sentryClient, - bool verbose, - bool strict, - size_t maxQueueSize, - bool enableHeartbeat, - int runtime); + int64_t cobra_to_sentry_bot(const CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& position, + SentryClient& sentryClient, + bool verbose, + size_t maxQueueSize, + bool enableHeartbeat, + int runtime); } // namespace ix diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp index b838bc01..e320d077 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.cpp +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -7,14 +7,12 @@ #include "IXCobraToStatsdBot.h" #include "IXQueueManager.h" #include "IXStatsdClient.h" +#include "IXCobraBot.h" -#include #include -#include #include #include #include -#include #include namespace ix @@ -56,18 +54,18 @@ namespace ix return val; } - int cobra_to_statsd_bot(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - StatsdClient& statsdClient, - const std::string& fields, - const std::string& gauge, - const std::string& timer, - bool verbose, - size_t maxQueueSize, - bool enableHeartbeat, - int runtime) + int64_t cobra_to_statsd_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& position, + StatsdClient& statsdClient, + const std::string& fields, + const std::string& gauge, + const std::string& timer, + bool verbose, + size_t maxQueueSize, + bool enableHeartbeat, + int runtime) { ix::CobraConnection conn; conn.configure(config); @@ -75,242 +73,85 @@ namespace ix auto tokens = parseFields(fields); - Json::FastWriter jsonWriter; - std::atomic sentCount(0); - std::atomic receivedCount(0); - std::atomic stop(false); - std::atomic fatalCobraError(false); - - QueueManager queueManager(maxQueueSize); - - auto progress = [&sentCount, &receivedCount, &stop] { - while (!stop) + CobraBot bot; + bot.setOnBotMessageCallback([&statsdClient, &tokens, &gauge, &timer](const Json::Value& msg, + const std::string& /*position*/, + const bool verbose, + std::atomic& /*throttled*/, + std::atomic& fatalCobraError) -> bool { + std::string id; + for (auto&& attr : tokens) { - spdlog::info("messages received {} sent {}", receivedCount, sentCount); - - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); + id += "."; + auto val = extractAttr(attr, msg); + id += val.asString(); } - spdlog::info("timer thread done"); - }; - - std::thread t1(progress); - - auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] { - std::string state("na"); - - if (!enableHeartbeat) return; - - while (!stop) + if (gauge.empty() && timer.empty()) { - std::stringstream ss; - ss << "messages received " << receivedCount; - ss << "messages sent " << sentCount; - - std::string currentState = ss.str(); - - if (currentState == state) - { - spdlog::error("no messages received or sent for 1 minute, exiting"); - exit(1); - } - state = currentState; - - auto duration = std::chrono::minutes(1); - std::this_thread::sleep_for(duration); + statsdClient.count(id, 1); } - - spdlog::info("heartbeat thread done"); - }; - - std::thread t2(heartbeat); - - auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop, &gauge, &timer, &fatalCobraError, &verbose] { - while (true) + else { - Json::Value msg = queueManager.pop(); + std::string attrName = (!gauge.empty()) ? gauge : timer; + auto val = extractAttr(attrName, msg); + size_t x; - if (stop) return; - if (msg.isNull()) continue; - - std::string id; - for (auto&& attr : tokens) + if (val.isInt()) { - id += "."; - auto val = extractAttr(attr, msg); - id += val.asString(); + x = (size_t) val.asInt(); } - - if (gauge.empty() && timer.empty()) + else if (val.isInt64()) { - statsdClient.count(id, 1); + x = (size_t) val.asInt64(); + } + else if (val.isUInt()) + { + x = (size_t) val.asUInt(); + } + else if (val.isUInt64()) + { + x = (size_t) val.asUInt64(); + } + else if (val.isDouble()) + { + x = (size_t) val.asUInt64(); } else { - std::string attrName = (!gauge.empty()) ? gauge : timer; - auto val = extractAttr(attrName, msg); - size_t x; - - if (val.isInt()) - { - x = (size_t) val.asInt(); - } - else if (val.isInt64()) - { - x = (size_t) val.asInt64(); - } - else if (val.isUInt()) - { - x = (size_t) val.asUInt(); - } - else if (val.isUInt64()) - { - x = (size_t) val.asUInt64(); - } - else if (val.isDouble()) - { - x = (size_t) val.asUInt64(); - } - else - { - spdlog::error("Gauge {} is not a numberic type", gauge); - fatalCobraError = true; - break; - } - - if (verbose) - { - spdlog::info("{} - {} -> {}", id, attrName, x); - } - - if (!gauge.empty()) - { - statsdClient.gauge(id, x); - } - else - { - statsdClient.timing(id, x); - } - } - - sentCount += 1; - } - }; - - std::thread t3(statsdSender); - - conn.setEventCallback( - [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](const CobraEventPtr& event) - { - if (event->type == ix::CobraEventType::Open) - { - spdlog::info("Subscriber connected"); - - for (auto&& it : event->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (event->type == ix::CobraEventType::Closed) - { - spdlog::info("Subscriber closed: {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Authenticated) - { - spdlog::info("Subscriber authenticated"); - conn.subscribe(channel, - filter, - position, - [&jsonWriter, &queueManager, verbose, &receivedCount]( - const Json::Value& msg, const std::string& position) { - if (verbose) - { - spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg)); - } - - receivedCount++; - - ++receivedCount; - queueManager.add(msg); - }); - } - else if (event->type == ix::CobraEventType::Subscribed) - { - spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::UnSubscribed) - { - spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::Error) - { - spdlog::error("Subscriber: error {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Published) - { - spdlog::error("Published message hacked: {}", event->msgId); - } - else if (event->type == ix::CobraEventType::Pong) - { - spdlog::info("Received websocket pong"); - } - else if (event->type == ix::CobraEventType::HandshakeError) - { - spdlog::error("Subscriber: Handshake error: {}", event->errMsg); + spdlog::error("Gauge {} is not a numeric type", gauge); fatalCobraError = true; + return false; } - else if (event->type == ix::CobraEventType::AuthenticationError) + + if (verbose) { - spdlog::error("Subscriber: Authentication error: {}", event->errMsg); - fatalCobraError = true; + spdlog::info("{} - {} -> {}", id, attrName, x); } - else if (event->type == ix::CobraEventType::SubscriptionError) + + if (!gauge.empty()) { - spdlog::error("Subscriber: Subscription error: {}", event->errMsg); - fatalCobraError = true; + statsdClient.gauge(id, x); + } + else + { + statsdClient.timing(id, x); } - }); - - // Run forever - if (runtime == -1) - { - while (true) - { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - - if (fatalCobraError) break; } - } - // Run for a duration, used by unittesting now - else - { - for (int i = 0 ; i < runtime; ++i) - { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - if (fatalCobraError) break; - } - } + return true; + }); - // - // Cleanup. - // join all the bg threads and stop them. - // - conn.disconnect(); - stop = true; + bool useQueue = true; - // progress thread - t1.join(); - - // heartbeat thread - if (t2.joinable()) t2.join(); - - // statsd sender thread - t3.join(); - - return fatalCobraError ? -1 : (int) sentCount; + return bot.run(config, + channel, + filter, + position, + verbose, + maxQueueSize, + useQueue, + enableHeartbeat, + runtime); } } // namespace ix diff --git a/ixbots/ixbots/IXCobraToStatsdBot.h b/ixbots/ixbots/IXCobraToStatsdBot.h index 6991aeb3..44a68873 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.h +++ b/ixbots/ixbots/IXCobraToStatsdBot.h @@ -9,19 +9,20 @@ #include #include #include +#include namespace ix { - int cobra_to_statsd_bot(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - StatsdClient& statsdClient, - const std::string& fields, - const std::string& gauge, - const std::string& timer, - bool verbose, - size_t maxQueueSize, - bool enableHeartbeat, - int runtime); + int64_t cobra_to_statsd_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& position, + StatsdClient& statsdClient, + const std::string& fields, + const std::string& gauge, + const std::string& timer, + bool verbose, + size_t maxQueueSize, + bool enableHeartbeat, + int runtime); } // namespace ix diff --git a/ixbots/ixbots/IXCobraToStdoutBot.cpp b/ixbots/ixbots/IXCobraToStdoutBot.cpp new file mode 100644 index 00000000..67941ff3 --- /dev/null +++ b/ixbots/ixbots/IXCobraToStdoutBot.cpp @@ -0,0 +1,106 @@ +/* + * IXCobraToStdoutBot.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXCobraToStdoutBot.h" +#include "IXCobraBot.h" +#include "IXQueueManager.h" + +#include +#include +#include +#include + +namespace ix +{ + using StreamWriterPtr = std::unique_ptr; + + StreamWriterPtr makeStreamWriter() + { + Json::StreamWriterBuilder builder; + builder["commentStyle"] = "None"; + builder["indentation"] = ""; // will make the JSON object compact + std::unique_ptr jsonWriter(builder.newStreamWriter()); + return jsonWriter; + } + + std::string timeSinceEpoch() + { + std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); + std::chrono::system_clock::duration dtn = tp.time_since_epoch(); + + std::stringstream ss; + ss << dtn.count() * std::chrono::system_clock::period::num / + std::chrono::system_clock::period::den; + return ss.str(); + } + + void writeToStdout(bool fluentd, + const StreamWriterPtr& jsonWriter, + const Json::Value& msg, + const std::string& position) + { + Json::Value enveloppe; + if (fluentd) + { + enveloppe["producer"] = "cobra"; + enveloppe["consumer"] = "fluentd"; + + Json::Value nestedMessage(msg); + nestedMessage["position"] = position; + nestedMessage["created_at"] = timeSinceEpoch(); + enveloppe["message"] = nestedMessage; + + jsonWriter->write(enveloppe, &std::cout); + std::cout << std::endl; // add lf and flush + } + else + { + enveloppe = msg; + std::cout << position << " "; + jsonWriter->write(enveloppe, &std::cout); + std::cout << std::endl; + } + } + + int64_t cobra_to_stdout_bot(const CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& position, + bool fluentd, + bool quiet, + bool verbose, + size_t maxQueueSize, + bool enableHeartbeat, + int runtime) + { + CobraBot bot; + auto jsonWriter = makeStreamWriter(); + + bot.setOnBotMessageCallback([&fluentd, &quiet, &jsonWriter](const Json::Value& msg, + const std::string& position, + const bool /*verbose*/, + std::atomic& /*throttled*/, + std::atomic& /*fatalCobraError*/) -> bool { + if (!quiet) + { + writeToStdout(fluentd, jsonWriter, msg, position); + } + return true; + }); + + bool useQueue = false; + + return bot.run(config, + channel, + filter, + position, + verbose, + maxQueueSize, + useQueue, + enableHeartbeat, + runtime); + } +} // namespace ix diff --git a/ixbots/ixbots/IXCobraToStdoutBot.h b/ixbots/ixbots/IXCobraToStdoutBot.h new file mode 100644 index 00000000..2a4df1fd --- /dev/null +++ b/ixbots/ixbots/IXCobraToStdoutBot.h @@ -0,0 +1,25 @@ +/* + * IXCobraToStdoutBot.h + * Author: Benjamin Sergeant + * Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved. + */ +#pragma once + +#include +#include +#include +#include + +namespace ix +{ + int64_t cobra_to_stdout_bot(const ix::CobraConfig& config, + const std::string& channel, + const std::string& filter, + const std::string& position, + bool fluentd, + bool quiet, + bool verbose, + size_t maxQueueSize, + bool enableHeartbeat, + int runtime); +} // namespace ix diff --git a/ixbots/ixbots/IXQueueManager.cpp b/ixbots/ixbots/IXQueueManager.cpp index c6703c05..c273607e 100644 --- a/ixbots/ixbots/IXQueueManager.cpp +++ b/ixbots/ixbots/IXQueueManager.cpp @@ -10,14 +10,14 @@ namespace ix { - Json::Value QueueManager::pop() + std::pair QueueManager::pop() { std::unique_lock lock(_mutex); if (_queues.empty()) { Json::Value val; - return val; + return std::make_pair(val, std::string()); } std::vector games; @@ -35,7 +35,7 @@ namespace ix if (_queues[game].empty()) { Json::Value val; - return val; + return std::make_pair(val, std::string()); } auto msg = _queues[game].front(); @@ -43,7 +43,7 @@ namespace ix return msg; } - void QueueManager::add(Json::Value msg) + void QueueManager::add(const Json::Value& msg, const std::string& position) { std::unique_lock lock(_mutex); @@ -59,7 +59,7 @@ namespace ix // in queuing too many events. if (_queues[game].size() < _maxQueueSize) { - _queues[game].push(msg); + _queues[game].push(std::make_pair(msg, position)); _condition.notify_one(); } } diff --git a/ixbots/ixbots/IXQueueManager.h b/ixbots/ixbots/IXQueueManager.h index 6685edad..5a8e4aa8 100644 --- a/ixbots/ixbots/IXQueueManager.h +++ b/ixbots/ixbots/IXQueueManager.h @@ -23,11 +23,11 @@ namespace ix { } - Json::Value pop(); - void add(Json::Value msg); + std::pair pop(); + void add(const Json::Value& msg, const std::string& position); private: - std::map> _queues; + std::map>> _queues; std::mutex _mutex; std::condition_variable _condition; size_t _maxQueueSize; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index aed1a9f8..70599804 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -66,6 +66,7 @@ if (UNIX) IXCobraMetricsPublisherTest.cpp IXCobraToSentryBotTest.cpp IXCobraToStatsdBotTest.cpp + IXCobraToStdoutBotTest.cpp ) endif() diff --git a/test/IXCobraToSentryBotTest.cpp b/test/IXCobraToSentryBotTest.cpp index f928299c..368af796 100644 --- a/test/IXCobraToSentryBotTest.cpp +++ b/test/IXCobraToSentryBotTest.cpp @@ -141,7 +141,6 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]") std::string filter; std::string position("$"); bool verbose = true; - bool strict = true; size_t maxQueueSize = 10; bool enableHeartbeat = false; @@ -161,16 +160,15 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]") // Only run the bot for 3 seconds int runtime = 3; - int sentCount = cobra_to_sentry_bot(config, - channel, - filter, - position, - sentryClient, - verbose, - strict, - maxQueueSize, - enableHeartbeat, - runtime); + int64_t sentCount = cobra_to_sentry_bot(config, + channel, + filter, + position, + sentryClient, + verbose, + maxQueueSize, + enableHeartbeat, + runtime); // // We want at least 2 messages to be sent // diff --git a/test/IXCobraToStatsdBotTest.cpp b/test/IXCobraToStatsdBotTest.cpp index 8067573c..48763178 100644 --- a/test/IXCobraToStatsdBotTest.cpp +++ b/test/IXCobraToStatsdBotTest.cpp @@ -114,18 +114,18 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]") std::string gauge; std::string timer; - int sentCount = ix::cobra_to_statsd_bot(config, - channel, - filter, - position, - statsdClient, - fields, - gauge, - timer, - verbose, - maxQueueSize, - enableHeartbeat, - runtime); + int64_t sentCount = ix::cobra_to_statsd_bot(config, + channel, + filter, + position, + statsdClient, + fields, + gauge, + timer, + verbose, + maxQueueSize, + enableHeartbeat, + runtime); // // We want at least 2 messages to be sent // diff --git a/test/IXCobraToStdoutBotTest.cpp b/test/IXCobraToStdoutBotTest.cpp new file mode 100644 index 00000000..56f4d1f2 --- /dev/null +++ b/test/IXCobraToStdoutBotTest.cpp @@ -0,0 +1,127 @@ +/* + * IXCobraToStdoutTest.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone. All rights reserved. + */ + +#include "IXTest.h" +#include "catch.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace ix; + +namespace +{ + void runPublisher(const ix::CobraConfig& config, const std::string& channel) + { + ix::CobraMetricsPublisher cobraMetricsPublisher; + cobraMetricsPublisher.configure(config, channel); + cobraMetricsPublisher.setSession(uuid4()); + cobraMetricsPublisher.enable(true); + + Json::Value msg; + msg["fps"] = 60; + + cobraMetricsPublisher.setGenericAttributes("game", "ody"); + + // Wait a bit + ix::msleep(500); + + // publish some messages + cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1) + cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2) + ix::msleep(500); + + cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3) + cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4) + ix::msleep(500); + + cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4) + cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5) + ix::msleep(500); + } +} // namespace + +TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]") +{ + SECTION("Exchange and count sent/received messages.") + { + int port = getFreePort(); + snake::AppConfig appConfig = makeSnakeServerConfig(port, true); + + // Start a redis server + ix::RedisServer redisServer(appConfig.redisPort); + auto res = redisServer.listen(); + REQUIRE(res.first); + redisServer.start(); + + // Start a snake server + snake::SnakeServer snakeServer(appConfig); + snakeServer.run(); + + // Run the bot for a small amount of time + std::string channel = ix::generateSessionId(); + std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); + std::string role = "_sub"; + std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; + std::string endpoint = makeCobraEndpoint(port, true); + + ix::CobraConfig config; + config.endpoint = endpoint; + config.appkey = appkey; + config.rolename = role; + config.rolesecret = secret; + config.socketTLSOptions = makeClientTLSOptions(); + + std::thread publisherThread(runPublisher, config, channel); + + std::string filter; + std::string position("$"); + bool verbose = true; + bool quiet = false; + size_t maxQueueSize = 10; + bool enableHeartbeat = false; + + // Only run the bot for 3 seconds + int runtime = 3; + + // We could try to capture the output ... not sure how. + bool fluentd = true; + + int64_t sentCount = ix::cobra_to_stdout_bot(config, + channel, + filter, + position, + fluentd, + quiet, + verbose, + maxQueueSize, + enableHeartbeat, + runtime); + // + // We want at least 2 messages to be sent + // + REQUIRE(sentCount >= 2); + + // Give us 1s for all messages to be received + ix::msleep(1000); + + spdlog::info("Stopping snake server..."); + snakeServer.stop(); + + spdlog::info("Stopping redis server..."); + redisServer.stop(); + + publisherThread.join(); + } +} diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 0e5e422f..c9803ce1 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -49,7 +49,6 @@ add_executable(ws ws_redis_subscribe.cpp ws_redis_server.cpp ws_snake.cpp - ws_cobra_subscribe.cpp ws_cobra_metrics_publish.cpp ws_cobra_publish.cpp ws_cobra_metrics_to_redis.cpp diff --git a/ws/ws.cpp b/ws/ws.cpp index 03afe333..bc129ced 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -93,7 +94,6 @@ int main(int argc, char** argv) bool quiet = false; bool fluentd = false; bool compress = false; - bool strict = false; bool stress = false; bool disableAutomaticReconnection = false; bool disablePerMessageDeflate = false; @@ -291,7 +291,6 @@ int main(int argc, char** argv) "Size of the queue to hold messages before they are sent to Sentry"); cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_flag("-v", verbose, "Verbose"); - cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--position", position, "Stream position"); @@ -451,8 +450,18 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - ret = ix::ws_cobra_subscribe_main( - cobraConfig, channel, filter, position, quiet, fluentd, runtime); + bool enableHeartbeat = true; + int64_t sentCount = ix::cobra_to_stdout_bot(cobraConfig, + channel, + filter, + position, + fluentd, + quiet, + verbose, + maxQueueSize, + enableHeartbeat, + runtime); + ret = (int) sentCount; } else if (app.got_subcommand("cobra_publish")) { @@ -484,18 +493,18 @@ int main(int argc, char** argv) } else { - ret = ix::cobra_to_statsd_bot(cobraConfig, - channel, - filter, - position, - statsdClient, - fields, - gauge, - timer, - verbose, - maxQueueSize, - enableHeartbeat, - runtime); + ret = (int) ix::cobra_to_statsd_bot(cobraConfig, + channel, + filter, + position, + statsdClient, + fields, + gauge, + timer, + verbose, + maxQueueSize, + enableHeartbeat, + runtime); } } } @@ -505,16 +514,15 @@ int main(int argc, char** argv) ix::SentryClient sentryClient(dsn); sentryClient.setTLSOptions(tlsOptions); - ret = ix::cobra_to_sentry_bot(cobraConfig, - channel, - filter, - position, - sentryClient, - verbose, - strict, - maxQueueSize, - enableHeartbeat, - runtime); + ret = (int) ix::cobra_to_sentry_bot(cobraConfig, + channel, + filter, + position, + sentryClient, + verbose, + maxQueueSize, + enableHeartbeat, + runtime); } else if (app.got_subcommand("cobra_metrics_to_redis")) { diff --git a/ws/ws.h b/ws/ws.h index b8039906..1145391e 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -77,14 +77,6 @@ namespace ix const std::string& channel, bool verbose); - int ws_cobra_subscribe_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - bool quiet, - bool fluentd, - int runtime); - int ws_cobra_publish_main(const ix::CobraConfig& appkey, const std::string& channel, const std::string& path); diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp deleted file mode 100644 index ac13197c..00000000 --- a/ws/ws_cobra_subscribe.cpp +++ /dev/null @@ -1,215 +0,0 @@ -/* - * ws_cobra_subscribe.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include - -namespace ix -{ - using StreamWriterPtr = std::unique_ptr; - - StreamWriterPtr makeStreamWriter() - { - Json::StreamWriterBuilder builder; - builder["commentStyle"] = "None"; - builder["indentation"] = ""; // will make the JSON object compact - std::unique_ptr jsonWriter(builder.newStreamWriter()); - return jsonWriter; - } - - std::string timeSinceEpoch() - { - std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); - std::chrono::system_clock::duration dtn = tp.time_since_epoch(); - - std::stringstream ss; - ss << dtn.count() * std::chrono::system_clock::period::num / - std::chrono::system_clock::period::den; - return ss.str(); - } - - void writeToStdout(bool fluentd, - const StreamWriterPtr& jsonWriter, - const Json::Value& msg, - const std::string& position) - { - Json::Value enveloppe; - if (fluentd) - { - enveloppe["producer"] = "cobra"; - enveloppe["consumer"] = "fluentd"; - - Json::Value nestedMessage(msg); - nestedMessage["position"] = position; - nestedMessage["created_at"] = timeSinceEpoch(); - enveloppe["message"] = nestedMessage; - - jsonWriter->write(enveloppe, &std::cout); - std::cout << std::endl; // add lf and flush - } - else - { - enveloppe = msg; - std::cout << position << " "; - jsonWriter->write(enveloppe, &std::cout); - std::cout << std::endl; - } - } - - int ws_cobra_subscribe_main(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - bool quiet, - bool fluentd, - int runtime) - { - ix::CobraConnection conn; - conn.configure(config); - conn.connect(); - - std::atomic msgPerSeconds(0); - std::atomic msgCount(0); - std::atomic stop(false); - std::atomic fatalCobraError(false); - auto jsonWriter = makeStreamWriter(); - - auto timer = [&msgPerSeconds, &msgCount, &stop] { - while (!stop) - { - spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); - - msgPerSeconds = 0; - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - } - }; - - std::thread t(timer); - - std::string subscriptionPosition(position); - - conn.setEventCallback([&conn, - &channel, - &jsonWriter, - &filter, - &subscriptionPosition, - &msgCount, - &msgPerSeconds, - &quiet, - &fluentd, - &fatalCobraError](const CobraEventPtr& event) { - if (event->type == ix::CobraEventType::Open) - { - spdlog::info("Subscriber connected"); - - for (auto&& it : event->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (event->type == ix::CobraEventType::Closed) - { - spdlog::info("Subscriber closed: {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Authenticated) - { - spdlog::info("Subscriber authenticated"); - spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition); - conn.subscribe( - channel, - filter, - subscriptionPosition, - [&jsonWriter, - &quiet, - &msgPerSeconds, - &msgCount, - &fluentd, - &subscriptionPosition](const Json::Value& msg, const std::string& position) { - if (!quiet) - { - writeToStdout(fluentd, jsonWriter, msg, position); - } - - msgPerSeconds++; - msgCount++; - - subscriptionPosition = position; - }); - } - else if (event->type == ix::CobraEventType::Subscribed) - { - spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::UnSubscribed) - { - spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::Error) - { - spdlog::error("Subscriber: error {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Published) - { - spdlog::error("Published message hacked: {}", event->msgId); - } - else if (event->type == ix::CobraEventType::Pong) - { - spdlog::info("Received websocket pong: {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::HandshakeError) - { - spdlog::error("Subscriber: Handshake error: {}", event->errMsg); - fatalCobraError = true; - } - else if (event->type == ix::CobraEventType::AuthenticationError) - { - spdlog::error("Subscriber: Authentication error: {}", event->errMsg); - fatalCobraError = true; - } - else if (event->type == ix::CobraEventType::SubscriptionError) - { - spdlog::error("Subscriber: Subscription error: {}", event->errMsg); - fatalCobraError = true; - } - }); - - // Run forever - if (runtime == -1) - { - while (true) - { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - - if (fatalCobraError) break; - } - } - // Run for a duration, used by unittesting now - else - { - for (int i = 0; i < runtime; ++i) - { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - - if (fatalCobraError) break; - } - } - - stop = true; - - conn.disconnect(); - t.join(); - - return fatalCobraError ? 1 : 0; - } -} // namespace ix